Removed AmbigousTimeError

pull/62/head
Konstantin Gründger 2018-05-01 11:21:28 +02:00
rodzic 79ab1cfc8a
commit a1f84c6cb7
8 zmienionych plików z 74 dodań i 106 usunięć

Wyświetl plik

@ -1,3 +1,3 @@
from ogn.parser import parse as parse_module # only for test functions. Without this a mock of parse would mock the function instead of the module
from ogn.parser.parse import parse, parse_aprs, parse_comment # flake8: noqa
from ogn.parser.exceptions import ParseError, AprsParseError, OgnParseError, AmbigousTimeError # flake8: noqa
from ogn.parser.exceptions import ParseError, AprsParseError, OgnParseError # flake8: noqa

Wyświetl plik

@ -1,7 +1,6 @@
"""
exception definitions
"""
from datetime import datetime
class ParseError(Exception):
@ -24,14 +23,3 @@ class OgnParseError(ParseError):
self.message = "This is not a valid OGN message: {}".format(aprs_comment)
super(OgnParseError, self).__init__(self.message)
class AmbigousTimeError(ParseError):
"""Timstamp from the past/future, can't fully reconstruct datetime from timestamp."""
def __init__(self, reference, packet_time):
self.reference = reference
self.packet_time = packet_time
self.timedelta = reference - datetime.combine(reference, packet_time)
self.message = "Can't reconstruct timestamp, {:.0f}s from past.".format(self.timedelta.total_seconds())
super(AmbigousTimeError, self).__init__(self.message)

Wyświetl plik

