ogn-python/app/backend/ognrange.py

55 wiersze
1.9 KiB
Python
Czysty Zwykły widok Historia

2019-08-31 08:14:41 +00:00
import json
from datetime import datetime, timedelta
from sqlalchemy import func, case
from sqlalchemy.sql.expression import label
from app.model import Receiver, ReceiverCoverage
from app import db
def alchemyencoder(obj):
    """Convert values that ``json`` cannot serialise on its own.

    Meant to be passed as the ``default=`` hook of ``json.dumps``.

    * ``datetime`` instances are rendered as ``"YYYY-MM-DD HH:MM"`` strings.
    * ``decimal.Decimal`` instances are converted to ``float``.

    Any other object is not handled: the function returns ``None`` for it.
    """
    from decimal import Decimal

    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M")

    if isinstance(obj, Decimal):
        return float(obj)

    # Unknown type: fall through without raising.
    return None
def stations2_filtered_pl(start, end):
    """Return a JSON string describing receivers seen inside a time window.

    A receiver is selected when its ``firstseen`` or its ``lastseen`` lies
    strictly between ``start`` and ``end``. Rows are ordered by ``lastseen``.

    Every entry of the resulting ``"stations"`` list has these keys:

    * ``s``  - receiver name
    * ``lt`` - ``ST_Y`` of the receiver location, rounded to 4 decimals
    * ``lg`` - ``ST_X`` of the receiver location, rounded to 4 decimals
    * ``u``  - ``"U"`` if ``lastseen`` is newer than ten minutes before the
      current UTC time, otherwise ``"D"``
    * ``ut`` - ``lastseen``
    * ``v``  - ``version`` and ``platform`` joined by a dot

    :param start: lower bound of the window (exclusive)
    :param end: upper bound of the window (exclusive)
    :return: JSON text of the form ``{"stations": [...]}``, serialised with
        ``alchemyencoder`` as the ``default`` hook
    """
    recent_cutoff = datetime.utcnow() - timedelta(minutes=10)

    # Selected columns, each labelled with the short key used in the JSON.
    name_col = Receiver.name.label("s")
    lat_col = (func.round(func.ST_Y(Receiver.location_wkt) * 10000) / 10000).label("lt")
    lon_col = (func.round(func.ST_X(Receiver.location_wkt) * 10000) / 10000).label("lg")
    status_col = case([(Receiver.lastseen > recent_cutoff, "U")], else_="D").label("u")
    lastseen_col = Receiver.lastseen.label("ut")
    version_col = (Receiver.version + "." + Receiver.platform).label("v")

    # Window test: either timestamp falls strictly inside (start, end).
    firstseen_in_window = db.and_(start < Receiver.firstseen, end > Receiver.firstseen)
    lastseen_in_window = db.and_(start < Receiver.lastseen, end > Receiver.lastseen)

    query = (
        db.session.query(name_col, lat_col, lon_col, status_col, lastseen_col, version_col)
        .filter(db.or_(firstseen_in_window, lastseen_in_window))
        .order_by(Receiver.lastseen)
    )

    result = db.session.execute(query)
    payload = {"stations": [dict(row) for row in result]}
    return json.dumps(payload, default=alchemyencoder)
def max_tile_mgrs_pl(station, start, end, squares):
    """Return per-tile coverage counts of one receiver as a JSON string.

    Coverage rows of the receiver named ``station`` whose
    ``location_mgrs_short`` begins with ``squares`` are grouped by the last
    four characters of ``location_mgrs_short`` and counted.

    :param station: receiver name to look up
    :param start: accepted for interface compatibility; not used by the query
    :param end: accepted for interface compatibility; not used by the query
    :param squares: prefix that ``location_mgrs_short`` must start with
        (passed to ``LIKE`` with a trailing ``%``)
    :return: JSON text ``{"t": squares, "p": ["<suffix>/<count>", ...]}``
    """
    # Built once so SELECT and GROUP BY refer to the same expression.
    tile_suffix = func.right(ReceiverCoverage.location_mgrs_short, 4)
    hits = func.count(ReceiverCoverage.location_mgrs_short)

    query = (
        db.session.query(tile_suffix, hits)
        .filter(
            Receiver.id == ReceiverCoverage.receiver_id,
            Receiver.name == station,
            ReceiverCoverage.location_mgrs_short.like(squares + "%"),
        )
        .group_by(tile_suffix)
    )

    points = ["{}/{}".format(suffix, count) for suffix, count in query.all()]
    return json.dumps({"t": squares, "p": points})