Cornerstones for a new MapOSMatic daemon

This is the first step of the rewrite of the MapOSMatic rendering
daemon. The long-term objective is a more flexible rendering daemon
that supports rendering several jobs in parallel, with code that is
overall more Pythonic and maintainable.

This first shot brings a completely new, simpler MapOSMatic daemon with
the same level of functionality as the previous daemon in terms of
serial rendering of the job queue. Three major changes happen here:

  1. the bash-based wrapper script has been removed, in favor of a more
     clever Python wrapper. Cleaner, and more importantly more portable.
     The wrapper still needs a bit of configuration, and after the
     config.py-template has been tweaked into a valid config.py file,
     the daemon can be started by:

       .../scripts/wrapper.py scripts/daemon.py

  2. the externalization of the rendering routine into a 'render'
     module. This module does not access the database - only the daemon
     does. The sole purpose of the render module is to encapsulate the
     rendering process and its errors+exceptions handling.

     It can also be used as a standalone, job-ID-based renderer:

       .../scripts/wrapper.py scripts/render.py <jobid>

  3. the cleanup mechanism now runs in a separate thread, in the
     background.

Signed-off-by: Maxime Petazzoni <maxime.petazzoni@bulix.org>
stable
Maxime Petazzoni 2010-01-17 10:17:01 +01:00
rodzic cf3ae043fe
commit 37ab334d9a
6 zmienionych plików z 369 dodań i 273 usunięć

Wyświetl plik

@ -0,0 +1,21 @@
#!/usr/bin/env python
# coding: utf-8
# Copy this file as 'config.py' and edit the following lines to match your
# installation.
# Path to your OCitySMap installation. The Python wrapper script puts this
# directory first on PYTHONPATH before starting the requested script.
OCITYSMAP_PATH = '/path/to/ocitysmap'
# Log file for MapOSMatic. Leave empty for stderr. The wrapper exports this
# value as the MAPOSMATIC_LOG_FILE environment variable.
MAPOSMATIC_LOG = '/tmp/maposmaticd.log'
# Log level (lower is more verbose). The wrapper exports this value, converted
# to a string, as the MAPOSMATIC_LOG_LEVEL environment variable.
# 50: critical
# 40: error
# 30: warning
# 20: info
# 10: debug
# 0: not set (discouraged)
MAPOSMATIC_LVL = 20

228
scripts/daemon.py 100755
Wyświetl plik

