kopia lustrzana https://github.com/glidernet/ogn-python
Test process
rodzic
33f9ddbf51
commit
184853fa59
|
@ -3,7 +3,6 @@ import re
|
|||
|
||||
from manager import Manager
|
||||
from ogn.commands.dbutils import session
|
||||
from ogn.gateway.process import message_to_beacon
|
||||
from ogn.model import AircraftBeacon, ReceiverBeacon
|
||||
from ogn.utils import open_file
|
||||
|
||||
|
@ -34,6 +33,8 @@ def convert(sourcefile, path=''):
|
|||
import gzip
|
||||
import datetime
|
||||
|
||||
from ogn.gateway.process import message_to_beacon
|
||||
|
||||
match = re.search(PATTERN, sourcefile)
|
||||
if match:
|
||||
reference_date_string = match.group(1)
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import datetime
|
||||
import unittest
|
||||
import unittest.mock as mock
|
||||
|
||||
from ogn.gateway.process import process_beacon
|
||||
|
||||
|
||||
class ProcessManagerTest(unittest.TestCase):
|
||||
@mock.patch('ogn.gateway.process.session')
|
||||
@mock.patch('ogn.gateway.process.datetime')
|
||||
def test_process_beacon(self, mock_datetime, mock_session):
|
||||
import ogn.gateway.process as gateway_process
|
||||
gateway_process.last_commit = datetime.datetime(2015, 1, 1, 10, 0, 0)
|
||||
mock_datetime.utcnow.return_value = datetime.datetime(2015, 1, 1, 10, 0, 0)
|
||||
|
||||
string1 = "ICA3DD6CD>APRS,qAS,Moosburg:/195919h4820.93N/01151.39EX264/127/A=002204 !W20! id0D3DD6CD -712fpm -0.1rot 8.5dB 0e -2.1kHz gps2x2"
|
||||
string2 = "ICA3DD6CD>APRS,qAS,Moosburg:/195925h4820.90N/01151.07EX263/126/A=002139 !W74! id0D3DD6CD -712fpm +0.0rot 7.8dB 1e -2.1kHz"
|
||||
|
||||
process_beacon(string1)
|
||||
mock_session.bulk_save_objects.assert_not_called()
|
||||
|
||||
mock_datetime.utcnow.return_value = datetime.datetime(2015, 1, 1, 10, 0, 1) # one second later
|
||||
process_beacon(string2)
|
||||
self.assertEqual(mock_session.bulk_save_objects.call_count, 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Ładowanie…
Reference in New Issue