tiny-test-fw: support detect exception in IDFDUT

pull/3216/head
He Yin Ling 2019-03-16 20:07:52 +08:00
rodzic 5beb2802e0
commit f11eba7802
4 zmienionych plików z 164 dodań i 47 usunięć

Wyświetl plik

@ -42,19 +42,19 @@ import time
import re
import threading
import copy
import sys
import functools
# python2 and python3 queue package name is different
try:
import Queue as _queue
except ImportError:
import queue as _queue
import serial
from serial.tools import list_ports
import Utility
if sys.version_info[0] == 2:
import Queue as _queue
else:
import queue as _queue
class ExpectTimeout(ValueError):
""" timeout for expect method """
@ -201,55 +201,61 @@ class _LogThread(threading.Thread, _queue.Queue):
self.flush_data()
class _RecvThread(threading.Thread):
class RecvThread(threading.Thread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
CHECK_FUNCTIONS = []
""" DUT subclass can define a few check functions to process received data. """
def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(_RecvThread, self).__init__()
super(RecvThread, self).__init__()
self.exit_event = threading.Event()
self.setDaemon(True)
self.read = read
self.data_cache = data_cache
self.recorded_data = recorded_data
self.record_data_lock = record_data_lock
# cache the last line of recv data for collecting performance
self._line_cache = str()
def collect_performance(self, data):
""" collect performance """
if data:
decoded_data = _decode_data(data)
def _line_completion(self, data):
"""
Usually check functions requires to check for one complete line.
This method will do line completion for the first line, and strip incomplete last line.
"""
ret = self._line_cache
decoded_data = _decode_data(data)
matches = self.PERFORMANCE_PATTERN.findall(self._line_cache + decoded_data)
for match in matches:
Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
color="orange")
# cache incomplete line to later process
lines = decoded_data.splitlines(True)
last_line = lines[-1]
# cache incomplete line to later process
lines = decoded_data.splitlines(True)
last_line = lines[-1]
if last_line[-1] != "\n":
if len(lines) == 1:
# only one line and the line is not finished, then append this to cache
self._line_cache += lines[-1]
else:
# more than one line and not finished, replace line cache
self._line_cache = lines[-1]
if last_line[-1] != "\n":
if len(lines) == 1:
# only one line and the line is not finished, then append this to cache
self._line_cache += lines[-1]
ret = str()
else:
# line finishes, flush cache
self._line_cache = str()
# more than one line and not finished, replace line cache
self._line_cache = lines[-1]
ret += "".join(lines[:-1])
else:
# line finishes, flush cache
self._line_cache = str()
ret += decoded_data
return ret
def run(self):
while not self.exit_event.isSet():
data = self.read(1000)
if data:
raw_data = self.read(1000)
if raw_data:
with self.record_data_lock:
self.data_cache.put(data)
self.data_cache.put(raw_data)
for capture_id in self.recorded_data:
self.recorded_data[capture_id].put(data)
self.collect_performance(data)
self.recorded_data[capture_id].put(raw_data)
# we need to do line completion before call check functions
comp_data = self._line_completion(raw_data)
for check_function in self.CHECK_FUNCTIONS:
check_function(self, comp_data)
def exit(self):
self.exit_event.set()
@ -267,7 +273,9 @@ class BaseDUT(object):
DEFAULT_EXPECT_TIMEOUT = 10
MAX_EXPECT_FAILURES_TO_SAVED = 10
RECV_THREAD_CLS = RecvThread
""" DUT subclass can specify RECV_THREAD_CLS to do add some extra stuff when receive data.
For example, DUT can implement exception detect & analysis logic in receive thread subclass. """
LOG_THREAD = _LogThread()
LOG_THREAD.start()
@ -398,8 +406,8 @@ class BaseDUT(object):
:return: None
"""
self.receive_thread = _RecvThread(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread = self.RECV_THREAD_CLS(self._port_read, self.data_cache,
self.recorded_data, self.record_data_lock)
self.receive_thread.start()
def stop_receive(self):
@ -429,9 +437,9 @@ class BaseDUT(object):
if isinstance(data, type(u'')):
try:
data = data.encode('utf-8')
except Exception:
except Exception as e:
print(u'Cannot encode {} of type {}'.format(data, type(data)))
raise
raise e
return data
def write(self, data, eol="\r\n", flush=True):

Wyświetl plik

@ -62,7 +62,7 @@ class Env(object):
self.lock = threading.RLock()
@_synced
def get_dut(self, dut_name, app_path, dut_class=None, app_class=None):
def get_dut(self, dut_name, app_path, dut_class=None, app_class=None, **dut_init_args):
"""
get_dut(dut_name, app_path, dut_class=None, app_class=None)
@ -70,6 +70,7 @@ class Env(object):
:param app_path: application path, app instance will use this path to process application info
:param dut_class: dut class, if not specified will use default dut class of env
:param app_class: app class, if not specified will use default app of env
:keyword dut_init_args: extra kwargs used when creating DUT instance
:return: dut instance
"""
if dut_name in self.allocated_duts:
@ -97,6 +98,7 @@ class Env(object):
dut_config = self.get_variable(dut_name + "_port_config")
except ValueError:
dut_config = dict()
dut_config.update(dut_init_args)
dut = self.default_dut_cls(dut_name, port,
os.path.join(self.log_path, dut_name + ".log"),
app_inst,
@ -168,11 +170,16 @@ class Env(object):
close all DUTs of the Env.
:param dut_debug: if dut_debug is True, then print all dut expect failures before close it
:return: None
:return: exceptions during close DUT
"""
dut_close_errors = []
for dut_name in self.allocated_duts:
dut = self.allocated_duts[dut_name]["dut"]
if dut_debug:
dut.print_debug_info()
dut.close()
try:
dut.close()
except Exception as e:
dut_close_errors.append(e)
self.allocated_duts = dict()
return dut_close_errors