@ -0,0 +1,228 @@
#!/usr/bin/python
# coding: utf-8
# maposmatic, the web front-end of the MapOSMatic city map generation system
# Copyright (C) 2009 David Decotigny
# Copyright (C) 2009 Frédéric Lehobey
# Copyright (C) 2009 David Mentré
# Copyright (C) 2009 Maxime Petazzoni
# Copyright (C) 2009 Thomas Petazzoni
# Copyright (C) 2009 Gaël Utard
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import sys
import threading
import render
from www.maposmatic.models import MapRenderingJob
from www.settings import LOG
from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_MAX_SIZE_GB
# Result messages handed to job.end_rendering() by MapOSMaticDaemon.render()
# once a rendering attempt is over.
RESULT_SUCCESSFULL = 'ok'
RESULT_INTERRUPTED = 'rendering interrupted'
RESULT_FAILED = 'rendering failed'
# NOTE(review): RESULT_CANCELED is not referenced by the code visible in this
# module; presumably reserved for a future rendering timeout -- confirm.
RESULT_CANCELED = 'rendering took too long, canceled'
class MapOSMaticDaemon:
    """
    This is a basic rendering daemon, base class for the different
    implementations of rendering scheduling. By default, it acts as a
    standalone, single-process MapOSMatic rendering daemon.
    """

    def __init__(self, frequency):
        """Set up the daemon and put orphaned jobs back into the queue.

        Args:
            frequency (int): number of seconds serve() sleeps between two
                polls of the job queue when no job is waiting.
        """
        # Honor the interval requested by the caller; it used to be silently
        # replaced by a hard-coded 10 seconds.
        self.frequency = frequency
        LOG.info("MapOSMatic rendering daemon started.")
        self.rollback_orphaned_jobs()

    def rollback_orphaned_jobs(self):
        """Reset all jobs left in the "rendering" state back to the "waiting"
        state to process them correctly."""
        MapRenderingJob.objects.filter(status=1).update(status=0)

    def serve(self):
        """Implement a basic service loop, looking every self.frequency seconds
        for a new job to render and dispatch it if one's available. This method
        can of course be overloaded by subclasses of MapOSMaticDaemon depending
        on their needs.

        The loop ends when a KeyboardInterrupt is received while the daemon is
        sleeping between two polls of the queue.
        """
        while True:
            try:
                job = MapRenderingJob.objects.to_render()[0]
            except IndexError:
                # Nothing to render for now: wait before polling again.
                try:
                    time.sleep(self.frequency)
                except KeyboardInterrupt:
                    break
            else:
                # Dispatch outside of the try body so that an IndexError
                # raised while processing a job is not mistaken for an empty
                # queue and silently dropped.
                self.dispatch(job)

        LOG.info("MapOSMatic rendering daemon terminating.")

    def dispatch(self, job):
        """Dispatch the given job. In this simple single-process daemon, this
        is as simple as rendering it."""
        self.render(job)

    def render(self, job):
        """Render the given job, handling the different rendering outcomes
        (success or failures).

        The job is marked as started, handed to render.render_job(), and then
        marked as finished with the result message matching the returned code
        (0: success, 1: interrupted, anything else: failure).
        """
        LOG.info("Rendering job #%d '%s'..." %
                 (job.id, job.maptitle))
        job.start_rendering()

        ret = render.render_job(job)
        if ret == 0:
            msg = RESULT_SUCCESSFULL
            LOG.info("Finished rendering of job #%d." % job.id)
        elif ret == 1:
            msg = RESULT_INTERRUPTED
            LOG.info("Rendering of job #%d interrupted!" % job.id)
        else:
            msg = RESULT_FAILED
            LOG.info("Rendering of job #%d failed (exception occurred)!" %
                     job.id)

        job.end_rendering(msg)