@ -17,13 +17,11 @@ from ogn.parser.aprs_comment.spider_parser import SpiderParser
from ogn.parser.aprs_comment.spot_parser import SpotParser
def parse(aprs_message, reference_date=None, reference_time=None):
if reference_date is None:
now = datetime.utcnow()
reference_date = now.date()
reference_time = now.time()
def parse(aprs_message, reference_timestamp=None):
if reference_timestamp is None:
reference_timestamp = datetime.utcnow()
message = parse_aprs(aprs_message, reference_date, reference_time)
message = parse_aprs(aprs_message, reference_timestamp)
if message['aprs_type'] == 'position' or message['aprs_type'] == 'status':
message.update(parse_comment(message['comment'],
dstcall=message['dstcall'],
@ -31,7 +29,10 @@ def parse(aprs_message, reference_date=None, reference_time=None):
return message
def parse_aprs(message, reference_date, reference_time=None):
def parse_aprs(message, reference_timestamp=None):
if reference_timestamp is None:
reference_timestamp = datetime.utcnow()
if message and message[0] == '#':
match_server = re.search(PATTERN_SERVER, message)
if match_server:
@ -56,7 +57,7 @@ def parse_aprs(message, reference_date, reference_time=None):
'dstcall': match.group('dstcall'),
'relay': match.group('relay') if match.group('relay') else None,
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match_position.group('time'), reference_date, reference_time),
'timestamp': createTimestamp(match_position.group('time'), reference_timestamp),
'latitude': parseAngle('0' + match_position.group('latitude') + (match_position.group('latitude_enhancement') or '0')) *
(-1 if match_position.group('latitude_sign') == 'S' else 1),
'symboltable': match_position.group('symbol_table'),
@ -74,7 +75,7 @@ def parse_aprs(message, reference_date, reference_time=None):
return {'name': match.group('callsign'),
'dstcall': match.group('dstcall'),
'receiver_name': match.group('receiver'),
'timestamp': createTimestamp(match_status.group('time'), reference_date, reference_time),
'timestamp': createTimestamp(match_status.group('time'), reference_timestamp),
'comment': match_status.group('comment') if match_status.group('comment') else "",
'aprs_type': aprs_type}

Wyświetl plik

@ -5,9 +5,7 @@ from ogn.parser.utils import createTimestamp
def parse(telnet_data):
now = datetime.utcnow()
reference_date = now.date()
reference_time = now.time()
reference_timestamp = datetime.utcnow()
try:
return {'pps_offset': float(telnet_data[0:5]),
@ -15,7 +13,7 @@ def parse(telnet_data):
'aircraft_type': int(telnet_data[20:24]),
'address_type': int(telnet_data[25]),
'address': telnet_data[27:33],
'timestamp': createTimestamp(telnet_data[34:40] + 'h', reference_date, reference_time),
'timestamp': createTimestamp(telnet_data[34:40] + 'h', reference_timestamp),
'latitude': float(telnet_data[43:53]),
'longitude': float(telnet_data[54:64]),
'altitude': int(telnet_data[68:73]),

Wyświetl plik

@ -1,8 +1,5 @@
from datetime import datetime, timedelta
from ogn.parser.exceptions import AmbigousTimeError
FEETS_TO_METER = 0.3048 # ratio feets to meter
FPM_TO_MS = FEETS_TO_METER / 60 # ratio fpm to m/s
KNOTS_TO_MS = 0.5144 # ratio knots to m/s
@ -14,46 +11,38 @@ def parseAngle(dddmmhht):
return float(dddmmhht[:3]) + float(dddmmhht[3:]) / 60
def createTimestamp(timestamp, reference_date, reference_time=None):
if timestamp[-1] == "z":
day = int(timestamp[0:2])
hhmm = timestamp[2:6]
if reference_date.day < day:
if reference_date.month == 1:
reference_date = reference_date.replace(year=reference_date.year - 1, month=12, day=day)
else:
reference_date = reference_date.replace(month=reference_date.month - 1, day=day)
else:
reference_date = reference_date.replace(day=day)
packet_time = datetime.strptime(hhmm, '%H%M').time()
return datetime.combine(reference_date, packet_time)
elif timestamp[-1] == "h":
hhmmss = timestamp[:-1]
packet_time = datetime.strptime(hhmmss, '%H%M%S').time()
def createTimestamp(time_string, reference_timestamp=None):
if time_string[-1] == "z":
dd = int(time_string[0:2])
hh = int(time_string[2:4])
mm = int(time_string[4:6])
result = datetime(reference_timestamp.year,
reference_timestamp.month,
dd,
hh, mm, 0)
if result > reference_timestamp + timedelta(days=14):
# shift timestamp to previous month
result = (result.replace(day=1) - timedelta(days=5)).replace(day=result.day)
elif result < reference_timestamp - timedelta(days=14):
# shift timestamp to next month
result = (result.replace(day=28) + timedelta(days=5)).replace(day=result.day)
else:
raise ValueError()
hh = int(time_string[0:2])
mm = int(time_string[2:4])
ss = int(time_string[4:6])
if reference_time is None:
return datetime.combine(reference_date, packet_time)
else:
reference_datetime = datetime.combine(reference_date, reference_time)
timestamp = datetime.combine(reference_date, packet_time)
delta = timestamp - reference_datetime
result = datetime(reference_timestamp.year,
reference_timestamp.month,
reference_timestamp.day,
hh, mm, ss)
# This function reconstructs the packet date from the timestamp and a reference_datetime time.
# delta vs. packet date:
# -24h -12h 0 +12h +24h
# |-------------------------|---------------------|------------------------|----------------------|
# [-] <-- tomorrow [---------today---------] [-------yesterday------]
if result > reference_timestamp + timedelta(hours=12):
# shift timestamp to previous day
result -= timedelta(days=1)
elif result < reference_timestamp - timedelta(hours=12):
# shift timestamp to next day
result += timedelta(days=1)
if timedelta(hours=-12) <= delta <= timedelta(minutes=30):
# Packet less than 12h from the past or 30min from the future
return timestamp
elif delta < timedelta(hours=-23, minutes=-30):
# Packet from next day, less than 30min from the future
return datetime.combine(reference_datetime + timedelta(hours=+12), packet_time)
elif timedelta(hours=12) < delta:
# Packet from previous day, less than 12h from the past
return datetime.combine(reference_datetime + timedelta(hours=-12), packet_time)
else:
raise AmbigousTimeError(reference_datetime, packet_time)
return result

Wyświetl plik

@ -68,15 +68,15 @@ class TestStringMethods(unittest.TestCase):
def test_fail_bad_dstcall(self):
with self.assertRaises(OgnParseError):
parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment", reference_date=datetime(2015, 1, 1))
parse("EPZR>WTFDSTCALL,TCPIP*,qAC,GLIDERN1:>093456h this is a comment")
def test_v026_chile(self):
# receiver beacons from chile have a APRS position message with a pure user comment
message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura", reference_date=datetime(2015, 1, 1))
message = parse("VITACURA1>APRS,TCPIP*,qAC,GLIDERN4:/201146h3322.79SI07034.80W&/A=002329 Vitacura Municipal Aerodrome, Club de Planeadores Vitacura")
self.assertEqual(message['user_comment'], "Vitacura Municipal Aerodrome, Club de Planeadores Vitacura")
message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs", reference_date=datetime(2015, 1, 1))
message_with_id = parse("ALFALFAL>APRS,TCPIP*,qAC,GLIDERN4:/221830h3330.40SI07007.88W&/A=008659 Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs")
self.assertEqual(message_with_id['user_comment'], "Alfalfal Hidroelectric Plant, Club de Planeadores Vitacurs")
@ -96,7 +96,7 @@ class TestStringMethods(unittest.TestCase):
def test_copy_constructor(self):
valid_aprs_string = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5"
message = parse(valid_aprs_string, reference_date=datetime(2015, 1, 1, 16, 8, 29))
message = parse(valid_aprs_string)
self.assertEqual(message['name'], 'FLRDDA5BA')
self.assertEqual(message['address'], 'DDA5BA')

Wyświetl plik

@ -10,11 +10,10 @@ from ogn.parser.exceptions import AprsParseError
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
with self.assertRaises(AprsParseError):
parse_aprs("notAValidString", reference_date=datetime(2015, 1, 1))
parse_aprs("notAValidString")
def test_basic(self):
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment",
reference_date=datetime(2015, 1, 1, 16, 8, 29))
message = parse_aprs("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment")
self.assertEqual(message['name'], "FLRDDA5BA")
self.assertEqual(message['dstcall'], "APRS")
self.assertEqual(message['receiver_name'], "LFMX")
@ -33,7 +32,7 @@ class TestStringMethods(unittest.TestCase):
def test_v024(self):
# higher precision datum format introduced
raw_message = "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 !W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 16, 8, 29))
message = parse_aprs(raw_message)
self.assertAlmostEqual(message['latitude'] - 44.2568 - 1 / 30000, 2 / 1000 / 60, 10)
self.assertAlmostEqual(message['longitude'] - 6.0005, 6 / 1000 / 60, 10)
@ -41,7 +40,7 @@ class TestStringMethods(unittest.TestCase):
def test_v025(self):
# introduced the "aprs status" format where many informations (lat, lon, alt, speed, ...) are just optional
raw_message = "EPZR>APRS,TCPIP*,qAC,GLIDERN1:>093456h this is a comment"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 9, 35, 29))
message = parse_aprs(raw_message)
self.assertEqual(message['name'], "EPZR")
self.assertEqual(message['receiver_name'], "GLIDERN1")
@ -53,60 +52,60 @@ class TestStringMethods(unittest.TestCase):
def test_v026(self):
# from 0.2.6 the ogn comment of a receiver beacon is just optional
raw_message = "Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=001322"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 8, 56, 0))
message = parse_aprs(raw_message)
self.assertEqual(message['comment'], '')
def test_v026_relay(self):
# beacons can be relayed
raw_message = "FLRFFFFFF>OGNAVI,NAV07220E*,qAS,NAVITER:/092002h1000.00S/01000.00W'000/000/A=003281 !W00! id2820FFFFFF +300fpm +1.7rot"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 8, 56, 0))
message = parse_aprs(raw_message)
self.assertEqual(message['relay'], "NAV07220E")
def test_v027_ddhhmm(self):
# beacons can have hhmmss or ddhhmm timestamp
raw_message = "ICA4B0678>APRS,qAS,LSZF:/301046z4729.50N/00812.89E'227/091/A=002854 !W01! id054B0678 +040fpm +0.0rot 19.0dB 0e +1.5kHz gps1x1"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1, 9, 35, 29))
message = parse_aprs(raw_message)
self.assertEqual(message['timestamp'].strftime('%d %H:%M'), "30 10:46")
def test_negative_altitude(self):
# some devices can report negative altitudes
raw_message = "OGNF71F40>APRS,qAS,NAVITER:/080852h4414.37N/01532.06E'253/052/A=-00013 !W73! id1EF71F40 -060fpm +0.0rot"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1))
message = parse_aprs(raw_message)
self.assertAlmostEqual(message['altitude'], -13 * FEETS_TO_METER, 5)
def test_invalid_coordinates(self):
# sometimes the coordinates leave their valid range: -90<=latitude<=90 or -180<=longitude<=180
with self.assertRaises(AprsParseError):
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17", reference_date=datetime(2015, 1, 1))
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h6505.31S/18136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
with self.assertRaises(AprsParseError):
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17", reference_date=datetime(2015, 1, 1))
parse_aprs("RND000000>APRS,qAS,TROCALAN1:/210042h9505.31S/17136.75W^054/325/A=002591 !W31! idA4000000 +099fpm +1.8rot FL029.04 12.0dB 5e -6.3kHz gps11x17")
def test_invalid_timestamp(self):
with self.assertRaises(AprsParseError):
parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm", reference_date=datetime(2015, 1, 1))
parse_aprs("OGND4362A>APRS,qAS,Eternoz:/194490h4700.25N/00601.47E'003/063/A=000000 !W22! id07D4362A 0fpm +0.0rot FL000.00 2.0dB 3e -2.8kHz gps3x4 +12.2dBm")
with self.assertRaises(AprsParseError):
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322", reference_date=datetime(2015, 1, 1))
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/194490h5747.30NI01324.77E&/A=001322")
def test_invalid_altitude(self):
with self.assertRaises(AprsParseError):
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345", reference_date=datetime(2015, 1, 1))
parse_aprs("Ulrichamn>APRS,TCPIP*,qAC,GLIDERN1:/085616h5747.30NI01324.77E&/A=12-345")
def test_bad_comment(self):
raw_message = "# bad configured ogn receiver"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1))
message = parse_aprs(raw_message)
self.assertEqual(message['comment'], raw_message)
self.assertEqual(message['aprs_type'], 'comment')
def test_server_comment(self):
raw_message = "# aprsc 2.1.4-g408ed49 17 Mar 2018 09:30:36 GMT GLIDERN1 37.187.40.234:10152"
message = parse_aprs(raw_message, reference_date=datetime(2015, 1, 1))
message = parse_aprs(raw_message)
self.assertEqual(message['version'], '2.1.4-g408ed49')
self.assertEqual(message['timestamp'], datetime(2018, 3, 17, 9, 30, 36))

