refactor: changed logic of unity_tester, replaced threads by generators

pull/12769/head
igor.udot 2023-09-04 14:56:35 +08:00
rodzic 0704733708
commit e0a40feb5c
6 zmienionych plików z 6 dodań i 405 usunięć

Wyświetl plik

@ -2,8 +2,8 @@
# SPDX-License-Identifier: CC0-1.0
import pytest
from idf_unity_tester import CaseTester
from pytest_embedded import Dut
from pytest_embedded_idf.unity_tester import CaseTester
@pytest.mark.esp32s2

Wyświetl plik

@ -2,7 +2,7 @@
# SPDX-License-Identifier: Unlicense OR CC0-1.0
import pytest
from idf_unity_tester import CaseTester
from pytest_embedded_idf.unity_tester import CaseTester
@pytest.mark.esp32

Wyświetl plik

@ -2,8 +2,8 @@
# SPDX-License-Identifier: CC0-1.0
import pytest
from idf_unity_tester import CaseTester
from pytest_embedded import Dut
from pytest_embedded_idf.unity_tester import CaseTester
@pytest.mark.generic

Wyświetl plik

@ -28,20 +28,19 @@ from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from pytest_embedded.plugin import multi_dut_argument, multi_dut_fixture
from pytest_embedded_idf.dut import IdfDut
from pytest_embedded_idf.unity_tester import CaseTester
try:
from idf_ci_utils import IDF_PATH
from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS
from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded
from idf_pytest.utils import format_case_id, get_target_marker_from_expr
from idf_unity_tester import CaseTester
except ImportError:
sys.path.append(os.path.join(os.path.dirname(__file__), 'tools', 'ci'))
from idf_ci_utils import IDF_PATH
from idf_pytest.constants import DEFAULT_SDKCONFIG, ENV_MARKERS, SPECIAL_MARKERS, TARGET_MARKERS
from idf_pytest.plugin import IDF_PYTEST_EMBEDDED_KEY, IdfPytestEmbedded
from idf_pytest.utils import format_case_id, get_target_marker_from_expr
from idf_unity_tester import CaseTester
try:
import common_test_methods # noqa: F401
@ -70,8 +69,8 @@ def session_tempdir() -> str:
@pytest.fixture
def case_tester(dut: IdfDut, **kwargs): # type: ignore
yield CaseTester(dut, **kwargs)
def case_tester(unity_tester: CaseTester) -> CaseTester:
return unity_tester
@pytest.fixture

Wyświetl plik

@ -5,7 +5,6 @@ tools/ci/check_*.txt
tools/ci/check_*.sh
tools/ci/check_copyright_config.yaml
tools/ci/get_all_test_results.py
tools/ci/idf_unity_tester.py
tools/gdb_panic_server.py
tools/check_term.py
tools/check_python_dependencies.py

Wyświetl plik