class RenderingsGarbageCollector(threading.Thread):
    """
    A garbage collector thread that removes old renderings from
    RENDERING_RESULT_PATH when the total size of the directory goes above 80%
    of RENDERING_RESULT_MAX_SIZE_GB.
    """

    def __init__(self, frequency=20):
        """Create the collector as a daemon thread.

        Args:
            frequency (int): number of seconds to wait between two cleanup
                iterations.
        """
        threading.Thread.__init__(self)
        self.frequency = frequency
        self.setDaemon(True)

    def run(self):
        """Run the main garbage collector thread loop, cleaning files every
        self.frequency seconds until the program is stopped.

        A failing iteration is logged and retried at the next tick; without
        this, a single error (a file vanishing under our feet, for example)
        would silently end the thread and stop all further cleanups.
        """
        LOG.info("Cleanup thread started.")

        while True:
            try:
                self.cleanup()
            except Exception:
                LOG.exception("Cleanup iteration failed, will retry later.")
            time.sleep(self.frequency)

    def get_file_info(self, path):
        """Returns a dictionary of information on the given file.

        Args:
            path (string): the full path to the file.

        Returns a dictionary containing:
            * name: the file base name;
            * path: its full path;
            * size: its size;
            * time: the last time the file contents were changed."""
        s = os.stat(path)
        return {'name': os.path.basename(path),
                'path': path,
                'size': s.st_size,
                'time': s.st_mtime}

    def get_formatted_value(self, value):
        """Returns the given value in bytes formatted for display, with its
        unit."""
        return '%.1f MiB' % (value/1024.0/1024.0)

    def get_formatted_details(self, saved, size, threshold):
        """Returns the given saved space, size and threshold details, formatted
        for display by get_formatted_value()."""
        return 'saved %s, now %s/%s' % \
                (self.get_formatted_value(saved),
                 self.get_formatted_value(size),
                 self.get_formatted_value(threshold))

    def cleanup(self):
        """Run one iteration of the cleanup loop. A sorted list of files from
        the renderings directory is first created, oldest files last. Files are
        then pop()-ed out of the list and removed, together with the other
        files of their parent job when there is one, until we're back below
        the size threshold."""
        files = [self.get_file_info(os.path.join(RENDERING_RESULT_PATH, f))
                 for f in os.listdir(RENDERING_RESULT_PATH)
                 if not f.startswith('.')]

        # Compute the total size occupied by the renderings, and the actual 80%
        # threshold, in bytes.
        size = sum(f['size'] for f in files)
        threshold = 0.8 * RENDERING_RESULT_MAX_SIZE_GB * 1024 * 1024 * 1024

        # Stop here if we are below the threshold
        if size < threshold:
            return

        LOG.info("%s consumed for a %s threshold. Cleaning..." %
                 (self.get_formatted_value(size),
                  self.get_formatted_value(threshold)))

        # Sort files by timestamp, oldest last, and start removing them by
        # pop()-ing the list.
        files.sort(key=lambda f: f['time'], reverse=True)

        while size > threshold:
            if not files:
                LOG.error("No files to remove and still above threshold! "
                          "Something's wrong!")
                return

            f = files.pop()
            LOG.debug("Considering file %s..." % f['name'])
            job = MapRenderingJob.objects.get_by_filename(f['name'])
            if job:
                LOG.debug("Found matching parent job #%d." % job.id)
                removed, saved = job.remove_all_files()
                size -= saved
                if removed:
                    LOG.info("Removed %d files for job #%d (%s)." %
                             (removed, job.id,
                              self.get_formatted_details(saved, size,
                                                         threshold)))
            else:
                # If we didn't find a parent job, it means this is an orphaned
                # file, we can safely remove it to get back some disk space.
                LOG.debug("No parent job found.")
                os.remove(f['path'])
                size -= f['size']
                LOG.info("Removed orphan file %s (%s)." %
                         (f['name'], self.get_formatted_details(f['size'],
                                                                size,
                                                                threshold)))
if __name__ == '__main__':
    # os.path.isdir() is already False for a path that does not exist, so a
    # single check covers both "missing" and "not a directory".
    if not os.path.isdir(RENDERING_RESULT_PATH):
        # The message has a %s placeholder: feed it the offending path instead
        # of logging a literal "%s".
        LOG.error("%s does not exist or is not a directory! "
                  "Please use a valid RENDERING_RESULT_PATH." %
                  RENDERING_RESULT_PATH)
        sys.exit(1)

    # Poll the job queue every 10 seconds, check disk usage every 20 seconds.
    daemon = MapOSMaticDaemon(10)
    cleaner = RenderingsGarbageCollector(20)

    # The cleaner runs in the background while the daemon serves jobs from
    # the main thread.
    cleaner.start()
    daemon.serve()

Wyświetl plik

@ -1,246 +0,0 @@
#!/usr/bin/python
# coding: utf-8
# maposmatic, the web front-end of the MapOSMatic city map generation system
# Copyright (C) 2009 David Decotigny
# Copyright (C) 2009 Frédéric Lehobey
# Copyright (C) 2009 David Mentré
# Copyright (C) 2009 Maxime Petazzoni
# Copyright (C) 2009 Thomas Petazzoni
# Copyright (C) 2009 Gaël Utard
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import time, os , sys, select, signal, traceback, logging
from datetime import datetime, timedelta
from www.settings import RENDERING_RESULT_PATH, LOG, RENDERING_RESULT_FORMATS, RENDERING_RESULT_MAX_SIZE_GB, OCITYSMAP_CFG_PATH
from www.maposmatic.models import MapRenderingJob
from ocitysmap.coords import BoundingBox as OCMBoundingBox
from ocitysmap.street_index import OCitySMap
import Image
def sigcld_handler(signum, frame, pipe_write):
    """Signal handler writing a marker into the given pipe.

    render_job() installs it for SIGCHLD, through a lambda supplying
    pipe_write, so that the select() call waiting on the other end of the
    pipe wakes up.

    Args:
        signum: the received signal number (unused).
        frame: the interrupted stack frame (unused).
        pipe_write (int): file descriptor of the write end of the pipe.
    """
    f = os.fdopen(pipe_write, 'w')
    try:
        f.write("end")
    finally:
        # Close explicitly so the marker is flushed right away rather than
        # whenever the file object happens to be garbage collected.
        f.close()