Wyświetl plik

@ -1,8 +1,7 @@
import unittest
from datetime import date, time, datetime
from datetime import datetime
from ogn.parser.utils import parseAngle, createTimestamp
from ogn.parser.exceptions import AmbigousTimeError
class TestStringMethods(unittest.TestCase):
@ -11,31 +10,25 @@ class TestStringMethods(unittest.TestCase):
def proceed_test_data(self, test_data={}):
for test in test_data:
if test[3]:
timestamp = createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
self.assertEqual(timestamp, test[3])
else:
with self.assertRaises(AmbigousTimeError):
createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
timestamp = createTimestamp(test[0], reference_timestamp=test[1])
self.assertEqual(timestamp, test[2])
def test_createTimestamp_hhmmss(self):
test_data = [
('000001h', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959h', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000h', date(2015, 1, 10), time(0, 0, 1), None), # packet 11 hours from future or 13 hours old
('123500h', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001h', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
('000001h', date(2015, 1, 10), None, datetime(2015, 1, 10, 0, 0, 1)), # first packet of a specific day
('235959h', date(2015, 1, 10), None, datetime(2015, 1, 10, 23, 59, 59)), # last packet of a specific day
('000001h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000h', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 11, 0, 0)), # packet 11 hours from future or 13 hours old
('123500h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001h', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
]
self.proceed_test_data(test_data)
def test_createTimestamp_ddhhmm(self):
test_data = [
('011212z', date(2017, 9, 28), time(0, 0, 1), datetime(2017, 9, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th,
('281313z', date(2017, 10, 1), time(0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st,
('281414z', date(2017, 1, 1), time(0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st,
('011212z', datetime(2017, 9, 28, 0, 0, 1), datetime(2017, 10, 1, 12, 12, 0)), # packet from 1st of month, received on september 28th,
('281313z', datetime(2017, 10, 1, 0, 0, 1), datetime(2017, 9, 28, 13, 13, 0)), # packet from 28th of month, received on october 1st,
('281414z', datetime(2017, 1, 1, 0, 0, 1), datetime(2016, 12, 28, 14, 14, 0)), # packet from 28th of month, received on january 1st,
]
self.proceed_test_data(test_data)