kopia lustrzana https://github.com/espressif/esp-idf
tiny-test-fw: print expect failure when test fails
rodzic
b354770f42
commit
23d98aa358
|
@ -117,7 +117,7 @@ class _DataCache(_queue.Queue):
|
|||
break
|
||||
return ret
|
||||
|
||||
def get_data(self, timeout=0):
|
||||
def get_data(self, timeout=0.0):
|
||||
"""
|
||||
get a copy of data from cache.
|
||||
|
||||
|
@ -214,6 +214,7 @@ class BaseDUT(object):
|
|||
"""
|
||||
|
||||
DEFAULT_EXPECT_TIMEOUT = 5
|
||||
MAX_EXPECT_FAILURES_TO_SAVED = 10
|
||||
|
||||
def __init__(self, name, port, log_file, app, **kwargs):
|
||||
|
||||
|
@ -224,12 +225,18 @@ class BaseDUT(object):
|
|||
self.app = app
|
||||
self.data_cache = _DataCache()
|
||||
self.receive_thread = None
|
||||
self.expect_failures = []
|
||||
# open and start during init
|
||||
self.open()
|
||||
|
||||
def __str__(self):
|
||||
return "DUT({}: {})".format(self.name, str(self.port))
|
||||
|
||||
def _save_expect_failure(self, pattern, data, start_time):
|
||||
self.expect_failures.insert(0, {"pattern": pattern, "data": data,
|
||||
"start": start_time, "end": time.time()})
|
||||
self.expect_failures = self.expect_failures[:self.MAX_EXPECT_FAILURES_TO_SAVED]
|
||||
|
||||
# define for methods need to be overwritten by Port
|
||||
@classmethod
|
||||
def list_available_ports(cls):
|
||||
|
@ -444,7 +451,9 @@ class BaseDUT(object):
|
|||
data = self.data_cache.get_data(time.time() + timeout - start_time)
|
||||
|
||||
if ret is None:
|
||||
raise ExpectTimeout(self.name + ": " + _pattern_to_string(pattern))
|
||||
pattern = _pattern_to_string(pattern)
|
||||
self._save_expect_failure(pattern, data, start_time)
|
||||
raise ExpectTimeout(self.name + ": " + pattern)
|
||||
return ret
|
||||
|
||||
def _expect_multi(self, expect_all, expect_item_list, timeout):
|
||||
|
@ -508,7 +517,9 @@ class BaseDUT(object):
|
|||
# flush already matched data
|
||||
self.data_cache.flush(slice_index)
|
||||
else:
|
||||
raise ExpectTimeout(self.name + ": " + str([_pattern_to_string(x) for x in expect_items]))
|
||||
pattern = str([_pattern_to_string(x["pattern"]) for x in expect_items])
|
||||
self._save_expect_failure(pattern, data, start_time)
|
||||
raise ExpectTimeout(self.name + ": " + pattern)
|
||||
|
||||
@_expect_lock
|
||||
def expect_any(self, *expect_items, **timeout):
|
||||
|
@ -554,6 +565,22 @@ class BaseDUT(object):
|
|||
timeout["timeout"] = self.DEFAULT_EXPECT_TIMEOUT
|
||||
return self._expect_multi(True, expect_items, **timeout)
|
||||
|
||||
@staticmethod
|
||||
def _format_ts(ts):
|
||||
return "{}:{}".format(time.strftime("%m-%d %H:%M:%S", time.localtime(ts)), str(ts % 1)[2:5])
|
||||
|
||||
def print_debug_info(self):
|
||||
"""
|
||||
Print debug info of current DUT. Currently we will print debug info for expect failures.
|
||||
"""
|
||||
Utility.console_log("DUT debug info for DUT: {}:".format(self.name), color="orange")
|
||||
|
||||
for failure in self.expect_failures:
|
||||
Utility.console_log("\t[pattern]: {}\r\n\t[data]: {}\r\n\t[time]: {} - {}\r\n"
|
||||
.format(failure["pattern"], failure["data"],
|
||||
self._format_ts(failure["start"]), self._format_ts(failure["end"])),
|
||||
color="orange")
|
||||
|
||||
|
||||
class SerialDUT(BaseDUT):
|
||||
""" serial with logging received data feature """
|
||||
|
@ -574,17 +601,14 @@ class SerialDUT(BaseDUT):
|
|||
self.serial_configs.update(kwargs)
|
||||
super(SerialDUT, self).__init__(name, port, log_file, app, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def _format_data(data):
|
||||
def _format_data(self, data):
|
||||
"""
|
||||
format data for logging. do decode and add timestamp.
|
||||
|
||||
:param data: raw data from read
|
||||
:return: formatted data (str)
|
||||
"""
|
||||
timestamp = time.time()
|
||||
timestamp = "[{}:{}]".format(time.strftime("%m-%d %H:%M:%S", time.localtime(timestamp)),
|
||||
str(timestamp % 1)[2:5])
|
||||
timestamp = "[{}]".format(self._format_ts(time.time()))
|
||||
formatted_data = timestamp.encode() + b"\r\n" + data + b"\r\n"
|
||||
return formatted_data
|
||||
|
||||
|
|
|
@ -162,14 +162,17 @@ class Env(object):
|
|||
return if_addr[self.PROTO_MAP[proto]][0]
|
||||
|
||||
@_synced
|
||||
def close(self):
|
||||
def close(self, dut_debug=False):
|
||||
"""
|
||||
close()
|
||||
close all DUTs of the Env.
|
||||
|
||||
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
|
||||
:return: None
|
||||
"""
|
||||
for dut_name in self.allocated_duts:
|
||||
dut = self.allocated_duts[dut_name]["dut"]
|
||||
if dut_debug:
|
||||
dut.print_debug_info()
|
||||
dut.close()
|
||||
self.allocated_duts = dict()
|
||||
|
|
|
@ -154,6 +154,7 @@ def test_method(**kwargs):
|
|||
xunit_file = os.path.join(env_inst.app_cls.get_log_folder(env_config["test_suite_name"]),
|
||||
XUNIT_FILE_NAME)
|
||||
XUNIT_RECEIVER.begin_case(test_func.__name__, time.time(), test_func_file_name)
|
||||
result = False
|
||||
try:
|
||||
Utility.console_log("starting running test: " + test_func.__name__, color="green")
|
||||
# execute test function
|
||||
|
@ -163,12 +164,11 @@ def test_method(**kwargs):
|
|||
except Exception as e:
|
||||
# handle all the exceptions here
|
||||
traceback.print_exc()
|
||||
result = False
|
||||
# log failure
|
||||
XUNIT_RECEIVER.failure(str(e), test_func_file_name)
|
||||
finally:
|
||||
# do close all DUTs
|
||||
env_inst.close()
|
||||
# do close all DUTs, if result is False then print DUT debug info
|
||||
env_inst.close(dut_debug=(not result))
|
||||
# end case and output result
|
||||
XUNIT_RECEIVER.end_case(test_func.__name__, time.time())
|
||||
with open(xunit_file, "ab+") as f:
|
||||
|
|
Ładowanie…
Reference in New Issue