def render_job_process(job):
    """Render the map and the index of the given job into files.

    The rendered area is the job's bounding box when the job carries no
    administrative OSM id, and the administrative boundary otherwise. When
    PNG is one of the output formats, a 200x200 thumbnail is saved as well.

    Returns 0 once everything has been rendered.
    """
    renderer_args = {
        'config_file': OCITYSMAP_CFG_PATH,
        'map_areas_prefix': 'maposmaticd_%d_' % os.getpid(),
        'language': job.map_language,
    }

    # Select the rendered area: explicit bounding box or administrative id.
    if job.administrative_osmid is not None:
        renderer_args['osmid'] = job.administrative_osmid
    else:
        renderer_args['boundingbox'] = OCMBoundingBox(job.lat_upper_left,
                                                      job.lon_upper_left,
                                                      job.lat_bottom_right,
                                                      job.lon_bottom_right)
    renderer = OCitySMap(**renderer_args)

    target = os.path.join(RENDERING_RESULT_PATH, job.files_prefix())

    # The index is rendered with the dimensions of the map just produced.
    rendered_map = renderer.render_map_into_files(job.maptitle, target,
                                                  RENDERING_RESULT_FORMATS,
                                                  "zoom:16")
    renderer.render_index(job.maptitle, target, RENDERING_RESULT_FORMATS,
                          rendered_map.width, rendered_map.height)

    if "png" in RENDERING_RESULT_FORMATS:
        thumbnail = Image.open(target + ".png")
        thumbnail.thumbnail((200,200), Image.ANTIALIAS)
        thumbnail.save(target + "_small.png")

    return 0
