Moved all the unittests to a new directory called 'tests'. The command 'make test' now looks in the 'tests' directory.

pull/61/head
Christian T. Jacobs 2017-04-15 01:14:17 +01:00
rodzic 0953a2e431
commit 3a5a1525a5
11 zmienionych plików z 626 dodań i 526 usunięć

Wyświetl plik

@ -31,7 +31,7 @@ docs:
test:
@echo "*** Running the unit tests"
python3 -m unittest discover --start-directory=pyqso --pattern=*.py --verbose
python3 -m unittest discover --start-directory=tests --pattern=*.py --verbose
clean:
@echo "*** Cleaning docs directory"

Wyświetl plik

@ -531,196 +531,3 @@ class ADIF:
else:
return True
class TestADIF(unittest.TestCase):
""" The unit tests for the ADIF module. """
def setUp(self):
""" Set up the ADIF object needed for the unit tests. """
self.adif = ADIF()
def test_adif_read(self):
""" Check that a single ADIF record can be read and parsed correctly. """
f = open("ADIF.test_read.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>TEST<band:3>40m<mode:2>CW
<qso_date:8:d>20130322<time_on:4>1955<eor>""")
f.close()
records = self.adif.read("ADIF.test_read.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_multiple(self):
""" Check that multiple ADIF records can be read and parsed correctly. """
f = open("ADIF.test_read_multiple.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>TEST<band:3>40m<mode:2>CW
<qso_date:8:d>20130322<time_on:4>1955<eor>
<call:8>TEST2ABC<band:3>20m<mode:3>SSB
<qso_date:8>20150227<time_on:4>0820<eor>
<call:5>HELLO<band:2>2m<mode:2>FM<qso_date:8:d>20150227<time_on:4>0832<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_multiple.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}, {'TIME_ON': '0820', 'BAND': '20m', 'CALL': 'TEST2ABC', 'MODE': 'SSB', 'QSO_DATE': '20150227'}, {'TIME_ON': '0832', 'BAND': '2m', 'CALL': 'HELLO', 'MODE': 'FM', 'QSO_DATE': '20150227'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 3)
for i in range(len(expected_records)):
assert(len(list(records[i].keys())) == len(list(expected_records[i].keys())))
assert(records == expected_records)
def test_adif_read_alphabet(self):
""" Check that none of the letters of the alphabet are ignored during parsing. """
f = open("ADIF.test_read_alphabet.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:64>ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_alphabet.adi")
expected_records = [{'CALL': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_capitalisation(self):
""" Check that the CALL field is capitalised correctly. """
f = open("ADIF.test_read_capitalisation.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>test<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_capitalisation.adi")
expected_records = [{'CALL': 'TEST'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_header_only(self):
""" Check that no records are read in if the ADIF file only contains header information. """
f = open("ADIF.test_read_header_only.adi", 'w')
f.write("""Some test ADI data.<eoh>""")
f.close()
records = self.adif.read("ADIF.test_read_header_only.adi")
expected_records = []
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 0)
assert(records == expected_records)
def test_adif_read_no_header(self):
""" Check that an ADIF file can be parsed with no header information. """
f = open("ADIF.test_read_no_header.adi", 'w')
f.write("""<call:4>TEST<band:3>40m<mode:2>CW<qso_date:8:d>20130322<time_on:4>1955<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_no_header.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_write(self):
""" Check that records can be written to an ADIF file correctly. """
records = [{"CALL": "TEST123", "QSO_DATE": "20120402", "TIME_ON": "1234", "FREQ": "145.500", "BAND": "2m", "MODE": "FM", "RST_SENT": "59", "RST_RCVD": "59"},
{"CALL": "TEST123", "QSO_DATE": "20130312", "TIME_ON": "0101", "FREQ": "145.750", "BAND": "2m", "MODE": "FM"}]
self.adif.write(records, "ADIF.test_write.adi")
f = open("ADIF.test_write.adi", 'r')
text = f.read()
print("File 'ADIF.test_write.adi' contains the following text:", text)
assert("""
<adif_ver:5>3.0.4
<programid:5>PyQSO
<programversion:5>1.0.0
<eoh>
<call:7>TEST123
<qso_date:8>20120402
<time_on:4>1234
<freq:7>145.500
<band:2>2m
<mode:2>FM
<rst_sent:2>59
<rst_rcvd:2>59
<eor>
<call:7>TEST123
<qso_date:8>20130312
<time_on:4>0101
<freq:7>145.750
<band:2>2m
<mode:2>FM
<eor>
""" in text) # Ignore the header line here, since it contains the date and time the ADIF file was written, which will change each time 'make unittest' is run.
f.close()
def test_adif_write_sqlite3_Row(self):
""" Check that records can be written to an ADIF file from a test database file. """
import sqlite3
import os.path
path_to_test_database = os.path.join(os.path.realpath(os.path.dirname(__file__)), os.pardir, "res/test.db")
self.connection = sqlite3.connect(path_to_test_database)
self.connection.row_factory = sqlite3.Row
c = self.connection.cursor()
c.execute("SELECT * FROM test")
records = c.fetchall()
print(records)
self.adif.write(records, "ADIF.test_write_sqlite3_Row.adi")
f = open("ADIF.test_write_sqlite3_Row.adi", 'r')
text = f.read()
print("File 'ADIF.test_write_sqlite3_Row.adi' contains the following text:", text)
assert("""
<adif_ver:5>3.0.4
<programid:5>PyQSO
<programversion:5>1.0.0
<eoh>
<call:7>TEST123
<qso_date:8>20120402
<time_on:4>1234
<freq:7>145.500
<band:2>2m
<mode:2>FM
<rst_sent:2>59
<rst_rcvd:2>59
<eor>
<call:7>TEST456
<qso_date:8>20130312
<time_on:4>0101
<freq:7>145.750
<band:2>2m
<mode:2>FM
<eor>
""" in text) # Ignore the header line here, since it contains the date and time the ADIF file was written, which will change each time 'make unittest' is run.
f.close()
self.connection.close()
def test_adif_is_valid(self):
""" Check that ADIF field validation is working correctly for different data types. """
assert(self.adif.is_valid("CALL", "TEST123", "S"))
assert(self.adif.is_valid("QSO_DATE", "20120402", "D"))
assert(self.adif.is_valid("TIME_ON", "1230", "T"))
assert(self.adif.is_valid("TX_PWR", "5", "N"))
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -315,109 +315,3 @@ def strip(full_callsign):
except ValueError:
callsign = full_callsign
return callsign
class TestCallsignLookup(unittest.TestCase):
""" The unit tests for the CallsignLookup class. """
def setUp(self):
""" Set up the objects needed for the unit tests. """
self.qrz = CallsignLookupQRZ(parent=None)
self.hamqth = CallsignLookupHamQTH(parent=None)
def tearDown(self):
""" Destroy any unit test resources. """
pass
def test_strip(self):
""" Check that a callsign with a prefix and a suffix is stripped correctly. """
callsign = "EA3/MYCALL/MM"
assert strip(callsign) == "MYCALL"
def test_strip_prefix_only(self):
""" Check that a callsign with only a prefix is stripped correctly. """
callsign = "EA3/MYCALL"
assert strip(callsign) == "MYCALL"
def test_strip_suffix_only(self):
""" Check that a callsign with only a suffix is stripped correctly. """
callsign = "MYCALL/M"
assert strip(callsign) == "MYCALL"
def test_strip_no_prefix_or_suffix(self):
""" Check that a callsign with no prefix or suffix remains unmodified. """
callsign = "MYCALL"
assert strip(callsign) == "MYCALL"
def test_qrz_connect(self):
""" Check the example response from the qrz.com server, and make sure the session key has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0" encoding="utf-8" ?>\n<QRZDatabase version="1.33" xmlns="http://xmldata.qrz.com">\n<Session>\n<Key>3b1fd1d3ba495189984f93ff67bd45b6</Key>\n<Count>61</Count>\n<SubExp>non-subscriber</SubExp>\n<GMTime>Sun Nov 22 21:25:34 2015</GMTime>\n<Remark>cpu: 0.147s</Remark>\n</Session>\n</QRZDatabase>\n'
connection.getresponse.return_value = response
result = self.qrz.connect("hello", "world")
assert(result)
assert(self.qrz.session_key == "3b1fd1d3ba495189984f93ff67bd45b6")
def test_qrz_lookup(self):
""" Check the example callsign lookup response from the qrz.com server, and make sure the callsign information has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0" encoding="utf-8" ?>\n<QRZDatabase version="1.33" xmlns="http://xmldata.qrz.com">\n<Callsign>\n<call>MYCALL</call>\n<fname>FIRSTNAME</fname>\n<name>LASTNAME</name>\n<addr2>ADDRESS2</addr2>\n<country>COUNTRY</country>\n</Callsign>\n<Session>\n<Key>3b1fd1d3ba495189984f93ff67bd45b6</Key>\n<Count>61</Count>\n<SubExp>non-subscriber</SubExp>\n<Message>A subscription is required to access the complete record.</Message>\n<GMTime>Sun Nov 22 21:34:46 2015</GMTime>\n<Remark>cpu: 0.026s</Remark>\n</Session>\n</QRZDatabase>\n'
connection.getresponse.return_value = response
self.qrz.connection = connection
self.qrz.session_key = "3b1fd1d3ba495189984f93ff67bd45b6"
fields_and_data = self.qrz.lookup("MYCALL")
assert(fields_and_data["NAME"] == "FIRSTNAME LASTNAME")
assert(fields_and_data["ADDRESS"] == "ADDRESS2")
assert(fields_and_data["COUNTRY"] == "COUNTRY")
def test_hamqth_connect(self):
""" Check the example response from the hamqth.com server, and make sure the session ID has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0"?>\n<HamQTH version="2.6" xmlns="https://www.hamqth.com">\n<session>\n<session_id>09b0ae90050be03c452ad235a1f2915ad684393c</session_id>\n</session>\n</HamQTH>\n'
connection.getresponse.return_value = response
result = self.hamqth.connect("hello", "world")
assert(result)
assert(self.hamqth.session_id == "09b0ae90050be03c452ad235a1f2915ad684393c")
def test_hamqth_lookup(self):
""" Check the example callsign lookup response from the hamqth.com server, and make sure the callsign information has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0"?>\n<HamQTH version="2.6" xmlns="https://www.hamqth.com">\n<search>\n<callsign>MYCALL</callsign>\n<nick>NAME</nick>\n<country>COUNTRY</country>\n<itu>ITU</itu>\n<cq>CQ</cq>\n<grid>GRID</grid>\n<adr_street1>ADDRESS</adr_street1>\n</search>\n</HamQTH>\n'
connection.getresponse.return_value = response
self.hamqth.connection = connection
self.hamqth.session_id = "09b0ae90050be03c452ad235a1f2915ad684393c"
fields_and_data = self.hamqth.lookup("MYCALL")
assert(fields_and_data["NAME"] == "NAME")
assert(fields_and_data["ADDRESS"] == "ADDRESS")
assert(fields_and_data["COUNTRY"] == "COUNTRY")
assert(fields_and_data["CQZ"] == "CQ")
assert(fields_and_data["ITUZ"] == "ITU")
assert(fields_and_data["IOTA"] == "GRID")
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -324,30 +324,3 @@ class DXCluster:
self.items["DISCONNECT"].set_sensitive(not sensitive)
self.items["SEND"].set_sensitive(not sensitive)
return
class TestDXCluster(unittest.TestCase):
""" The unit tests for the DXCluster class. """
def setUp(self):
""" Set up the objects needed for the unit tests. """
PyQSO = mock.MagicMock()
self.dxcluster = DXCluster(application=PyQSO())
def tearDown(self):
""" Destroy any unit test resources. """
pass
def test_on_telnet_io(self):
""" Check that the response from the Telnet server can be correctly decoded. """
telnetlib.Telnet = mock.Mock(spec=telnetlib.Telnet)
connection = telnetlib.Telnet("hello", "world")
connection.read_very_eager.return_value = b"Test message from the Telnet server."
self.dxcluster.connection = connection
result = self.dxcluster.on_telnet_io()
assert(result)
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -331,164 +331,3 @@ class Log(Gtk.ListStore):
except (sqlite.Error, IndexError) as e:
logging.exception(e)
return None
class TestLog(unittest.TestCase):
def setUp(self):
self.connection = sqlite.connect(":memory:")
self.connection.row_factory = sqlite.Row
self.field_names = ["CALL", "QSO_DATE", "TIME_ON", "FREQ", "BAND", "MODE", "RST_SENT", "RST_RCVD"]
self.fields_and_data = {"CALL": "TEST123", "QSO_DATE": "20130312", "TIME_ON": "1234", "FREQ": "145.500", "BAND": "2m", "MODE": "FM", "RST_SENT": "59", "RST_RCVD": "59"}
c = self.connection.cursor()
query = "CREATE TABLE test (id INTEGER PRIMARY KEY AUTOINCREMENT"
for field_name in self.field_names:
s = ", %s TEXT" % field_name.lower()
query = query + s
query = query + ")"
c.execute(query)
self.log = Log(self.connection, "test")
def tearDown(self):
self.connection.close()
def test_log_add_missing_db_columns(self):
column_names_before = []
column_names_after = []
c = self.connection.cursor()
c.execute("PRAGMA table_info(test)")
result = c.fetchall()
for t in result:
column_names_before.append(t[1].upper())
self.log.add_missing_db_columns()
c.execute("PRAGMA table_info(test)")
result = c.fetchall()
for t in result:
column_names_after.append(t[1].upper())
print("Column names before: ", column_names_before)
print("Column names after: ", column_names_after)
assert(len(column_names_before) == len(self.field_names) + 1) # Added 1 here because of the "ID" column in all database tables.
assert(len(column_names_after) == len(AVAILABLE_FIELD_NAMES_ORDERED) + 1)
for field_name in AVAILABLE_FIELD_NAMES_ORDERED:
assert(field_name in column_names_after)
def test_log_add_record(self):
self.log.add_record(self.fields_and_data)
c = self.connection.cursor()
c.execute("SELECT * FROM test")
records = c.fetchall()
assert len(records) == 1
for field_name in self.field_names:
print(self.fields_and_data[field_name], records[0][field_name])
assert self.fields_and_data[field_name] == records[0][field_name]
def test_log_delete_record(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute("SELECT * FROM test")
records_before = c.fetchall()
self.log.delete_record(1)
c.execute("SELECT * FROM test")
records_after = c.fetchall()
assert(len(records_before) == 1)
assert(len(records_after) == 0)
def test_log_edit_record(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute("SELECT * FROM test")
record_before = c.fetchall()[0]
self.log.edit_record(1, "CALL", "TEST456")
self.log.edit_record(1, "FREQ", "145.450")
c.execute("SELECT * FROM test")
record_after = c.fetchall()[0]
assert(record_before["CALL"] == "TEST123")
assert(record_after["CALL"] == "TEST456")
assert(record_before["FREQ"] == "145.500")
assert(record_after["FREQ"] == "145.450")
def test_log_get_record_by_index(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
record = self.log.get_record_by_index(1)
print("Contents of retrieved record: ", record)
for field_name in list(record.keys()):
if(field_name.upper() == "ID"):
continue
else:
assert(record[field_name.upper()] == self.fields_and_data[field_name.upper()])
assert(len(record) == len(self.fields_and_data) + 1)
def test_log_get_all_records(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
# Add the same record twice
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
records = self.log.get_all_records()
print("Contents of all retrieved records: ", records)
assert(len(records) == 2) # There should be 2 records
for field_name in self.field_names:
assert(records[0][field_name] == self.fields_and_data[field_name])
assert(records[1][field_name] == self.fields_and_data[field_name])
def test_log_record_count(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
# Add the same record twice
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
record_count = self.log.record_count
print("Number of records in the log: ", record_count)
assert(record_count == 2) # There should be 2 records
def test_log_get_duplicates(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
n = 5 # The total number of records to insert.
for i in range(0, n):
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
assert(len(self.log.get_duplicates()) == n-1) # Expecting n-1 duplicates.
def test_log_rename(self):
old_name = "test"
new_name = "hello"
success = self.log.rename(new_name)
assert(success)
with self.connection:
c = self.connection.cursor()
c.execute("SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE name=?)", [old_name])
exists = c.fetchone()
assert(exists[0] == 0) # Old log name should no longer exist.
c.execute("SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE name=?)", [new_name])
exists = c.fetchone()
assert(exists[0] == 1) # New log name should now exist.
assert(self.log.name == new_name)
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -971,41 +971,3 @@ class Logbook:
log_index = i
break
return log_index
class TestLogbook(unittest.TestCase):
""" The unit tests for the Logbook class. """
def setUp(self):
""" Set up the Logbook object and connection to the test database needed for the unit tests. """
self.logbook = Logbook(application=mock.MagicMock())
path_to_test_database = os.path.join(os.path.realpath(os.path.dirname(__file__)), os.pardir, "res/test.db")
success = self.logbook.db_connect(path_to_test_database)
assert success
# Populate test logs.
for log_name in ["test", "test2"]:
l = Log(self.logbook.connection, log_name)
l.populate()
self.logbook.logs.append(l)
def tearDown(self):
""" Disconnect from the test database. """
success = self.logbook.db_disconnect()
assert success
def test_log_name_exists(self):
""" Check that only the log called 'test' exists. """
assert self.logbook.log_name_exists("test") # Log 'test' exists.
assert not self.logbook.log_name_exists("hello") # Log 'hello' should not exist.
def test_log_count(self):
""" Check that the log count equals 2. """
assert self.logbook.log_count == 2 # A total of 2 logs in the logbook.
def test_record_count(self):
""" Check that the record count equals 5. """
assert self.logbook.record_count == 5 # A total of 5 records over all the logs in the logbook.
if(__name__ == '__main__'):
unittest.main()

213
tests/test_adif.py 100644
Wyświetl plik

@ -0,0 +1,213 @@
#!/usr/bin/env python3
# Copyright (C) 2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from pyqso.adif import *
class TestADIF(unittest.TestCase):
""" The unit tests for the ADIF module. """
def setUp(self):
""" Set up the ADIF object needed for the unit tests. """
self.adif = ADIF()
def test_adif_read(self):
""" Check that a single ADIF record can be read and parsed correctly. """
f = open("ADIF.test_read.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>TEST<band:3>40m<mode:2>CW
<qso_date:8:d>20130322<time_on:4>1955<eor>""")
f.close()
records = self.adif.read("ADIF.test_read.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_multiple(self):
""" Check that multiple ADIF records can be read and parsed correctly. """
f = open("ADIF.test_read_multiple.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>TEST<band:3>40m<mode:2>CW
<qso_date:8:d>20130322<time_on:4>1955<eor>
<call:8>TEST2ABC<band:3>20m<mode:3>SSB
<qso_date:8>20150227<time_on:4>0820<eor>
<call:5>HELLO<band:2>2m<mode:2>FM<qso_date:8:d>20150227<time_on:4>0832<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_multiple.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}, {'TIME_ON': '0820', 'BAND': '20m', 'CALL': 'TEST2ABC', 'MODE': 'SSB', 'QSO_DATE': '20150227'}, {'TIME_ON': '0832', 'BAND': '2m', 'CALL': 'HELLO', 'MODE': 'FM', 'QSO_DATE': '20150227'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 3)
for i in range(len(expected_records)):
assert(len(list(records[i].keys())) == len(list(expected_records[i].keys())))
assert(records == expected_records)
def test_adif_read_alphabet(self):
""" Check that none of the letters of the alphabet are ignored during parsing. """
f = open("ADIF.test_read_alphabet.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:64>ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_alphabet.adi")
expected_records = [{'CALL': 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_capitalisation(self):
""" Check that the CALL field is capitalised correctly. """
f = open("ADIF.test_read_capitalisation.adi", 'w')
f.write("""Some test ADI data.<eoh>
<call:4>test<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_capitalisation.adi")
expected_records = [{'CALL': 'TEST'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_read_header_only(self):
""" Check that no records are read in if the ADIF file only contains header information. """
f = open("ADIF.test_read_header_only.adi", 'w')
f.write("""Some test ADI data.<eoh>""")
f.close()
records = self.adif.read("ADIF.test_read_header_only.adi")
expected_records = []
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 0)
assert(records == expected_records)
def test_adif_read_no_header(self):
""" Check that an ADIF file can be parsed with no header information. """
f = open("ADIF.test_read_no_header.adi", 'w')
f.write("""<call:4>TEST<band:3>40m<mode:2>CW<qso_date:8:d>20130322<time_on:4>1955<eor>""")
f.close()
records = self.adif.read("ADIF.test_read_no_header.adi")
expected_records = [{'TIME_ON': '1955', 'BAND': '40m', 'CALL': 'TEST', 'MODE': 'CW', 'QSO_DATE': '20130322'}]
print("Imported records: ", records)
print("Expected records: ", expected_records)
assert(len(records) == 1)
assert(len(list(records[0].keys())) == len(list(expected_records[0].keys())))
assert(records == expected_records)
def test_adif_write(self):
""" Check that records can be written to an ADIF file correctly. """
records = [{"CALL": "TEST123", "QSO_DATE": "20120402", "TIME_ON": "1234", "FREQ": "145.500", "BAND": "2m", "MODE": "FM", "RST_SENT": "59", "RST_RCVD": "59"},
{"CALL": "TEST123", "QSO_DATE": "20130312", "TIME_ON": "0101", "FREQ": "145.750", "BAND": "2m", "MODE": "FM"}]
self.adif.write(records, "ADIF.test_write.adi")
f = open("ADIF.test_write.adi", 'r')
text = f.read()
print("File 'ADIF.test_write.adi' contains the following text:", text)
assert("""
<adif_ver:5>3.0.4
<programid:5>PyQSO
<programversion:5>1.0.0
<eoh>
<call:7>TEST123
<qso_date:8>20120402
<time_on:4>1234
<freq:7>145.500
<band:2>2m
<mode:2>FM
<rst_sent:2>59
<rst_rcvd:2>59
<eor>
<call:7>TEST123
<qso_date:8>20130312
<time_on:4>0101
<freq:7>145.750
<band:2>2m
<mode:2>FM
<eor>
""" in text) # Ignore the header line here, since it contains the date and time the ADIF file was written, which will change each time 'make unittest' is run.
f.close()
def test_adif_write_sqlite3_Row(self):
""" Check that records can be written to an ADIF file from a test database file. """
import sqlite3
import os.path
path_to_test_database = os.path.join(os.path.realpath(os.path.dirname(__file__)), os.pardir, "res/test.db")
self.connection = sqlite3.connect(path_to_test_database)
self.connection.row_factory = sqlite3.Row
c = self.connection.cursor()
c.execute("SELECT * FROM test")
records = c.fetchall()
print(records)
self.adif.write(records, "ADIF.test_write_sqlite3_Row.adi")
f = open("ADIF.test_write_sqlite3_Row.adi", 'r')
text = f.read()
print("File 'ADIF.test_write_sqlite3_Row.adi' contains the following text:", text)
assert("""
<adif_ver:5>3.0.4
<programid:5>PyQSO
<programversion:5>1.0.0
<eoh>
<call:7>TEST123
<qso_date:8>20120402
<time_on:4>1234
<freq:7>145.500
<band:2>2m
<mode:2>FM
<rst_sent:2>59
<rst_rcvd:2>59
<eor>
<call:7>TEST456
<qso_date:8>20130312
<time_on:4>0101
<freq:7>145.750
<band:2>2m
<mode:2>FM
<eor>
""" in text) # Ignore the header line here, since it contains the date and time the ADIF file was written, which will change each time 'make unittest' is run.
f.close()
self.connection.close()
def test_adif_is_valid(self):
""" Check that ADIF field validation is working correctly for different data types. """
assert(self.adif.is_valid("CALL", "TEST123", "S"))
assert(self.adif.is_valid("QSO_DATE", "20120402", "D"))
assert(self.adif.is_valid("TIME_ON", "1230", "T"))
assert(self.adif.is_valid("TX_PWR", "5", "N"))
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -0,0 +1,126 @@
#!/usr/bin/env python3
# Copyright (C) 2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from pyqso.callsign_lookup import *
class TestCallsignLookup(unittest.TestCase):
""" The unit tests for the CallsignLookup class. """
def setUp(self):
""" Set up the objects needed for the unit tests. """
self.qrz = CallsignLookupQRZ(parent=None)
self.hamqth = CallsignLookupHamQTH(parent=None)
def tearDown(self):
""" Destroy any unit test resources. """
pass
def test_strip(self):
""" Check that a callsign with a prefix and a suffix is stripped correctly. """
callsign = "EA3/MYCALL/MM"
assert strip(callsign) == "MYCALL"
def test_strip_prefix_only(self):
""" Check that a callsign with only a prefix is stripped correctly. """
callsign = "EA3/MYCALL"
assert strip(callsign) == "MYCALL"
def test_strip_suffix_only(self):
""" Check that a callsign with only a suffix is stripped correctly. """
callsign = "MYCALL/M"
assert strip(callsign) == "MYCALL"
def test_strip_no_prefix_or_suffix(self):
""" Check that a callsign with no prefix or suffix remains unmodified. """
callsign = "MYCALL"
assert strip(callsign) == "MYCALL"
def test_qrz_connect(self):
""" Check the example response from the qrz.com server, and make sure the session key has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0" encoding="utf-8" ?>\n<QRZDatabase version="1.33" xmlns="http://xmldata.qrz.com">\n<Session>\n<Key>3b1fd1d3ba495189984f93ff67bd45b6</Key>\n<Count>61</Count>\n<SubExp>non-subscriber</SubExp>\n<GMTime>Sun Nov 22 21:25:34 2015</GMTime>\n<Remark>cpu: 0.147s</Remark>\n</Session>\n</QRZDatabase>\n'
connection.getresponse.return_value = response
result = self.qrz.connect("hello", "world")
assert(result)
assert(self.qrz.session_key == "3b1fd1d3ba495189984f93ff67bd45b6")
def test_qrz_lookup(self):
""" Check the example callsign lookup response from the qrz.com server, and make sure the callsign information has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0" encoding="utf-8" ?>\n<QRZDatabase version="1.33" xmlns="http://xmldata.qrz.com">\n<Callsign>\n<call>MYCALL</call>\n<fname>FIRSTNAME</fname>\n<name>LASTNAME</name>\n<addr2>ADDRESS2</addr2>\n<country>COUNTRY</country>\n</Callsign>\n<Session>\n<Key>3b1fd1d3ba495189984f93ff67bd45b6</Key>\n<Count>61</Count>\n<SubExp>non-subscriber</SubExp>\n<Message>A subscription is required to access the complete record.</Message>\n<GMTime>Sun Nov 22 21:34:46 2015</GMTime>\n<Remark>cpu: 0.026s</Remark>\n</Session>\n</QRZDatabase>\n'
connection.getresponse.return_value = response
self.qrz.connection = connection
self.qrz.session_key = "3b1fd1d3ba495189984f93ff67bd45b6"
fields_and_data = self.qrz.lookup("MYCALL")
assert(fields_and_data["NAME"] == "FIRSTNAME LASTNAME")
assert(fields_and_data["ADDRESS"] == "ADDRESS2")
assert(fields_and_data["COUNTRY"] == "COUNTRY")
def test_hamqth_connect(self):
""" Check the example response from the hamqth.com server, and make sure the session ID has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0"?>\n<HamQTH version="2.6" xmlns="https://www.hamqth.com">\n<session>\n<session_id>09b0ae90050be03c452ad235a1f2915ad684393c</session_id>\n</session>\n</HamQTH>\n'
connection.getresponse.return_value = response
result = self.hamqth.connect("hello", "world")
assert(result)
assert(self.hamqth.session_id == "09b0ae90050be03c452ad235a1f2915ad684393c")
def test_hamqth_lookup(self):
""" Check the example callsign lookup response from the hamqth.com server, and make sure the callsign information has been correctly extracted. """
http_client.HTTPConnection = mock.Mock(spec=http_client.HTTPConnection)
http_client.HTTPResponse = mock.Mock(spec=http_client.HTTPResponse)
connection = http_client.HTTPConnection()
response = http_client.HTTPResponse()
response.read.return_value = b'<?xml version="1.0"?>\n<HamQTH version="2.6" xmlns="https://www.hamqth.com">\n<search>\n<callsign>MYCALL</callsign>\n<nick>NAME</nick>\n<country>COUNTRY</country>\n<itu>ITU</itu>\n<cq>CQ</cq>\n<grid>GRID</grid>\n<adr_street1>ADDRESS</adr_street1>\n</search>\n</HamQTH>\n'
connection.getresponse.return_value = response
self.hamqth.connection = connection
self.hamqth.session_id = "09b0ae90050be03c452ad235a1f2915ad684393c"
fields_and_data = self.hamqth.lookup("MYCALL")
assert(fields_and_data["NAME"] == "NAME")
assert(fields_and_data["ADDRESS"] == "ADDRESS")
assert(fields_and_data["COUNTRY"] == "COUNTRY")
assert(fields_and_data["CQZ"] == "CQ")
assert(fields_and_data["ITUZ"] == "ITU")
assert(fields_and_data["IOTA"] == "GRID")
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# Copyright (C) 2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from pyqso.dx_cluster import *
class TestDXCluster(unittest.TestCase):
""" The unit tests for the DXCluster class. """
def setUp(self):
""" Set up the objects needed for the unit tests. """
PyQSO = mock.MagicMock()
self.dxcluster = DXCluster(application=PyQSO())
def tearDown(self):
""" Destroy any unit test resources. """
pass
def test_on_telnet_io(self):
""" Check that the response from the Telnet server can be correctly decoded. """
telnetlib.Telnet = mock.Mock(spec=telnetlib.Telnet)
connection = telnetlib.Telnet("hello", "world")
connection.read_very_eager.return_value = b"Test message from the Telnet server."
self.dxcluster.connection = connection
result = self.dxcluster.on_telnet_io()
assert(result)
if(__name__ == '__main__'):
unittest.main()

181
tests/test_log.py 100644
Wyświetl plik

@ -0,0 +1,181 @@
#!/usr/bin/env python3
# Copyright (C) 2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from pyqso.log import *
class TestLog(unittest.TestCase):
def setUp(self):
self.connection = sqlite.connect(":memory:")
self.connection.row_factory = sqlite.Row
self.field_names = ["CALL", "QSO_DATE", "TIME_ON", "FREQ", "BAND", "MODE", "RST_SENT", "RST_RCVD"]
self.fields_and_data = {"CALL": "TEST123", "QSO_DATE": "20130312", "TIME_ON": "1234", "FREQ": "145.500", "BAND": "2m", "MODE": "FM", "RST_SENT": "59", "RST_RCVD": "59"}
c = self.connection.cursor()
query = "CREATE TABLE test (id INTEGER PRIMARY KEY AUTOINCREMENT"
for field_name in self.field_names:
s = ", %s TEXT" % field_name.lower()
query = query + s
query = query + ")"
c.execute(query)
self.log = Log(self.connection, "test")
def tearDown(self):
self.connection.close()
def test_log_add_missing_db_columns(self):
column_names_before = []
column_names_after = []
c = self.connection.cursor()
c.execute("PRAGMA table_info(test)")
result = c.fetchall()
for t in result:
column_names_before.append(t[1].upper())
self.log.add_missing_db_columns()
c.execute("PRAGMA table_info(test)")
result = c.fetchall()
for t in result:
column_names_after.append(t[1].upper())
print("Column names before: ", column_names_before)
print("Column names after: ", column_names_after)
assert(len(column_names_before) == len(self.field_names) + 1) # Added 1 here because of the "ID" column in all database tables.
assert(len(column_names_after) == len(AVAILABLE_FIELD_NAMES_ORDERED) + 1)
for field_name in AVAILABLE_FIELD_NAMES_ORDERED:
assert(field_name in column_names_after)
def test_log_add_record(self):
self.log.add_record(self.fields_and_data)
c = self.connection.cursor()
c.execute("SELECT * FROM test")
records = c.fetchall()
assert len(records) == 1
for field_name in self.field_names:
print(self.fields_and_data[field_name], records[0][field_name])
assert self.fields_and_data[field_name] == records[0][field_name]
def test_log_delete_record(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute("SELECT * FROM test")
records_before = c.fetchall()
self.log.delete_record(1)
c.execute("SELECT * FROM test")
records_after = c.fetchall()
assert(len(records_before) == 1)
assert(len(records_after) == 0)
def test_log_edit_record(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute("SELECT * FROM test")
record_before = c.fetchall()[0]
self.log.edit_record(1, "CALL", "TEST456")
self.log.edit_record(1, "FREQ", "145.450")
c.execute("SELECT * FROM test")
record_after = c.fetchall()[0]
assert(record_before["CALL"] == "TEST123")
assert(record_after["CALL"] == "TEST456")
assert(record_before["FREQ"] == "145.500")
assert(record_after["FREQ"] == "145.450")
def test_log_get_record_by_index(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
record = self.log.get_record_by_index(1)
print("Contents of retrieved record: ", record)
for field_name in list(record.keys()):
if(field_name.upper() == "ID"):
continue
else:
assert(record[field_name.upper()] == self.fields_and_data[field_name.upper()])
assert(len(record) == len(self.fields_and_data) + 1)
def test_log_get_all_records(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
# Add the same record twice
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
records = self.log.get_all_records()
print("Contents of all retrieved records: ", records)
assert(len(records) == 2) # There should be 2 records
for field_name in self.field_names:
assert(records[0][field_name] == self.fields_and_data[field_name])
assert(records[1][field_name] == self.fields_and_data[field_name])
def test_log_record_count(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
# Add the same record twice
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
record_count = self.log.record_count
print("Number of records in the log: ", record_count)
assert(record_count == 2) # There should be 2 records
def test_log_get_duplicates(self):
query = "INSERT INTO test VALUES (NULL, ?, ?, ?, ?, ?, ?, ?, ?)"
c = self.connection.cursor()
n = 5 # The total number of records to insert.
for i in range(0, n):
c.execute(query, (self.fields_and_data["CALL"], self.fields_and_data["QSO_DATE"], self.fields_and_data["TIME_ON"], self.fields_and_data["FREQ"], self.fields_and_data["BAND"], self.fields_and_data["MODE"], self.fields_and_data["RST_SENT"], self.fields_and_data["RST_RCVD"]))
assert(len(self.log.get_duplicates()) == n-1) # Expecting n-1 duplicates.
def test_log_rename(self):
old_name = "test"
new_name = "hello"
success = self.log.rename(new_name)
assert(success)
with self.connection:
c = self.connection.cursor()
c.execute("SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE name=?)", [old_name])
exists = c.fetchone()
assert(exists[0] == 0) # Old log name should no longer exist.
c.execute("SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE name=?)", [new_name])
exists = c.fetchone()
assert(exists[0] == 1) # New log name should now exist.
assert(self.log.name == new_name)
if(__name__ == '__main__'):
unittest.main()

Wyświetl plik

@ -0,0 +1,58 @@
#!/usr/bin/env python3
# Copyright (C) 2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from pyqso.logbook import *
class TestLogbook(unittest.TestCase):
""" The unit tests for the Logbook class. """
def setUp(self):
""" Set up the Logbook object and connection to the test database needed for the unit tests. """
self.logbook = Logbook(application=mock.MagicMock())
path_to_test_database = os.path.join(os.path.realpath(os.path.dirname(__file__)), os.pardir, "res/test.db")
success = self.logbook.db_connect(path_to_test_database)
assert success
# Populate test logs.
for log_name in ["test", "test2"]:
l = Log(self.logbook.connection, log_name)
l.populate()
self.logbook.logs.append(l)
def tearDown(self):
""" Disconnect from the test database. """
success = self.logbook.db_disconnect()
assert success
def test_log_name_exists(self):
""" Check that only the log called 'test' exists. """
assert self.logbook.log_name_exists("test") # Log 'test' exists.
assert not self.logbook.log_name_exists("hello") # Log 'hello' should not exist.
def test_log_count(self):
""" Check that the log count equals 2. """
assert self.logbook.log_count == 2 # A total of 2 logs in the logbook.
def test_record_count(self):
""" Check that the record count equals 5. """
assert self.logbook.record_count == 5 # A total of 5 records over all the logs in the logbook.
if(__name__ == '__main__'):
unittest.main()