Merge pull request #13 from Meisterschueler/reference_datetime

Split reference_datetime to reference_date and reference_time
pull/16/head
Fabian P. Schmidt 2016-09-29 17:28:25 +02:00 zatwierdzone przez GitHub
commit 8a6a0d1985
5 zmienionych plików z 45 dodań i 36 usunięć

Wyświetl plik

@ -4,6 +4,8 @@
- Added aprs destination callsign as `dstcall` to aprs beacon keys (#9)
- Changed aprs parser to allow other destination calls than `APRS`
- Fixed parsing of APRS precision and datum option (#7)
- Added optional `reference_time` argument to `parse_aprs` function and disabled
magic date correction if this argument is missing
## 0.4.0 - 2016-03-29
- aprs client: Added the possibility of a timed callback

Wyświetl plik

@ -19,10 +19,10 @@ Parse APRS/OGN packet.
```
from ogn.parser import parse_aprs, parse_ogn_beacon
from datetime import datetime
from datetime import date, time
beacon = parse_aprs("FLRDDDEAD>APRS,qAS,EDER:/114500h5029.86N/00956.98E'342/049/A=005524 id0ADDDEAD -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5",
reference_date=datetime(2016,1,1,11,46))
reference_date=date(2016,1,1), reference_time=time(11,46))
beacon.update(parse_ogn_beacon(beacon['comment']))
```

Wyświetl plik

@ -6,16 +6,18 @@ from ogn.parser.pattern import PATTERN_APRS, PATTERN_RECEIVER_BEACON, PATTERN_AI
from ogn.parser.exceptions import AprsParseError, OgnParseError
def parse_aprs(message, reference_date=None):
def parse_aprs(message, reference_date=None, reference_time=None):
if reference_date is None:
reference_date = datetime.utcnow()
now = datetime.utcnow()
reference_date = now.date()
reference_time = now.time()
match = re.search(PATTERN_APRS, message)
if match:
return {'name': match.group('callsign'),
'receiver_name': match.group('receiver'),
'dstcall': match.group('dstcall'),
'timestamp': createTimestamp(match.group('time'), reference_date),
'timestamp': createTimestamp(match.group('time'), reference_date, reference_time),
'latitude': parseAngle('0' + match.group('latitude') + (match.group('latitude_enhancement') or '0')) *
(-1 if match.group('latitude_sign') == 'S' else 1),
'symboltable': match.group('symbol_table'),

Wyświetl plik

@ -16,25 +16,29 @@ def parseAngle(dddmmhht):
return float(dddmmhht[:3]) + float(dddmmhht[3:]) / 60
def createTimestamp(hhmmss, reference):
def createTimestamp(hhmmss, reference_date, reference_time=None):
packet_time = datetime.strptime(hhmmss, '%H%M%S').time()
timestamp = datetime.combine(reference, packet_time)
delta = timestamp - reference
# This function reconstructs the packet date from the timestamp and a reference time.
# delta vs. packet date:
# -24h -12h 0 +12h +24h
# |-------------------------|---------------------|------------------------|----------------------|
# [-] <-- tomorrow [---------today---------] [-------yesterday------]
if timedelta(hours=-12) <= delta <= timedelta(minutes=30):
# Packet less than 12h from the past or 30min from the future
return timestamp
elif delta < timedelta(hours=-23, minutes=-30):
# Packet from next day, less than 30min from the future
return datetime.combine(reference + timedelta(hours=+12), packet_time)
elif timedelta(hours=12) < delta:
# Packet from previous day, less than 12h from the past
return datetime.combine(reference + timedelta(hours=-12), packet_time)
if reference_time is None:
return datetime.combine(reference_date, packet_time)
else:
raise AmbigousTimeError(reference, packet_time)
reference_datetime = datetime.combine(reference_date, reference_time)
timestamp = datetime.combine(reference_date, packet_time)
delta = timestamp - reference_datetime
# This function reconstructs the packet date from the timestamp and a reference_datetime time.
# delta vs. packet date:
# -24h -12h 0 +12h +24h
# |-------------------------|---------------------|------------------------|----------------------|
# [-] <-- tomorrow [---------today---------] [-------yesterday------]
if timedelta(hours=-12) <= delta <= timedelta(minutes=30):
# Packet less than 12h from the past or 30min from the future
return timestamp
elif delta < timedelta(hours=-23, minutes=-30):
# Packet from next day, less than 30min from the future
return datetime.combine(reference_datetime + timedelta(hours=+12), packet_time)
elif timedelta(hours=12) < delta:
# Packet from previous day, less than 12h from the past
return datetime.combine(reference_datetime + timedelta(hours=-12), packet_time)
else:
raise AmbigousTimeError(reference_datetime, packet_time)

Wyświetl plik

@ -1,5 +1,5 @@
import unittest
from datetime import datetime
from datetime import date, time, datetime
from ogn.parser.utils import parseAngle, createTimestamp
from ogn.parser.exceptions import AmbigousTimeError
@ -11,21 +11,22 @@ class TestStringMethods(unittest.TestCase):
def test_createTimestamp(self):
test_data = [
('000001', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959', datetime(2015, 1, 10, 0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000', datetime(2015, 1, 10, 0, 0, 1), None), # packet 11 hours from future or 13 hours old
('123500', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001', datetime(2015, 1, 10, 23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)) # packet from next day (11 minutes from future)
('000001', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 10, 0, 0, 1)), # packet from current day (on the tick)
('235959', date(2015, 1, 10), time(0, 0, 1), datetime(2015, 1, 9, 23, 59, 59)), # packet from previous day (2 seconds old)
('110000', date(2015, 1, 10), time(0, 0, 1), None), # packet 11 hours from future or 13 hours old
('123500', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 10, 12, 35, 0)), # packet from current day (11 hours old)
('000001', date(2015, 1, 10), time(23, 50, 0), datetime(2015, 1, 11, 0, 0, 1)), # packet from next day (11 minutes from future)
('000001', date(2015, 1, 10), None, datetime(2015, 1, 10, 0, 0, 1)), # first packet of a specific day
('235959', date(2015, 1, 10), None, datetime(2015, 1, 10, 23, 59, 59)), # last packet of a specific day
]
for test in test_data:
if test[2]:
timestamp = createTimestamp(test[0], reference=test[1])
self.assertEqual(timestamp, test[2])
if test[3]:
timestamp = createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
self.assertEqual(timestamp, test[3])
else:
with self.assertRaises(AmbigousTimeError):
createTimestamp(test[0], reference=test[1])
createTimestamp(test[0], reference_date=test[1], reference_time=test[2])
if __name__ == '__main__':
unittest.main()