# Render one job in a forked child process and record the outcome on the job
# through job.start_rendering() / job.end_rendering().
# NOTE(review): the indentation of this listing was lost, so the nesting of the
# statements below (in particular of the bare 'return' statements and of the
# code following the 'finally' clause) cannot be confirmed from here; the
# comments only describe each statement locally.
def render_job(job):
LOG.info("[job %d] starting rendering, title '%s'" \
% (job.id, job.maptitle))
job.start_rendering()
# The pipe carries a short text message from the child (or from the SIGCHLD
# handler) to the parent.
(pipe_read, pipe_write) = os.pipe()
pid = os.fork()
if pid == 0:
# Child process: keep only the write end of the pipe.
tell_dad = os.fdopen(pipe_write, 'w')
os.close(pipe_read)
# Exit code used unless render_job_process() returns normally.
retval = 1
try:
retval = render_job_process(job)
except KeyboardInterrupt:
# Catch Ctrl-C ~ gracefully
tell_dad.write('Ctrl-C pressed. Bailing out.')
except SystemExit, rv:
# Pass-through any sys.exit() done from deep inside
# NOTE(review): rv is the SystemExit instance, not its exit code, so
# sys.exit() below receives an exception object -- confirm this is intended.
retval = rv
except:
# Tell the parent what happened
LOG.exception("Exception in worker process")
tell_dad.write('Exception occured')
finally:
# And always return the proper exit code
sys.exit(retval)
else:
# Parent process: have SIGCHLD write into the pipe so that the select() call
# below wakes up when the child terminates.
signal.signal(signal.SIGCHLD,
lambda signal, frame: sigcld_handler(signal, frame,
pipe_write))
LOG.debug("start of process %d" % pid)
# Don't close pipe_write here because the sigcld handler depends on it
child_message = ""
try:
# Wait for something to read from the pipe, for at most 20*60 seconds.
(rlist, wlist, xlist) = select.select([pipe_read], [], [], 20*60)
if pipe_read in rlist:
try:
child_endpoint = os.fdopen(pipe_read, 'r')
child_message = child_endpoint.read()
except Exception:
child_message = "(Could not retrieve error details)"
traceback.print_exc() # Dump this on stderr too
else:
# Ignore exceptions when closing the pipe (child endpoint
# already closed, etc.)
try:
child_endpoint.close()
except:
pass
return
elif rlist == [] and wlist == [] and xlist == []:
# select() timed out: ask the child to terminate, then force it after
# 2 seconds, and record the failure on the job.
os.kill(pid, signal.SIGTERM)
time.sleep(2)
os.kill(pid, signal.SIGKILL)
resultmsg = "rendering took too long, killed"
LOG.info("[job %d] %s" % (job.id, resultmsg))
job.end_rendering(resultmsg)
return
finally:
LOG.debug("end of process %d" % pid)
# Close both ends of the pipe, ignoring descriptors that are already closed.
for fd in (pipe_read, pipe_write):
try:
os.close(fd)
except OSError:
pass
# Collect the child's exit status and turn it into the job's result message.
(pid, status) = os.waitpid(pid, 0)
resultmsg = "unknown error"
if os.WIFEXITED(status):
error_code = os.WEXITSTATUS(status)
if error_code == 0:
resultmsg = "ok"
else:
resultmsg = "rendering failed with %d" % error_code
LOG.error("Failure in rendering child process: %s." \
% child_message)
elif os.WIFSIGNALED(status):
resultmsg = "rendering killed by signal %d" \
% os.WTERMSIG(status)
LOG.info("[job %d] %s" % (job.id, resultmsg))
job.end_rendering(resultmsg)
return
def cleanup_files():
    """This cleanup function checks that the total size of the files in
    RENDERING_RESULT_PATH does not exceed 80% of the defined threshold
    RENDERING_RESULT_MAX_SIZE_GB. If it does, files are removed until the
    constraint is met again, oldest first, and grouped by job.

    Hidden files and thumbnails (*_small.png) are neither counted nor
    considered for removal."""

    def get_formatted_value(v):
        """Format a size in bytes for display, in MiB."""
        return '%.2f MiB' % (v/1024.0/1024.0)

    def get_formatted_details(saved, size, threshold):
        """Format the saved space, current size and threshold for display."""
        return 'saved %s, now %s/%s' % \
                (get_formatted_value(saved),
                 get_formatted_value(size),
                 get_formatted_value(threshold))

    # Build the list of (path, mtime, size) tuples. Each file is stat()-ed
    # only once, so both values come from the same snapshot of the file.
    files = []
    for entry in os.listdir(RENDERING_RESULT_PATH):
        if entry.startswith('.') or entry.endswith('_small.png'):
            continue
        path = os.path.join(RENDERING_RESULT_PATH, entry)
        info = os.stat(path)
        files.append((path, info.st_mtime, info.st_size))

    # Compute the total size occupied by the renderings, and the actual 80%
    # threshold, in bytes
    size = sum(f[2] for f in files)
    threshold = 0.8 * RENDERING_RESULT_MAX_SIZE_GB * 1024 * 1024 * 1024

    # Stop here if we are below the threshold
    if size < threshold:
        return

    LOG.info("%s consumed for a %s threshold. Cleaning..." %
             (get_formatted_value(size), get_formatted_value(threshold)))

    # Sort files by timestamp, oldest last, and start removing them by
    # pop()-ing the list
    files.sort(key=lambda f: f[1], reverse=True)

    while size > threshold:
        if not files:
            LOG.error("No files to remove and still above threshold! Something's wrong!")
            return

        # Get the next file to remove, and try to identify the job it comes
        # from
        f = files.pop()
        name = os.path.basename(f[0])
        job = MapRenderingJob.objects.get_by_filename(name)
        if job:
            removed, saved = job.remove_all_files()
            size -= saved

            # If files were removed, log it. If not, it only means only the
            # thumbnail remained, and that's good.
            if removed:
                LOG.info("Removed %d files from job #%d (%s)." %
                         (removed, job.id,
                          get_formatted_details(saved, size, threshold)))
        else:
            # If we didn't find a parent job, it means this is an orphaned
            # file, and we can safely remove it to get back some disk space.
            os.remove(f[0])
            saved = f[2]
            size -= saved
            LOG.info("Removed orphan file %s (%s)." %
                     (name, get_formatted_details(saved, size, threshold)))
