diff --git a/config/default.py b/config/default.py index f78c931..32ef2b1 100644 --- a/config/default.py +++ b/config/default.py @@ -11,6 +11,10 @@ CELERYBEAT_SCHEDULE = { 'task': 'ogn.collect.heatmap.update_beacon_receiver_distance_all', 'schedule': timedelta(minutes=5), }, + 'update-receiver-table': { + 'task': 'ogn.collect.database.update_receivers', + 'schedule': timedelta(minutes=5), + }, } CELERY_TIMEZONE = 'UTC' diff --git a/ogn/collect/database.py b/ogn/collect/database.py index b390795..b62ddff 100644 --- a/ogn/collect/database.py +++ b/ogn/collect/database.py @@ -1,9 +1,15 @@ -from ogn.utils import get_ddb -from ogn.model import Device, AddressOrigin +from sqlalchemy.sql import func, null +from sqlalchemy.sql.functions import coalesce +from sqlalchemy import and_, or_ from celery.utils.log import get_task_logger + +from ogn.model import Device, AddressOrigin, Receiver, ReceiverBeacon +from ogn.utils import get_ddb, get_country_code + from ogn.collect.celery import app + logger = get_task_logger(__name__) @@ -34,3 +40,80 @@ def import_file(path='tests/custom_ddb.txt'): logger.info("Import registered devices from '{}'...".format(path)) counter = update_devices(app.session, AddressOrigin.user_defined, get_ddb(path)) logger.info("Imported %i devices." % counter) + + +@app.task +def update_receivers(): + """Update the receiver table.""" + ## get current receiver data + last_entry_sq = app.session.query(coalesce(func.max(Receiver.lastseen), '2015-01-01 00:00:00').label('last_entry')) \ + .subquery() + + last_receiver_beacon_sq = app.session.query(ReceiverBeacon.name, func.min(ReceiverBeacon.timestamp).label('firstseen'), func.max(ReceiverBeacon.timestamp).label('lastseen')) \ + .filter(ReceiverBeacon.timestamp >= last_entry_sq.c.last_entry) \ + .group_by(ReceiverBeacon.name) \ + .subquery() + + ## update existing receivers + sq = app.session.query(ReceiverBeacon.name, ReceiverBeacon.latitude, ReceiverBeacon.longitude, ReceiverBeacon.altitude, last_receiver_beacon_sq.c.firstseen, last_receiver_beacon_sq.c.lastseen, ReceiverBeacon.version, ReceiverBeacon.platform) \ + .filter(and_(ReceiverBeacon.name == last_receiver_beacon_sq.c.name, ReceiverBeacon.timestamp == last_receiver_beacon_sq.c.lastseen)) \ + .subquery() + + # set country code to None if lat or lon changed + upd = app.session.query(Receiver) \ + .filter(and_(Receiver.name == sq.c.name, + or_(Receiver.latitude != sq.c.latitude, + Receiver.longitude != sq.c.longitude) + ) + ) \ + .update({"latitude": sq.c.latitude, + "longitude": sq.c.longitude, + "country_code": None}) + + logger.info("Count of receivers who changed lat or lon: {}".format(upd)) + app.session.commit() + + # update lastseen of known receivers + upd = app.session.query(Receiver) \ + .filter(Receiver.name == sq.c.name) \ + .update({"altitude": sq.c.altitude, + "lastseen": sq.c.lastseen, + "version": sq.c.version, + "platform": sq.c.platform}) + + logger.info("Count of receivers who where updated: {}".format(upd)) + + ## add new receivers + empty_sq = app.session.query(ReceiverBeacon.name, ReceiverBeacon.latitude, ReceiverBeacon.longitude, ReceiverBeacon.altitude, last_receiver_beacon_sq.c.firstseen, last_receiver_beacon_sq.c.lastseen, ReceiverBeacon.version, ReceiverBeacon.platform) \ + .filter(and_(ReceiverBeacon.name == last_receiver_beacon_sq.c.name, + ReceiverBeacon.timestamp == last_receiver_beacon_sq.c.lastseen)) \ + .outerjoin(Receiver, Receiver.name == ReceiverBeacon.name) \ + .filter(Receiver.name == null()) \ + .order_by(ReceiverBeacon.name) + + for receiver_beacon in empty_sq.all(): + receiver = Receiver() + receiver.name = receiver_beacon.name + receiver.latitude = receiver_beacon.latitude + receiver.longitude = receiver_beacon.longitude + receiver.altitude = receiver_beacon.altitude + receiver.firstseen = receiver_beacon.firstseen + receiver.lastseen = receiver_beacon.lastseen + receiver.version = receiver_beacon.version + receiver.platform = receiver_beacon.platform + + app.session.add(receiver) + logger.info("{} added".format(receiver.name)) + + app.session.commit() + + # update country code if None + unknown_country_query = app.session.query(Receiver) \ + .filter(Receiver.country_code == null()) \ + .order_by(Receiver.name) + + for receiver in unknown_country_query.all(): + receiver.country_code = get_country_code(receiver.latitude, receiver.longitude) + logger.info("Updated country_code for {} to {}".format(receiver.name, receiver.country_code)) + + app.session.commit() diff --git a/ogn/commands/showreceiver.py b/ogn/commands/showreceiver.py index ed378d5..0489584 100644 --- a/ogn/commands/showreceiver.py +++ b/ogn/commands/showreceiver.py @@ -1,6 +1,7 @@ from datetime import datetime, timedelta -from sqlalchemy.sql import func -from sqlalchemy import distinct, and_ +from sqlalchemy.sql import func, null +from sqlalchemy.sql.functions import coalesce +from sqlalchemy import distinct, and_, or_ from ogn.model import ReceiverBeacon, Receiver from ogn.commands.dbutils import session @@ -13,8 +14,7 @@ receiver_beacons_per_day = 24 * 60 / 5 @manager.command def list_all(): - """Show a list of all receivers (NOT IMPLEMENTED).""" - + """Show a list of all receivers.""" timestamp_24h_ago = datetime.utcnow() - timedelta(days=1) sq = session.query(distinct(ReceiverBeacon.name).label('name'), diff --git a/ogn/utils.py b/ogn/utils.py index a8f546d..fa4a0e5 100644 --- a/ogn/utils.py +++ b/ogn/utils.py @@ -5,6 +5,7 @@ from io import StringIO from .model import Device, AddressOrigin from geopy.geocoders import Nominatim +from geopy.exc import GeopyError DDB_URL = "http://ddb.glidernet.org/download" @@ -54,11 +55,13 @@ def get_trackable(ddb): def get_country_code(latitude, longitude): geolocator = Nominatim() - location = geolocator.reverse("%f, %f" % (latitude, longitude)) try: + location = geolocator.reverse("%f, %f" % (latitude, longitude)) country_code = location.raw["address"]["country_code"] except KeyError: country_code = None + except GeopyError: + country_code = None return country_code @@ -71,7 +74,6 @@ def haversine_distance(location0, location1): lon1 = radians(location1[1]) distance = 6366000 * 2 * asin(sqrt((sin((lat0 - lat1) / 2))**2 + cos(lat0) * cos(lat1) * (sin((lon0 - lon1) / 2))**2)) - print(distance) phi = degrees(atan2(sin(lon0 - lon1) * cos(lat1), cos(lat0) * sin(lat1) - sin(lat0) * cos(lat1) * cos(lon0 - lon1))) return distance, phi