@ -1,397 +0,0 @@
# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import time
from multiprocessing import Manager, Process, Semaphore
from multiprocessing.managers import SyncManager
from typing import List, Union
from pexpect.exceptions import TIMEOUT
from pytest_embedded import Dut, unity, utils
from pytest_embedded_idf.dut import UnittestMenuCase
class BaseTester:
"""
The base class that providing shared methods
Attributes:
dut (Dut): Object of the Device under test
test_menu (List[UnittestMenuCase]): The list of the cases
retry_times (int): The retry times when failed to start a case
args (Any): Not used
"""
# The patterns that indicate the runner is ready come from 'unity_runner.c'
ready_pattern_list = ['Press ENTER to see the list of tests',
'Enter test for running',
'Enter next test, or \'enter\' to see menu']
def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
self.retry_times = 30
if isinstance(dut, List):
for item in dut:
if isinstance(item, Dut):
self.dut = item
break
else:
self.dut = dut
for k, v in kwargs.items():
setattr(self, k, v)
if 'test_menu' not in kwargs:
self.get_test_menu()
def get_test_menu(self) -> None:
"""
Get the test menu of this test app
Notes:
It will do a hard reset after getting the test menu to ensure
the patterns that indicate the case is ready not taken by the parser.
Please use this function to get the test menu while using this script
"""
self.dut.write('')
self.test_menu = self.dut.parse_test_menu()
self.dut.serial.hard_reset()
class NormalCaseTester(BaseTester):
"""
Tester of normal type case
Attributes:
dut (Dut): Object of the Device under test
test_menu (List[UnittestMenuCase]): The list of the cases
retry_times (int): The retry times when failed to start a case
args (Any): Not used
"""
def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None:
"""
Run all normal cases
Args:
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
for case in self.test_menu:
self.run_normal_case(case, reset, timeout=timeout)
def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
"""
Run a specific normal case
Notes:
Will skip if the case type is not normal
Args:
case: the specific case that parsed in test menu
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
if case.type == 'normal':
if reset:
self.dut.serial.hard_reset()
self.dut.expect(self.ready_pattern_list, timeout=timeout)
# Retry if write not success
for retry in range(self.retry_times):
self.dut.write(str(case.index))
try:
self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
break
except TIMEOUT as e:
if retry >= self.retry_times - 1:
raise e
self.dut.expect_unity_test_output(timeout=timeout)
class MultiStageCaseTester(BaseTester):
"""
Tester of multiple stage type case
Attributes:
dut (Dut): Object of the Device under test
test_menu (List[UnittestMenuCase]): The list of the cases
retry_times (int): The retry times when failed to start a case
args (Any): Not used
"""
def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None:
"""
Run all multi_stage cases
Args:
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
for case in self.test_menu:
self.run_multi_stage_case(case, reset, timeout=timeout)
def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None:
"""
Run a specific multi_stage case
Notes:
Will skip if the case type is not multi_stage
Args:
case: the specific case that parsed in test menu
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
if case.type == 'multi_stage':
if reset:
self.dut.serial.hard_reset()
for sub_case in case.subcases:
self.dut.expect(self.ready_pattern_list, timeout=timeout)
# Retry if write not success
for retry in range(self.retry_times):
self.dut.write(str(case.index))
try:
self.dut.expect_exact('Running {}...'.format(case.name), timeout=1)
break
except TIMEOUT as e:
if retry >= self.retry_times - 1:
raise e
self.dut.write(str(sub_case['index']))
self.dut.expect_unity_test_output(timeout=timeout)
class MultiDevResource:
"""
Resources of multi_dev dut
Attributes:
dut (Dut): Object of the Device under test
sem (Semaphore): Semaphore of monitoring whether the case finished
recv_sig (List[str]): The list of received signals from other dut
thread (Process): The thread of monitoring the signals
"""
def __init__(self, dut: Dut, manager: SyncManager) -> None:
self.dut = dut
self.sem = Semaphore()
self.recv_sig = manager.list() # type: list[str]
self.process: Process = None # type: ignore
class MultiDevCaseTester(BaseTester):
"""
Tester of multi_device case
Attributes:
group (List[MultiDevResource]): The group of the devices' resources
dut (Dut): The first dut, mainly used to get the test menu only
test_menu (List[UnittestMenuCase]): The list of the cases
retry_times (int): The retry times when failed to start a case
"""
# The signal pattens come from 'test_utils.c'
SEND_SIGNAL_PREFIX = 'Send signal: '
WAIT_SIGNAL_PREFIX = 'Waiting for signal: '
UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!'
UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!'
def __init__(self, dut: Union[Dut, List[Dut]], **kwargs) -> None: # type: ignore
"""
Create the object for every dut and put them into the group
"""
super().__init__(dut, **kwargs)
self._manager = Manager()
self.group: List[MultiDevResource] = []
if isinstance(dut, List):
for item in dut:
if isinstance(item, Dut):
dev_res = MultiDevResource(item, self._manager)
self.group.append(dev_res)
else:
dev_res = MultiDevResource(dut, self._manager)
self.group.append(dev_res)
def _wait_multi_dev_case_finish(self, timeout: int = 60) -> None:
"""
Wait until all the sub-cases of this multi_device case finished
"""
for d in self.group:
if d.sem.acquire(timeout=timeout):
d.sem.release()
else:
raise TimeoutError('Wait case to finish timeout')
def _start_sub_case_process(self, dev_res: MultiDevResource, case: UnittestMenuCase, sub_case_index: int, timeout: int = 60) -> None:
"""
Start the thread monitoring on the corresponding dut of the sub-case
"""
# Allocate the kwargs that pass to '_run'
_kwargs = {}
_kwargs['dut'] = dev_res.dut
_kwargs['dev_res'] = dev_res
_kwargs['case'] = case
_kwargs['sub_case_index'] = sub_case_index
_kwargs['timeout'] = timeout
# Create the thread of the sub-case
dev_res.process = Process(target=self._run, kwargs=_kwargs, daemon=True)
dev_res.process.start()
# Process starts, acquire the semaphore to block '_wait_multi_dev_case_finish'
dev_res.sem.acquire()
def _run(self, **kwargs) -> None: # type: ignore
"""
The thread target function
Will run for each case on each dut
Call the wrapped function to trigger the case
Then keep listening on the dut for the signal
- If the dut send a signal, it will be put into others' recv_sig
- If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires
- If the dut finished running the case, it will quite the loop and terminate the thread
"""
signal_pattern_list = [
self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal
self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal
unity.UNITY_SUMMARY_LINE_REGEX, # Means the case finished
]
dut = kwargs['dut']
dev_res = kwargs['dev_res']
case = kwargs['case']
sub_case_index = kwargs['sub_case_index']
timeout = kwargs['timeout']
# Start the case
dut.expect(self.ready_pattern_list)
# Retry at most 30 times if not write successfully
for retry in range(self.retry_times):
dut.write(str(case.index))
try:
dut.expect_exact('Running {}...'.format(case.name), timeout=10)
break
except TIMEOUT as e:
if retry >= self.retry_times - 1:
dev_res.sem.release()
raise e
dut.write(str(sub_case_index))
# Wait for the specific patterns, only exist when the sub-case finished
while True:
pat = dut.expect(signal_pattern_list, timeout=timeout)
if pat is not None:
match_str = pat.group().decode('utf-8')
# Send a signal
if match_str.find(self.SEND_SIGNAL_PREFIX) >= 0:
send_sig = pat.group(1).decode('utf-8')
for d in self.group:
d.recv_sig.append(send_sig)
# Waiting for a signal
elif match_str.find(self.WAIT_SIGNAL_PREFIX) >= 0:
wait_sig = pat.group(1).decode('utf-8')
while True:
if wait_sig in dev_res.recv_sig:
dev_res.recv_sig.remove(wait_sig)
dut.write('')
break
# Keep waiting the signal
else:
time.sleep(0.1)
# Case finished
elif match_str.find('Tests') >= 0:
log = utils.remove_asci_color_code(dut.pexpect_proc.before)
dut.testsuite.add_unity_test_cases(log)
break
# The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish'
#
# Manually to create the real test case junit report
# The child process attributes won't be reflected to the parent one.
junit_report = os.path.splitext(dut.logfile)[0] + f'_{case.index}_{sub_case_index}.xml'
dut.testsuite.dump(junit_report)
logging.info(f'Created unity output junit report: {junit_report}')
dev_res.sem.release()
def run_all_multi_dev_cases(self, reset: bool = False, timeout: int = 60) -> None:
"""
Run only multi_device cases
Args:
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
for case in self.test_menu:
# Run multi_device case on every device
self.run_multi_dev_case(case, reset, timeout)
def run_multi_dev_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
"""
Run a specific multi_device case
Notes:
Will skip if the case type is not multi_device
Args:
case: the specific case that parsed in test menu
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
if case.type == 'multi_device' and len(self.group) > 1:
if reset:
for dev_res in self.group:
dev_res.dut.serial.hard_reset()
# delay a few seconds to make sure the duts are ready.
time.sleep(5)
for sub_case in case.subcases:
if isinstance(sub_case['index'], str):
index = int(sub_case['index'], 10)
else:
index = sub_case['index']
self._start_sub_case_process(dev_res=self.group[index - 1], case=case,
sub_case_index=index, timeout=timeout)
# Waiting all the devices to finish their test cases
self._wait_multi_dev_case_finish(timeout=timeout)
class CaseTester(NormalCaseTester, MultiStageCaseTester, MultiDevCaseTester):
"""
The Generic tester of all the types
Attributes:
group (List[MultiDevResource]): The group of the devices' resources
dut (Dut): The first dut if there is more than one
test_menu (List[UnittestMenuCase]): The list of the cases
"""
def run_all_cases(self, reset: bool = False, timeout: int = 60) -> None:
"""
Run all cases
Args:
reset: whether do a hardware reset before running the case
timeout: timeout in second
"""
for case in self.test_menu:
self.run_case(case, reset, timeout=timeout)
def run_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 60) -> None:
"""
Run a specific case
Args:
case: the specific case that parsed in test menu
reset: whether do a hardware reset before running the case
timeout: timeout in second, the case's timeout attribute has a higher priority than this param.
"""
_timeout = int(case.attributes.get('timeout', timeout))
if case.type == 'normal':
self.run_normal_case(case, reset, timeout=_timeout)
elif case.type == 'multi_stage':
self.run_multi_stage_case(case, reset, timeout=_timeout)
elif case.type == 'multi_device':
# here we always do a hard reset between test cases
# since the buffer can't be kept between test cases (which run in different processes)
self.run_multi_dev_case(case, reset=True, timeout=_timeout)