# Refuse to start without a usable output directory.
result_path_is_dir = os.path.isdir(RENDERING_RESULT_PATH)
if not result_path_is_dir:
    LOG.error("ERROR: please set RENDERING_RESULT_PATH ('%s') to an existing directory"
              % RENDERING_RESULT_PATH)
    sys.exit(1)

LOG.info("started")

# A previous daemon interruption may have left jobs in the "rendering"
# state: put them back into the "waiting for rendering" state.
jobs = MapRenderingJob.objects.filter(status=1)
for job in jobs:
    LOG.debug("reset job %d into waiting for rendering state" % job.id)
    job.status = 0
    job.save()

# Minimum delay between two checks of the disk space used by the renderings.
cleanup_interval = timedelta(0, 20)
last_file_cleanup = None

while True:
    cleanup_is_due = (last_file_cleanup is None or
                      datetime.now() - last_file_cleanup > cleanup_interval)
    if cleanup_is_due:
        cleanup_files()
        last_file_cleanup = datetime.now()

    # Render everything that is waiting, or sleep for a while if the queue
    # is empty.
    jobs = MapRenderingJob.objects.to_render()
    if jobs:
        for job in jobs:
            render_job(job)
    else:
        time.sleep(10)

105
scripts/render.py 100755
Wyświetl plik

@ -0,0 +1,105 @@
#!/usr/bin/python
# coding: utf-8
# maposmatic, the web front-end of the MapOSMatic city map generation system
# Copyright (C) 2009 David Decotigny
# Copyright (C) 2009 Frédéric Lehobey
# Copyright (C) 2009 David Mentré
# Copyright (C) 2009 Maxime Petazzoni
# Copyright (C) 2009 Thomas Petazzoni
# Copyright (C) 2009 Gaël Utard
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import Image
import os
import sys
from ocitysmap.coords import BoundingBox
from ocitysmap.street_index import OCitySMap
from www.maposmatic.models import MapRenderingJob
from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS
from www.settings import OCITYSMAP_CFG_PATH
def render_job(job, prefix=None):
    """Renders the given job, encapsulating all processing errors and
    exceptions.

    This does not affect the job entry in the database in any way. It's the
    responsibility of the caller to maintain the job status in the database.

    Args:
        job: the rendering job to process.
        prefix (string): the map areas prefix handed to OCitySMap (optional).

    Returns:
        * 0 on success;
        * 1 on ^C;
        * 2 on a rendering exception from OCitySMap. The traceback is dumped
          on stderr so that the failure can be diagnosed.
    """
    # Local import: only needed to report failures.
    import traceback

    try:
        # The renderer is created inside the try block as well, so that a
        # failure in its constructor is reported through the return code like
        # any other rendering error instead of escaping to the caller.
        #
        # NOTE(review): the area selection tests administrative_city while
        # the other branch uses administrative_osmid -- confirm both fields
        # are always set together.
        if job.administrative_city is None:
            bbox = BoundingBox(job.lat_upper_left, job.lon_upper_left,
                               job.lat_bottom_right, job.lon_bottom_right)
            renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
                                 map_areas_prefix=prefix,
                                 boundingbox=bbox,
                                 language=job.map_language)
        else:
            renderer = OCitySMap(config_file=OCITYSMAP_CFG_PATH,
                                 map_areas_prefix=prefix,
                                 osmid=job.administrative_osmid,
                                 language=job.map_language)

        # Path prefix of the output files; kept apart from the 'prefix'
        # argument, which names the map areas.
        outfile_prefix = os.path.join(RENDERING_RESULT_PATH,
                                      job.files_prefix())

        # Render the map in all RENDERING_RESULT_FORMATS
        result = renderer.render_map_into_files(job.maptitle, outfile_prefix,
                                                RENDERING_RESULT_FORMATS,
                                                'zoom:16')

        # Render the index in all RENDERING_RESULT_FORMATS, using the
        # same map size.
        renderer.render_index(job.maptitle, outfile_prefix,
                              RENDERING_RESULT_FORMATS,
                              result.width, result.height)

        # Create thumbnail
        if 'png' in RENDERING_RESULT_FORMATS:
            img = Image.open(outfile_prefix + '.png')
            img.thumbnail((200, 200), Image.ANTIALIAS)
            img.save(outfile_prefix + '_small.png')

        return 0
    except KeyboardInterrupt:
        return 1
    except:
        # Deliberately catch everything: callers rely on the return code and
        # must never see an exception. Leave a trace of what went wrong.
        traceback.print_exc()
        return 2