Wyświetl plik

@ -20,9 +20,17 @@ import re
import functools
import tempfile
# python2 and python3 queue package name is different
try:
import Queue as _queue
except ImportError:
import queue as _queue
from serial.tools import list_ports
import DUT
import Utility
try:
import esptool
@ -38,6 +46,56 @@ class IDFToolError(OSError):
pass
class IDFDUTException(RuntimeError):
pass
class IDFRecvThread(DUT.RecvThread):
PERFORMANCE_PATTERN = re.compile(r"\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n")
EXCEPTION_PATTERNS = [
re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
re.compile(r"(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)"),
re.compile(r"(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))")
]
BACKTRACE_PATTERN = re.compile(r"Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)")
def __init__(self, read, data_cache, recorded_data, record_data_lock):
super(IDFRecvThread, self).__init__(read, data_cache, recorded_data, record_data_lock)
self.exceptions = _queue.Queue()
def collect_performance(self, comp_data):
matches = self.PERFORMANCE_PATTERN.findall(comp_data)
for match in matches:
Utility.console_log("[Performance][{}]: {}".format(match[0], match[1]),
color="orange")
def detect_exception(self, comp_data):
for pattern in self.EXCEPTION_PATTERNS:
start = 0
while True:
match = pattern.search(comp_data, pos=start)
if match:
start = match.end()
self.exceptions.put(match.group(0))
Utility.console_log("[Exception]: {}".format(match.group(0)), color="red")
else:
break
def detect_backtrace(self, comp_data):
# TODO: to support auto parse backtrace
start = 0
while True:
match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
if match:
start = match.end()
Utility.console_log("[Backtrace]:{}".format(match.group(1)), color="red")
else:
break
CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
def _uses_esptool(func):
""" Suspend listener thread, connect with esptool,
call target function with esptool instance,
@ -78,9 +136,13 @@ class IDFDUT(DUT.SerialDUT):
INVALID_PORT_PATTERN = re.compile(r"AMA|Bluetooth")
# if need to erase NVS partition in start app
ERASE_NVS = True
RECV_THREAD_CLS = IDFRecvThread
def __init__(self, name, port, log_file, app, **kwargs):
def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
self.download_config, self.partition_table = app.process_app_info()
super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
self.allow_dut_exception = allow_dut_exception
self.exceptions = _queue.Queue()
@classmethod
def get_mac(cls, app, port):
@ -252,3 +314,33 @@ class IDFDUT(DUT.SerialDUT):
return [port_hint] + ports
return ports
def stop_receive(self):
if self.receive_thread:
while True:
try:
self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
except _queue.Empty:
break
super(IDFDUT, self).stop_receive()
def get_exceptions(self):
""" Get exceptions detected by DUT receive thread. """
if self.receive_thread:
while True:
try:
self.exceptions.put(self.receive_thread.exceptions.get(timeout=0))
except _queue.Empty:
break
exceptions = []
while True:
try:
exceptions.append(self.exceptions.get(timeout=0))
except _queue.Empty:
break
return exceptions
def close(self):
super(IDFDUT, self).close()
if not self.allow_dut_exception and self.get_exceptions():
raise IDFDUTException()

Wyświetl plik

@ -184,10 +184,20 @@ def test_method(**kwargs):
# log failure
junit_test_case.add_failure_info(str(e) + ":\r\n" + traceback.format_exc())
finally:
# do close all DUTs, if result is False then print DUT debug info
close_errors = env_inst.close(dut_debug=(not result))
# We have a hook in DUT close, allow DUT to raise error to fail test case.
# For example, we don't allow DUT exception (reset) during test execution.
# We don't want to implement in exception detection in test function logic,
# as we need to add it to every test case.
# We can implement it in DUT receive thread,
# and raise exception in DUT close to fail test case if reset detected.
if close_errors:
for error in close_errors:
junit_test_case.add_failure_info(str(error))
result = False
if not case_info["junit_report_by_case"]:
JunitReport.test_case_finish(junit_test_case)
# do close all DUTs, if result is False then print DUT debug info
env_inst.close(dut_debug=(not result))
# end case and output result
JunitReport.output_report(junit_file_path)