Merge pull request #315 from osm2vectortiles/hotfix/requeue-failures

Stability and Performance Improvements
pull/320/head
Manuel Roth 2016-05-09 09:55:24 +02:00
commit 3ec4a4f34e
7 zmienionych plików z 67 dodań i 31 usunięć

Wyświetl plik

@ -41,7 +41,13 @@ Layer:
WHERE geometry && !bbox!
GROUP BY type
UNION ALL
SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry, landuse_class(type) AS class, type
SELECT
osm_ids2mbid(osm_id, true) AS osm_id,
CASE WHEN ST_Area(geometry) > 10000000
THEN ST_Intersection(ST_MakeValid(geometry), !bbox!)
ELSE geometry
END AS geometry,
landuse_class(type) AS class, type
FROM (
SELECT osm_id, geometry, type
FROM landuse_z9
@ -133,7 +139,12 @@ Layer:
srid: ''
table: |-
(
SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry
SELECT
osm_ids2mbid(osm_id, true) AS osm_id,
CASE WHEN ST_Area(geometry) > 10000000 AND osm_id <> 0
THEN ST_Intersection(ST_MakeValid(geometry), !bbox!)
ELSE geometry
END AS geometry
FROM (
SELECT osm_id, geometry
FROM water_z0
@ -290,7 +301,13 @@ Layer:
srid: ''
table: |-
(
SELECT osm_ids2mbid(osm_id, true) AS osm_id, geometry, landuse_overlay_class(type) AS class, type
SELECT
osm_ids2mbid(osm_id, true) AS osm_id,
CASE WHEN ST_Area(geometry) > 10000000
THEN ST_Intersection(ST_MakeValid(geometry), !bbox!)
ELSE geometry
END AS geometry,
landuse_overlay_class(type) AS class, type
FROM (
SELECT osm_id, geometry, type FROM landuse_overlay_z5
WHERE z(!scale_denominator!) = 5

Wyświetl plik

@ -19,6 +19,7 @@ RUN npm -g outdated | grep -v npm
RUN apt-get update && apt-get install -y --no-install-recommends \
python \
python-pip \
python-dev \
&& rm -rf /var/lib/apt/lists/
VOLUME /data/tm2source /data/export

Wyświetl plik

@ -16,18 +16,22 @@ Options:
"""
import time
import subprocess
import sys
import os
import os.path
import json
import humanize
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from mbtoolbox.optimize import remove_subpyramids
import pika
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from mbtoolbox.optimize import find_optimizable_tiles, all_descendant_tiles
from mbtoolbox.mbtiles import MBTiles
from docopt import docopt
if os.name == 'posix' and sys.version_info[0] < 3:
import subprocess32 as subprocess
else:
import subprocess
def s3_url(host, port, bucket_name, file_name):
protocol = 'https' if port == 443 else 'http'
@ -86,24 +90,28 @@ def render_tile_list_command(source, sink, list_file):
def render_pyramid_command(source, sink, bounds, min_zoom, max_zoom):
# Slow tiles should timeout as fast as possible so job can fail
return [
'tilelive-copy',
'--scheme', 'pyramid',
'--minzoom', str(min_zoom),
'--maxzoom', str(max_zoom),
'--bounds={}'.format(bounds),
'--timeout=300000',
'--slow=60000',
'--timeout=40000',
source, sink
]
def optimize_mbtiles(mbtiles_file, mask_level=8):
remove_subpyramids(mbtiles_file, mask_level, 'tms')
mbtiles = MBTiles(mbtiles_file, 'tms')
for tile in find_optimizable_tiles(mbtiles, mask_level, 'tms'):
tiles = all_descendant_tiles(x=tile.x, y=tile.y, zoom=tile.z, max_zoom=14)
mbtiles.remove_tiles(tiles)
def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name,
render_scheme, bucket_name):
failed_queue_name, render_scheme, bucket_name):
host = os.getenv('AWS_S3_HOST', 'mock-s3')
port = int(os.getenv('AWS_S3_PORT', 8080))
@ -146,26 +154,31 @@ def export_remote(tm2source, rabbitmq_url, queue_name, result_queue_name,
else:
raise ValueError("Message must be either of type pyramid or list")
start = time.time()
subprocess.check_call(tilelive_cmd)
end = time.time()
try:
start = time.time()
subprocess.check_call(tilelive_cmd, timeout=5*60)
end = time.time()
print('Rendering time: {}'.format(humanize.naturaltime(end - start)))
print('Rendering time: {}'.format(humanize.naturaltime(end - start)))
print('Optimize MBTiles file size')
optimize_mbtiles(mbtiles_file)
upload_mbtiles(bucket, mbtiles_file)
os.remove(mbtiles_file)
print('Optimize MBTiles file size')
optimize_mbtiles(mbtiles_file)
upload_mbtiles(bucket, mbtiles_file)
os.remove(mbtiles_file)
print('Upload mbtiles {}'.format(mbtiles_file))
print('Upload mbtiles {}'.format(mbtiles_file))
download_link = s3_url(host, port, bucket_name, mbtiles_file)
result_msg = create_result_message(task_id, download_link, msg)
durable_publish(channel, result_queue_name,
body=json.dumps(result_msg))
channel.basic_ack(delivery_tag=method.delivery_tag)
download_link = s3_url(host, port, bucket_name, mbtiles_file)
result_msg = create_result_message(task_id, download_link, msg)
durable_publish(channel, result_queue_name,
body=json.dumps(result_msg))
channel.basic_ack(delivery_tag=method.delivery_tag)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
durable_publish(channel, failed_queue_name, body=body)
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.stop_consuming()
time.sleep(5) # Give RabbitMQ some time
raise e
channel.basic_consume(callback, queue=queue_name)
try:
@ -183,6 +196,7 @@ def configure_rabbitmq(channel):
return channel.queue_declare(queue=queue, durable=True)
queue_declare('jobs')
queue_declare('failed-jobs')
queue_declare('results')
@ -208,6 +222,7 @@ def main(args):
args['<rabbitmq_url>'],
args['--job-queue'],
'results',
'failed-jobs',
args['--render_scheme'],
args['--bucket'],
)

Wyświetl plik

@ -2,5 +2,6 @@ docopt==0.6.2
boto==2.38.0
pika==0.10.0
humanize==0.5.1
subprocess32==3.2.7
-e git+https://github.com/lukasmartinelli/mbtoolbox.git@#egg=mbtoolbox
-e git+https://github.com/mapbox/mbutil.git@#egg=mbutil-0.2.0beta

Wyświetl plik

@ -321,7 +321,7 @@ tables:
- name: id
type: id
- name: geometry
type: geometry
type: validated_geometry
- name: timestamp
type: pbf_timestamp
- name: type
@ -680,7 +680,7 @@ tables:
- name: id
type: id
- name: geometry
type: geometry
type: validated_geometry
- name: timestamp
type: pbf_timestamp
- key: name

Wyświetl plik

@ -150,7 +150,7 @@ BEGIN
UNION
SELECT * FROM osm_tables_delete
LOOP
EXECUTE format('DROP TABLE %I CASCADE', t.table_name);
EXECUTE format('DROP TABLE IF EXISTS %I CASCADE', t.table_name);
END LOOP;
END;
$$ language plpgsql;

Wyświetl plik

@ -69,6 +69,8 @@ def merge_results(rabbitmq_url, merge_target, result_queue_name):
connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
channel = connection.channel()
channel.basic_qos(prefetch_count=3)
channel.confirm_delivery()
def callback(ch, method, properties, body):
msg = json.loads(body.decode('utf-8'))