if __name__ == '__main__':
    # Standalone mode: render the job whose ID is given on the command line.
    # Exit codes: the render_job() result (0, 1 or 2), 3 on invalid usage,
    # 4 when the job does not exist.
    def usage():
        """Print the command line usage on stderr."""
        sys.stderr.write('usage: %s <jobid>\n' % sys.argv[0])

    if len(sys.argv) != 2:
        usage()
        sys.exit(3)

    try:
        jobid = int(sys.argv[1])
    except ValueError:
        usage()
        sys.exit(3)

    # objects.get() raises DoesNotExist for an unknown ID rather than
    # returning a false value, so the missing job is handled here.
    try:
        job = MapRenderingJob.objects.get(id=jobid)
    except MapRenderingJob.DoesNotExist:
        sys.stderr.write('Job #%d not found!\n' % jobid)
        sys.exit(4)

    sys.exit(render_job(job, 'renderer_%d' % os.getpid()))

Wyświetl plik

@ -1,4 +1,5 @@
#! /bin/sh
#!/usr/bin/env python
# coding: utf-8
# maposmatic, the web front-end of the MapOSMatic city map generation system
# Copyright (C) 2009 David Decotigny
@ -21,32 +22,19 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
_here=`dirname "$0"`
_pydir=`cd "$_here"/.. && /bin/pwd`
import os
import sys
PYTHONPATH="$PYTHONPATH:/path/to/ocitysmap:$_pydir"
export PYTHONPATH
from config import *
DJANGO_SETTINGS_MODULE="www.settings"
export DJANGO_SETTINGS_MODULE
if __name__ == '__main__':
here = os.path.dirname(os.path.abspath(__file__))
root = os.path.abspath(os.path.join(here, '..'))
# Set this to the empty string for stderr
MAPOSMATIC_LOG_FILE="/tmp/maposmaticd.log"
export MAPOSMATIC_LOG_FILE
os.environ['PYTHONPATH'] = '%s:%s:%s' % (OCITYSMAP_PATH, root,
os.environ.get('PYTHONPATH', ''))
os.environ['DJANGO_SETTINGS_MODULE'] = 'www.settings'
os.environ['MAPOSMATIC_LOG_FILE'] = MAPOSMATIC_LOG
os.environ['MAPOSMATIC_LOG_LEVEL'] = str(MAPOSMATIC_LVL)
### Log level (the higher, the quieter)
# Critical: 50
# Error: 40
# Warning: 30
# Info: 20
# Debug: 10
# NotSet: 0 (discouraged)
MAPOSMATIC_LOG_LEVEL=20
export MAPOSMATIC_LOG_LEVEL
# Make sure ocitysmap and maposmatic are configured the same way: that
# way, the logs for ocitysmap will go to the maposmatic logger
MAPOSMATIC_LOG_TARGET="ocitysmap"
export MAPOSMATIC_LOG_TARGET
exec "$_here"/maposmaticd
os.execv(os.path.join(root, sys.argv[1]), sys.argv[1:])

Wyświetl plik

@ -61,7 +61,7 @@ class MapRenderingJobManager(models.Manager):
job = MapRenderingJob.objects.get(id=jobid)
if name.startswith(job.files_prefix()):
return job
except (ValueError, IndexError):
except (ValueError, IndexError, MapRenderingJob.DoesNotExist):
pass
return None