ogn-python/ogn/collect/database.py

71 wiersze
1.9 KiB
Python
Czysty Zwykły widok Historia

2016-05-22 05:26:02 +00:00
from sqlalchemy.sql import null
from celery.utils.log import get_task_logger
2016-02-03 22:09:11 +00:00
from ogn.model import Device, AddressOrigin
from ogn.utils import get_ddb
2016-02-03 22:09:11 +00:00
from ogn.collect.celery import app
2016-02-03 22:09:11 +00:00
# Module-level Celery task logger, named after this module.
logger = get_task_logger(__name__)
2016-05-22 05:26:02 +00:00
# Marker written into Device.address_origin for rows that were just
# bulk-inserted and still have to be merged (see update_devices/add_devices).
# NOTE(review): 7 is presumably a value that no AddressOrigin member uses --
# confirm against ogn.model before adding new origins.
temp_address_origin = 7
def add_devices(session, origin):
    """Promote temporarily marked devices to ``origin``.

    Every Device row whose ``address_origin`` equals ``temp_address_origin``
    and whose ``address`` is not yet present among the rows already stored
    for ``origin`` gets its ``address_origin`` set to ``origin``.

    :param session: SQLAlchemy session the statements are issued on.
    :param origin: address origin the new devices should be assigned to.
    :return: the value reported by ``Query.update`` (number of matched rows).
    """
    # Addresses that are already registered for the requested origin.
    known_addresses = (
        session.query(Device.address)
        .filter(Device.address_origin == origin)
        .subquery()
    )

    is_temporary = Device.address_origin == temp_address_origin
    is_unknown = ~Device.address.in_(known_addresses)

    # 'fetch' is required here: the NOT IN subquery criterion cannot be
    # evaluated in Python against the objects held by the session.
    promoted = (
        session.query(Device)
        .filter(is_temporary)
        .filter(is_unknown)
        .update({Device.address_origin: origin}, synchronize_session='fetch')
    )
    return promoted
def update_devices(session, origin, devices):
    """Merge ``devices`` into the Device table under ``origin``.

    Steps, all inside one transaction:

    1. remove rows still carrying the temporary marker
       (``temp_address_origin``),
    2. bulk-insert ``devices``,
    3. tag every row whose ``address_origin`` is NULL with the temporary
       marker (presumably these are exactly the rows inserted in step 2 --
       verify that ``devices`` are created without an address origin),
    4. let ``add_devices`` promote the tagged rows whose address is not yet
       known for ``origin``,
    5. delete the tagged rows that were not promoted,
    6. commit.

    If any step raises, the session is rolled back before the exception is
    re-raised, so a session that is reused afterwards is not left inside a
    failed transaction.

    :param session: SQLAlchemy session to work on.
    :param origin: address origin the imported devices belong to.
    :param devices: sized collection of Device objects to import.
    :return: ``len(devices)``, i.e. the number of devices offered for
        import, not the number actually added (that one is logged).
    """
    try:
        # Start from a clean slate: no rows may carry the marker yet.
        session.query(Device) \
            .filter(Device.address_origin == temp_address_origin) \
            .delete()

        session.bulk_save_objects(devices)

        # mark temporary added devices
        session.query(Device) \
            .filter(Device.address_origin == null()) \
            .update({Device.address_origin: temp_address_origin})

        logger.info('Added {} devices'.format(add_devices(session, origin)))

        # delete temporary added devices
        session.query(Device) \
            .filter(Device.address_origin == temp_address_origin) \
            .delete()

        session.commit()
    except Exception:
        # Undo the partial import and release the failed transaction; the
        # caller still sees the original error.
        session.rollback()
        raise

    return len(devices)
@app.task
def import_ddb():
    """Import registered devices from the DDB.

    Fetches the device list with ``get_ddb()`` and merges it into the
    database under ``AddressOrigin.ogn_ddb`` using ``app.session``.
    """
    logger.info("Import registered devices from the DDB...")
    counter = update_devices(app.session, AddressOrigin.ogn_ddb, get_ddb())
    logger.info("Imported {} devices.".format(counter))
@app.task
def import_file(path='tests/custom_ddb.txt'):
    """Import registered devices from a local file.

    Reads the device list from ``path`` via ``get_ddb(path)`` and merges it
    into the database under ``AddressOrigin.user_defined`` using
    ``app.session``.

    :param path: location of the device file to read.
    """
    start_message = "Import registered devices from '{}'...".format(path)
    logger.info(start_message)

    devices = get_ddb(path)
    counter = update_devices(app.session, AddressOrigin.user_defined, devices)

    done_message = "Imported {} devices.".format(counter)
    logger.info(done_message)