kopia lustrzana https://github.com/glidernet/python-ogn-client
Skip all keys where value is "None"
rodzic
b69e733df4
commit
a610900548
|
@ -1,4 +1,7 @@
|
|||
# CHANGELOG
|
||||
## not released
|
||||
- parser: Skip keys where value is "None"
|
||||
|
||||
## 0.9.8: - 2020-08-21
|
||||
- parser: Changed InReach parser (fixes #73)
|
||||
- parser: separated incompatible ID into parser dependant ID (lt24: address -> lt24_id, skylines: address -> skylines_id,
|
||||
|
|
|
@ -12,24 +12,20 @@ class FlarmParser(BaseParser):
|
|||
def parse_position(self, aprs_comment):
|
||||
match = self.position_pattern.match(aprs_comment)
|
||||
|
||||
def if_present(arg, func):
|
||||
result = match.group(arg)
|
||||
return (func(result)) if result else None
|
||||
|
||||
return {'address_type': if_present('details', lambda x: int(x, 16) & 0b00000011),
|
||||
'aircraft_type': if_present('details', lambda x: (int(x, 16) & 0b01111100) >> 2),
|
||||
'stealth': if_present('details', lambda x: (int(x, 16) & 0b10000000) >> 7 == 1),
|
||||
'address': if_present('address', lambda x: x),
|
||||
'climb_rate': if_present('climb_rate', lambda x: int(x) * FPM_TO_MS),
|
||||
'turn_rate': if_present('turn_rate', lambda x: float(x) * HPM_TO_DEGS),
|
||||
'signal_quality': if_present('signal_quality', float),
|
||||
'error_count': if_present('error_count', int),
|
||||
'frequency_offset': if_present('frequency_offset', float),
|
||||
'gps_quality': if_present('gps_quality', lambda _x: {
|
||||
'horizontal': int(match.group('gps_quality_horizontal')),
|
||||
'vertical': int(match.group('gps_quality_vertical'))}),
|
||||
'software_version': if_present('software_version', float),
|
||||
'hardware_version': if_present('hardware_version', lambda x: int(x, 16)),
|
||||
'real_address': if_present('real_address', lambda x: x),
|
||||
'signal_power': if_present('signal_power', float),
|
||||
}
|
||||
return {k: v for (k, v) in
|
||||
{'address_type': int(match.group('details'), 16) & 0b00000011 if match.group('details') else None,
|
||||
'aircraft_type': (int(match.group('details'), 16) & 0b01111100) >> 2 if match.group('details') else None,
|
||||
'stealth': (int(match.group('details'), 16) & 0b10000000) >> 7 == 1 if match.group('details') else None,
|
||||
'address': match.group('address') or None,
|
||||
'climb_rate': int(match.group('climb_rate')) * FPM_TO_MS if match.group('climb_rate') else None,
|
||||
'turn_rate': float(match.group('turn_rate')) * HPM_TO_DEGS if match.group('turn_rate') else None,
|
||||
'signal_quality': float(match.group('signal_quality')) if match.group('signal_quality') else None,
|
||||
'error_count': int(match.group('error_count')) if match.group('error_count') else None,
|
||||
'frequency_offset': float(match.group('frequency_offset')) if match.group('frequency_offset') else None,
|
||||
'gps_quality': {
|
||||
'horizontal': int(match.group('gps_quality_horizontal')),
|
||||
'vertical': int(match.group('gps_quality_vertical'))} if match.group('gps_quality') else None,
|
||||
'software_version': float(match.group('software_version')) if match.group('software_version') else None,
|
||||
'hardware_version': int(match.group('hardware_version'), 16) if match.group('hardware_version') else None,
|
||||
'real_address': match.group('real_address') or None,
|
||||
'signal_power': float(match.group('signal_power')) if match.group('signal_power') else None}.items() if v is not None}
|
||||
|
|
|
@ -30,48 +30,50 @@ class OgnParser(BaseParser):
|
|||
def parse_aircraft_beacon(self, aprs_comment):
|
||||
ab_match = self.aircraft_pattern.match(aprs_comment)
|
||||
if ab_match:
|
||||
return {'address_type': int(ab_match.group('details'), 16) & 0b00000011,
|
||||
'aircraft_type': (int(ab_match.group('details'), 16) & 0b01111100) >> 2,
|
||||
'stealth': (int(ab_match.group('details'), 16) & 0b10000000) >> 7 == 1,
|
||||
'address': ab_match.group('address'),
|
||||
'climb_rate': int(ab_match.group('climb_rate')) * FPM_TO_MS if ab_match.group('climb_rate') else None,
|
||||
'turn_rate': float(ab_match.group('turn_rate')) * HPM_TO_DEGS if ab_match.group('turn_rate') else None,
|
||||
'flightlevel': float(ab_match.group('flight_level')) if ab_match.group('flight_level') else None,
|
||||
'signal_quality': float(ab_match.group('signal_quality')) if ab_match.group('signal_quality') else None,
|
||||
'error_count': int(ab_match.group('errors')) if ab_match.group('errors') else None,
|
||||
'frequency_offset': float(ab_match.group('frequency_offset')) if ab_match.group('frequency_offset') else None,
|
||||
'gps_quality': {'horizontal': int(ab_match.group('gps_quality_horizontal')),
|
||||
'vertical': int(ab_match.group('gps_quality_vertical'))} if ab_match.group('gps_quality') else None,
|
||||
'software_version': float(ab_match.group('flarm_software_version')) if ab_match.group('flarm_software_version') else None,
|
||||
'hardware_version': int(ab_match.group('flarm_hardware_version'), 16) if ab_match.group('flarm_hardware_version') else None,
|
||||
'real_address': ab_match.group('flarm_id') if ab_match.group('flarm_id') else None,
|
||||
'signal_power': float(ab_match.group('signal_power')) if ab_match.group('signal_power') else None,
|
||||
'proximity': [hear[4:] for hear in ab_match.group('proximity').split(" ")] if ab_match.group('proximity') else None}
|
||||
return {k: v for (k, v) in
|
||||
{'address_type': int(ab_match.group('details'), 16) & 0b00000011,
|
||||
'aircraft_type': (int(ab_match.group('details'), 16) & 0b01111100) >> 2,
|
||||
'stealth': (int(ab_match.group('details'), 16) & 0b10000000) >> 7 == 1,
|
||||
'address': ab_match.group('address'),
|
||||
'climb_rate': int(ab_match.group('climb_rate')) * FPM_TO_MS if ab_match.group('climb_rate') else None,
|
||||
'turn_rate': float(ab_match.group('turn_rate')) * HPM_TO_DEGS if ab_match.group('turn_rate') else None,
|
||||
'flightlevel': float(ab_match.group('flight_level')) if ab_match.group('flight_level') else None,
|
||||
'signal_quality': float(ab_match.group('signal_quality')) if ab_match.group('signal_quality') else None,
|
||||
'error_count': int(ab_match.group('errors')) if ab_match.group('errors') else None,
|
||||
'frequency_offset': float(ab_match.group('frequency_offset')) if ab_match.group('frequency_offset') else None,
|
||||
'gps_quality': {'horizontal': int(ab_match.group('gps_quality_horizontal')),
|
||||
'vertical': int(ab_match.group('gps_quality_vertical'))} if ab_match.group('gps_quality') else None,
|
||||
'software_version': float(ab_match.group('flarm_software_version')) if ab_match.group('flarm_software_version') else None,
|
||||
'hardware_version': int(ab_match.group('flarm_hardware_version'), 16) if ab_match.group('flarm_hardware_version') else None,
|
||||
'real_address': ab_match.group('flarm_id') if ab_match.group('flarm_id') else None,
|
||||
'signal_power': float(ab_match.group('signal_power')) if ab_match.group('signal_power') else None,
|
||||
'proximity': [hear[4:] for hear in ab_match.group('proximity').split(" ")] if ab_match.group('proximity') else None}.items() if v is not None}
|
||||
else:
|
||||
return None
|
||||
|
||||
def parse_receiver_beacon(self, aprs_comment):
|
||||
rb_match = self.receiver_pattern.match(aprs_comment)
|
||||
if rb_match:
|
||||
return {'version': rb_match.group('version'),
|
||||
'platform': rb_match.group('platform'),
|
||||
'cpu_load': float(rb_match.group('cpu_load')),
|
||||
'free_ram': float(rb_match.group('ram_free')),
|
||||
'total_ram': float(rb_match.group('ram_total')),
|
||||
'ntp_error': float(rb_match.group('ntp_offset')),
|
||||
'rt_crystal_correction': float(rb_match.group('ntp_correction')),
|
||||
'voltage': float(rb_match.group('voltage')) if rb_match.group('voltage') else None,
|
||||
'amperage': float(rb_match.group('amperage')) if rb_match.group('amperage') else None,
|
||||
'cpu_temp': float(rb_match.group('cpu_temperature')) if rb_match.group('cpu_temperature') else None,
|
||||
'senders_visible': int(rb_match.group('visible_senders')) if rb_match.group('visible_senders') else None,
|
||||
'senders_total': int(rb_match.group('senders')) if rb_match.group('senders') else None,
|
||||
'rec_crystal_correction': int(rb_match.group('rf_correction_manual')) if rb_match.group('rf_correction_manual') else None,
|
||||
'rec_crystal_correction_fine': float(rb_match.group('rf_correction_automatic')) if rb_match.group('rf_correction_automatic') else None,
|
||||
'rec_input_noise': float(rb_match.group('signal_quality')) if rb_match.group('signal_quality') else None,
|
||||
'senders_signal': float(rb_match.group('senders_signal_quality')) if rb_match.group('senders_signal_quality') else None,
|
||||
'senders_messages': float(rb_match.group('senders_messages')) if rb_match.group('senders_messages') else None,
|
||||
'good_senders_signal': float(rb_match.group('good_senders_signal_quality')) if rb_match.group('good_senders_signal_quality') else None,
|
||||
'good_senders': float(rb_match.group('good_senders')) if rb_match.group('good_senders') else None,
|
||||
'good_and_bad_senders': float(rb_match.group('good_and_bad_senders')) if rb_match.group('good_and_bad_senders') else None}
|
||||
return {k: v for (k, v) in
|
||||
{'version': rb_match.group('version'),
|
||||
'platform': rb_match.group('platform'),
|
||||
'cpu_load': float(rb_match.group('cpu_load')),
|
||||
'free_ram': float(rb_match.group('ram_free')),
|
||||
'total_ram': float(rb_match.group('ram_total')),
|
||||
'ntp_error': float(rb_match.group('ntp_offset')),
|
||||
'rt_crystal_correction': float(rb_match.group('ntp_correction')),
|
||||
'voltage': float(rb_match.group('voltage')) if rb_match.group('voltage') else None,
|
||||
'amperage': float(rb_match.group('amperage')) if rb_match.group('amperage') else None,
|
||||
'cpu_temp': float(rb_match.group('cpu_temperature')) if rb_match.group('cpu_temperature') else None,
|
||||
'senders_visible': int(rb_match.group('visible_senders')) if rb_match.group('visible_senders') else None,
|
||||
'senders_total': int(rb_match.group('senders')) if rb_match.group('senders') else None,
|
||||
'rec_crystal_correction': int(rb_match.group('rf_correction_manual')) if rb_match.group('rf_correction_manual') else None,
|
||||
'rec_crystal_correction_fine': float(rb_match.group('rf_correction_automatic')) if rb_match.group('rf_correction_automatic') else None,
|
||||
'rec_input_noise': float(rb_match.group('signal_quality')) if rb_match.group('signal_quality') else None,
|
||||
'senders_signal': float(rb_match.group('senders_signal_quality')) if rb_match.group('senders_signal_quality') else None,
|
||||
'senders_messages': float(rb_match.group('senders_messages')) if rb_match.group('senders_messages') else None,
|
||||
'good_senders_signal': float(rb_match.group('good_senders_signal_quality')) if rb_match.group('good_senders_signal_quality') else None,
|
||||
'good_senders': float(rb_match.group('good_senders')) if rb_match.group('good_senders') else None,
|
||||
'good_and_bad_senders': float(rb_match.group('good_and_bad_senders')) if rb_match.group('good_and_bad_senders') else None}.items() if v is not None}
|
||||
else:
|
||||
return None
|
||||
|
|
|
@ -178,7 +178,7 @@ PATTERN_RECEIVER_BEACON = re.compile(r"""
|
|||
\s)?
|
||||
CPU:(?P<cpu_load>[\d.]+)\s
|
||||
RAM:(?P<ram_free>[\d.]+)/(?P<ram_total>[\d.]+)MB\s
|
||||
NTP:(?P<ntp_offset>[\d.]+)ms/(?P<ntp_correction>[+-][\d.]+)ppm\s
|
||||
NTP:(?P<ntp_offset>[\d.]+)ms/(?P<ntp_correction>[+-][\d.]+)ppm\s?
|
||||
(?:(?P<voltage>[\d.]+)V\s)?
|
||||
(?:(?P<amperage>[\d.]+)A\s)?
|
||||
(?:(?P<cpu_temperature>[+-][\d.]+)C\s*)?
|
||||
|
|
|
@ -22,6 +22,13 @@ class TestStringMethods(unittest.TestCase):
|
|||
self.assertEqual(message['hardware_version'], 67)
|
||||
self.assertEqual(message['real_address'], "DF0267")
|
||||
|
||||
def test_position_comment_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = FlarmParser().parse_position("id21A8CBA8")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
self.assertEqual(sorted(message.keys()), sorted(['address_type', 'aircraft_type', 'stealth', 'address']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -58,6 +58,13 @@ class TestStringMethods(unittest.TestCase):
|
|||
self.assertIsNotNone(message_triple)
|
||||
self.assertIsNotNone(message_single)
|
||||
|
||||
def test_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_aircraft_beacon("id093D0930")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
self.assertEqual(sorted(message.keys()), sorted(['address_type', 'aircraft_type', 'stealth', 'address']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -42,6 +42,13 @@ class TestStringMethods(unittest.TestCase):
|
|||
self.assertEqual(message['good_senders'], 68)
|
||||
self.assertEqual(message['good_and_bad_senders'], 135)
|
||||
|
||||
def test_relevant_keys_only(self):
|
||||
# return only keys where we got informations
|
||||
message = OgnParser().parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm")
|
||||
|
||||
self.assertIsNotNone(message)
|
||||
self.assertEqual(sorted(message.keys()), sorted(['version', 'platform', 'cpu_load', 'free_ram', 'total_ram', 'ntp_error', 'rt_crystal_correction']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Ładowanie…
Reference in New Issue