#!/usr/bin/python # coding: utf-8 # maposmatic, the web front-end of the MapOSMatic city map generation system # Copyright (C) 2009 David Decotigny # Copyright (C) 2009 Frédéric Lehobey # Copyright (C) 2009 David Mentré # Copyright (C) 2009 Maxime Petazzoni # Copyright (C) 2009 Thomas Petazzoni # Copyright (C) 2009 Gaël Utard # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import ctypes import datetime from PIL import Image import logging import multiprocessing import os import smtplib import sys import threading import traceback import subprocess import ocitysmap from ocitysmap import renderers from www.maposmatic.models import MapRenderingJob from www.settings import ADMINS, OCITYSMAP_CFG_PATH, MEDIA_ROOT from www.settings import RENDERING_RESULT_PATH, RENDERING_RESULT_FORMATS from www.settings import DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT from www.settings import DAEMON_ERRORS_SMTP_ENCRYPT from www.settings import DAEMON_ERRORS_SMTP_USER, DAEMON_ERRORS_SMTP_PASSWORD from www.settings import DAEMON_ERRORS_EMAIL_FROM from www.settings import DAEMON_ERRORS_EMAIL_REPLY_TO from www.settings import DAEMON_ERRORS_JOB_URL from django.template import Context, Template from www.settings import TEMPLATES from django.template.loader import render_to_string, get_template RESULT_SUCCESS = 0 RESULT_KEYBOARD_INTERRUPT = 1 RESULT_PREPARATION_EXCEPTION = 2 RESULT_RENDERING_EXCEPTION = 3 RESULT_TIMEOUT_REACHED = 4 RESULT_MEMORY_EXCEEDED = 5 THUMBNAIL_SUFFIX = '_small.png' LOG = logging.getLogger('maposmatic') class ThreadingJobRenderer: """ The ThreadingJobRenderer is a wrapper around a JobRendered thread that implements timeout management. If the timeout is reached, the thread is suspended, cleaned up and killed. """ def __init__(self, job, timeout=3600, prefix=None): """Initializes this ThreadingJobRenderer with a given job and a timeout. Args: job (MapRenderingJob): the job to render. timeout (int): a timeout, in seconds (defaults to 20 minutes). prefix (string): renderer map_areas table prefix. """ self.__job = job self.__timeout = timeout self.__thread = JobRenderer(job, prefix) def _email_timeout(self): """Send a notification about timeouts to the request submitter""" if not DAEMON_ERRORS_SMTP_HOST or not self.__job.submittermail: return try: LOG.info("Emailing timeout message to %s via %s:%d..." % (self.__job.submittermail, DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT)) if DAEMON_ERRORS_SMTP_ENCRYPT == "SSL": mailer = smtplib.SMTP_SSL() else: mailer = smtplib.SMTP() mailer.connect(DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT) if DAEMON_ERRORS_SMTP_ENCRYPT == "TLS": mailer.starttls() if DAEMON_ERRORS_SMTP_USER and DAEMON_ERRORS_SMTP_PASSWORD: mailer.login(DAEMON_ERRORS_SMTP_USER, DAEMON_ERRORS_SMTP_PASSWORD) template = get_template("render_email_timeout.txt") context = { 'from': DAEMON_ERRORS_EMAIL_FROM, 'replyto': DAEMON_ERRORS_EMAIL_REPLY_TO, 'to': self.__job.submittermail, 'jobid': self.__job.id, 'date': datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0200 (CEST)'), 'url': DAEMON_ERRORS_JOB_URL % self.__job.id, 'title': self.__job.maptitle, 'timeout': self.__timeout / 60 } msg = template.render(context) mailer.sendmail(DAEMON_ERRORS_EMAIL_FROM, [admin[1] for admin in ADMINS], msg) LOG.info("Email notification sent.") except Exception as e: LOG.exception("Could not send notification email to the submitter!") def run(self): """Renders the job using a JobRendered, encapsulating all processing errors and exceptions, with the addition here of a processing timeout. Returns one of the RESULT_ constants. """ LOG.info("Timeout is %d" % self.__timeout) self.__thread.start() self.__thread.join(self.__timeout) # If the thread is no longer alive, the timeout was not reached and all # is well. if not self.__thread.isAlive(): if self.__thread.result != 0: self.__job.remove_all_files() return self.__thread.result LOG.info("Rendering of job #%d took too long (timeout reached)!" % self.__job.id) # Kill the thread, clean up and return TIMEOUT_REACHED self.__thread.kill() del self.__thread # Remove the job files self.__job.remove_all_files() self._email_timeout() LOG.debug("Worker removed.") return RESULT_TIMEOUT_REACHED class ForkingJobRenderer: def __init__(self, job, timeout=3600, prefix=None): self.__job = job self.__timeout = timeout self.__renderer = JobRenderer(job, prefix) self.__process = multiprocessing.Process(target=self._wrap) def _email_timeout(self): """Send a notification about timeouts to the request submitter""" if not DAEMON_ERRORS_SMTP_HOST or not self.__job.submittermail: return try: LOG.info("Emailing timeout message to %s via %s:%d..." % (self.__job.submittermail, DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT)) if DAEMON_ERRORS_SMTP_ENCRYPT == "SSL": mailer = smtplib.SMTP_SSL() else: mailer = smtplib.SMTP() mailer.connect(DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT) if DAEMON_ERRORS_SMTP_ENCRYPT == "TLS": mailer.starttls() if DAEMON_ERRORS_SMTP_USER and DAEMON_ERRORS_SMTP_PASSWORD: mailer.login(DAEMON_ERRORS_SMTP_USER, DAEMON_ERRORS_SMTP_PASSWORD) template = get_template("render_email_timeout.txt") context = { 'from': DAEMON_ERRORS_EMAIL_FROM, 'replyto': DAEMON_ERRORS_EMAIL_REPLY_TO, 'to': self.__job.submittermail, 'jobid': self.__job.id, 'date': datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0200 (CEST)'), 'url': DAEMON_ERRORS_JOB_URL % self.__job.id, 'title': self.__job.maptitle, 'timeout': self.__timeout / 60 } msg = template.render(context) mailer.sendmail(DAEMON_ERRORS_EMAIL_FROM, [admin[1] for admin in ADMINS], msg) LOG.info("Email notification sent.") except Exception as e: LOG.exception("Could not send notification email to the submitter!") def run(self): self.__process.start() self.__process.join(self.__timeout) # If the process is no longer alive, the timeout was not reached and # all is well. if not self.__process.is_alive(): if self.__process.exitcode != 0: self.__job.remove_all_files() # If the exit code is < 0, it means the subprocess was terminated # abnormaly (by signal). In this situation, we need to report a # rendering exception. if self.__process.exitcode >= 0: return self.__process.exitcode return RESULT_RENDERING_EXCEPTION LOG.info("Rendering of job #%d took too long (timeout reached)!" % self.__job.id) # Kill the process, clean up and return TIMEOUT_REACHED self.__process.terminate() del self.__process # Remove job files self.__job.remove_all_files() self._email_timeout() LOG.debug("Process terminated.") return RESULT_TIMEOUT_REACHED def _wrap(self): sys.exit(self.__renderer.run()) class JobRenderer(threading.Thread): """ A simple, blocking job renderer. Can be used as a thread. """ def __init__(self, job, prefix): threading.Thread.__init__(self, name='renderer-%d' % job.id) self.job = job self.prefix = prefix self.result = None def __get_my_tid(self): if not self.isAlive(): raise threading.ThreadError("the thread is not active") # Do we have it cached? if hasattr(self, '__thread_id'): return self.__thread_id # If not, look for it for tid, tobj in threading._active.items(): if tobj is self: self.__thread_id = tid return self.__thread_id raise AssertionError("Could not resolve the thread's ID") def kill(self): LOG.debug("Killing job #%d's worker thread..." % self.job.id) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(self.__get_my_tid(), ctypes.py_object(SystemExit)) if res == 0: raise ValueError("Invalid thread ID") elif res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(self.__get_my_tid(), 0) raise SystemError("PyThreadState_SetAsync failed") def _email_submitter(self, template_name): """Send a notification with status and result URL to the request submitter""" if not DAEMON_ERRORS_SMTP_HOST or not self.job.submittermail: return try: LOG.info("Emailing success/failure message to %s via %s:%d..." % (self.job.submittermail, DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT)) if DAEMON_ERRORS_SMTP_ENCRYPT == "SSL": mailer = smtplib.SMTP_SSL(DAEMON_ERRORS_SMTP_HOST) else: mailer = smtplib.SMTP(DAEMON_ERRORS_SMTP_HOST) mailer.connect(DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT) if DAEMON_ERRORS_SMTP_ENCRYPT == "TLS": mailer.starttls() if DAEMON_ERRORS_SMTP_USER and DAEMON_ERRORS_SMTP_PASSWORD: mailer.login(DAEMON_ERRORS_SMTP_USER, DAEMON_ERRORS_SMTP_PASSWORD) template = get_template(template_name) context = { 'from': DAEMON_ERRORS_EMAIL_FROM, 'replyto': DAEMON_ERRORS_EMAIL_REPLY_TO, 'to': self.job.submittermail, 'jobid': self.job.id, 'date': datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0200 (CEST)'), 'url': DAEMON_ERRORS_JOB_URL % self.job.id, 'title': self.job.maptitle } msg = template.render(context) mailer.sendmail(DAEMON_ERRORS_EMAIL_FROM, self.job.submittermail, msg.encode("utf8")) LOG.info("Email notification sent.") except Exception as e: LOG.exception("Could not send notification email to the submitter!") def _email_exception(self, exc_info): """This method can be used to send the given exception by email to the configured admins in the project's settings.""" if not ADMINS or not DAEMON_ERRORS_SMTP_HOST: return try: LOG.info("Emailing rendering exceptions to the admins (%s) via %s:%d..." % (', '.join([admin[1] for admin in ADMINS]), DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT)) if DAEMON_ERRORS_SMTP_ENCRYPT == "SSL": mailer = smtplib.SMTP_SSL() else: mailer = smtplib.SMTP(DAEMON_ERRORS_SMTP_HOST) mailer.connect(DAEMON_ERRORS_SMTP_HOST, DAEMON_ERRORS_SMTP_PORT) if DAEMON_ERRORS_SMTP_ENCRYPT == "TLS": mailer.starttls() if DAEMON_ERRORS_SMTP_USER and DAEMON_ERRORS_SMTP_PASSWORD: mailer.login(DAEMON_ERRORS_SMTP_USER, DAEMON_ERRORS_SMTP_PASSWORD) jobinfo = [] for k in sorted(self.job.__dict__.keys()): # We don't care about state that much, especially since it # doesn't display well # for privacy reasons we remove submitter email and ip if k != '_state' and k != 'submitterip' and k != 'submittermail': jobinfo.append(' %s: %s' % (k, str(self.job.__dict__[k]))) template = get_template("render_email_exception.txt") context = { 'from': DAEMON_ERRORS_EMAIL_FROM, 'replyto': DAEMON_ERRORS_EMAIL_REPLY_TO, 'to': ', '.join(['%s <%s>' % admin for admin in ADMINS]), 'jobid': self.job.id, 'jobinfo': '\n'.join(jobinfo), 'date': datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S +0200 (CEST)'), 'url': DAEMON_ERRORS_JOB_URL % self.job.id, 'tb': ''.join(traceback.format_exception(*exc_info)) } msg = template.render(context) mailer.sendmail(DAEMON_ERRORS_EMAIL_FROM, [admin[1] for admin in ADMINS], msg) LOG.info("Error report sent.") except Exception as e: LOG.exception("Could not send error email to the admins!") self._email_submitter("render_email_failure.txt") def _gen_thumbnail(self, prefix, paper_width_mm, paper_height_mm): LOG.info('Creating map thumbnail...') if self.job.layout == "multi_page": # Depending on whether we're rendering landscape or # portrait, adapt how the tiling is done. if paper_width_mm > paper_height_mm: tile = "1x2" else: tile = "2x1" # With the 'montage' command from ImageMagick, create an # image with the first two pages of the PDF (cover page # and overview page). try: montage_cmd = [ "montage", "-tile", tile, "%s.pdf[0]" % prefix, "%s.pdf[2]" % prefix, "-geometry", "+10+10", "-shadow", "%s%s" % (prefix, THUMBNAIL_SUFFIX) ] subprocess.check_call(montage_cmd) # And now scale it to the normal thumbnail size mogrify_cmd = [ "mogrify", "-scale", "200x200", "%s%s" % (prefix, THUMBNAIL_SUFFIX) ] subprocess.check_call(mogrify_cmd) except Exception as e: LOG.warning("thumbnail creation failed: %s" % e) LOG.warning("maybe PDF parsing is disabled in the ImageMagic Policy map? (e.g. /etc/ImageMagick-6/policy.xml)") elif 'png' in RENDERING_RESULT_FORMATS: try: Image.MAX_IMAGE_PIXELS = None img = Image.open(prefix + '.png') try: img = img.convert('RGB') img.save(prefix + '.jpg', quality=50) except Exception as e: LOG.warning("PNG to JPEG conversion failed: %s" % e) img.thumbnail((200, 200), Image.ANTIALIAS) img.save(prefix + THUMBNAIL_SUFFIX) except Exception as e: LOG.warning("PNG size reduction failed: %s" % e) img.close() try: pngquant_cmd = [ "pngquant", "--output", "%s.8bit.png" % prefix, "%s.png" % prefix ] subprocess.check_call(pngquant_cmd) except Exception as e: LOG.warning("PNG color reduction failed: %s" % e) def run(self): """Renders the given job, encapsulating all processing errors and exceptions. This does not affect the job entry in the database in any way. It's the responsibility of the caller to do maintain the job status in the database. Returns one of the RESULT_ constants. """ LOG.info("Rendering job #%d '%s'..." % (self.job.id, self.job.maptitle)) try: renderer = ocitysmap.OCitySMap(OCITYSMAP_CFG_PATH) config = ocitysmap.RenderingConfiguration() result_file_prefix = os.path.join(RENDERING_RESULT_PATH, self.job.files_prefix()) os.makedirs(os.path.dirname(result_file_prefix), exist_ok=True) # TODO have the create form provide this config.origin_url = 'https://print.get-map.org' + self.job.get_absolute_url() config.title = self.job.maptitle config.osmid = self.job.administrative_osmid if config.osmid: bbox_wkt, area_wkt \ = renderer.get_geographic_info(config.osmid) config.bounding_box = ocitysmap.coords.BoundingBox.parse_wkt( bbox_wkt) else: config.bounding_box = ocitysmap.coords.BoundingBox( self.job.lat_upper_left, self.job.lon_upper_left, self.job.lat_bottom_right, self.job.lon_bottom_right) config.language = self.job.map_language config.indexer = self.job.indexer config.logo = self.job.logo config.extra_logo = self.job.extra_logo config.stylesheet = renderer.get_stylesheet_by_name( self.job.stylesheet) config.overlays = [] if self.job.overlay: for overlay in self.job.overlay.split(","): config.overlays.append(renderer.get_overlay_by_name(overlay)) config.import_files = [] # legacy files, eventually remove these config.gpx_file = False config.umap_file = False config.poi_file = False for file in self.job.uploads.all(): config.import_files.append((file.file_type, os.path.join(MEDIA_ROOT, file.uploaded_file.name))) # legacy files, eventually remove these if file.file_type == 'gpx': config.gpx_file = os.path.join(MEDIA_ROOT, file.uploaded_file.name) if file.file_type == 'umap': config.umap_file = os.path.join(MEDIA_ROOT, file.uploaded_file.name) if file.file_type == 'poi': config.poi_file = os.path.join(MEDIA_ROOT, file.uploaded_file.name) config.paper_width_mm = self.job.paper_width_mm config.paper_height_mm = self.job.paper_height_mm except KeyboardInterrupt: self.result = RESULT_KEYBOARD_INTERRUPT LOG.info("Rendering of job #%d interrupted!" % self.job.id) return self.result except MemoryError: self.result = RESULT_MEMORY_EXCEEDED LOG.exception("Not enough memory to render job #%d" % self.job.id) self._email_exception(sys.exc_info()) return self.result except Exception as e: self.result = RESULT_PREPARATION_EXCEPTION LOG.exception("Rendering of job #%d failed (exception occurred during" " data preparation)!" % self.job.id) errfile = result_file_prefix + "-errors.txt" fp = open(errfile, "w") traceback.print_exc(file=fp) fp.close() self._email_exception(sys.exc_info()) return self.result try: # Get the list of output formats (PNG, PDF, SVGZ, CSV) # that the renderer accepts. renderer_cls = renderers.get_renderer_class_by_name(self.job.layout) compatible_output_formats = renderer_cls.get_compatible_output_formats() # Compute the intersection of the accepted output formats # with the desired output formats. output_formats = \ list(set(compatible_output_formats) & set(RENDERING_RESULT_FORMATS)) output_count = renderer.render(config, self.job.layout, output_formats, result_file_prefix) # Create thumbnail try: self._gen_thumbnail(result_file_prefix, config.paper_width_mm, config.paper_height_mm) except: pass if output_count > 0: self.result = RESULT_SUCCESS LOG.info("Finished rendering of job #%d." % self.job.id) # TODO log if some formats failed else: self.result = RESULT_RENDERING_EXCEPTION LOG.exception("Rendering of job #%d faild, no output files generated" % self.job.id) return self.result except KeyboardInterrupt: self.result = RESULT_KEYBOARD_INTERRUPT LOG.info("Rendering of job #%d interrupted!" % self.job.id) return self.result except MemoryError: self.result = RESULT_MEMORY_EXCEEDED LOG.exception("Not enough memory to render job #%d" % self.job.id) self._email_exception(sys.exc_info()) return self.result except Exception as e: self.result = RESULT_RENDERING_EXCEPTION LOG.exception("Rendering of job #%d failed (exception occurred during" " rendering)!" % self.job.id) errfile = result_file_prefix + "-errors.txt" fp = open(errfile, "w") traceback.print_exc(file=fp) fp.close() self._email_exception(sys.exc_info()) return self.result self._email_submitter("render_email_success.txt") return self.result if __name__ == '__main__': def usage(): sys.stderr.write('usage: %s [timeout]\n' % sys.argv[0]) if len(sys.argv) < 2 or len(sys.argv) > 3: usage() sys.exit(3) try: jobid = int(sys.argv[1]) job = MapRenderingJob.objects.get(id=jobid) if job: prefix = 'renderer_%d_' % os.getpid() if len(sys.argv) == 3: renderer = ThreadingJobRenderer(job, int(sys.argv[2]), prefix) else: renderer = JobRenderer(job, prefix) sys.exit(renderer.run()) else: sys.stderr.write('Job #%d not found!' % jobid) sys.exit(4) except ValueError: usage() sys.exit(3)