Move local target detection to `ttfw_idf`

rename upper_list to upper_list_or_str
minor fix for `unit_test.py` `is 'name'` -> `== 'name`
pull/5452/head
Fu Hanxi 2020-05-29 12:39:58 +08:00
rodzic e99172fbac
commit 377d3eaaa5
5 zmienionych plików z 94 dodań i 75 usunięć

Wyświetl plik

@ -13,8 +13,6 @@
# limitations under the License. # limitations under the License.
""" Interface for test cases. """ """ Interface for test cases. """
import json
import logging
import os import os
import time import time
import traceback import traceback
@ -65,6 +63,7 @@ class DefaultEnvConfig(object):
set_default_config = DefaultEnvConfig.set_default_config set_default_config = DefaultEnvConfig.set_default_config
get_default_config = DefaultEnvConfig.get_default_config get_default_config = DefaultEnvConfig.get_default_config
MANDATORY_INFO = { MANDATORY_INFO = {
"execution_time": 1, "execution_time": 1,
"env_tag": "default", "env_tag": "default",
@ -159,13 +158,11 @@ def test_method(**kwargs):
In some cases, one test function might test many test cases. In some cases, one test function might test many test cases.
If this flag is set, test case can update junit report by its own. If this flag is set, test case can update junit report by its own.
""" """
def test(test_func): def test(test_func):
case_info = MANDATORY_INFO.copy() case_info = MANDATORY_INFO.copy()
case_info["name"] = case_info["ID"] = test_func.__name__ case_info["name"] = case_info["ID"] = test_func.__name__
case_info["junit_report_by_case"] = False case_info["junit_report_by_case"] = False
case_info.update(kwargs) case_info.update(kwargs)
@functools.wraps(test_func) @functools.wraps(test_func)
@ -183,48 +180,7 @@ def test_method(**kwargs):
if key in env_config: if key in env_config:
env_config[key] = kwargs[key] env_config[key] = kwargs[key]
# Runner.py should overwrite target with the current target.
env_config.update(overwrite) env_config.update(overwrite)
# if target not in the default_config or the overwrite, then take it from kwargs passed from the decorator
target = env_config['target'] if 'target' in env_config else kwargs['target']
# This code block is used to do run local test script target set check
if not os.getenv('CI_JOB_NAME'):
idf_target = 'ESP32' # default if sdkconfig not found or not readable
expected_json_path = os.path.join('build', 'config', 'sdkconfig.json')
if os.path.exists(expected_json_path):
sdkconfig = json.load(open(expected_json_path))
try:
idf_target = sdkconfig['IDF_TARGET'].upper()
except KeyError:
pass
else:
logging.info('IDF_TARGET: {}'.format(idf_target))
else:
logging.warning('{} not found. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path)))
if isinstance(target, list):
if idf_target in target:
target = idf_target
else:
raise ValueError('IDF_TARGET set to {}, not in decorator target value'.format(idf_target))
else:
if idf_target != target:
raise ValueError('IDF_TARGET set to {}, not equal to decorator target value'.format(idf_target))
dut_dict = kwargs['dut_dict']
if target not in dut_dict:
raise Exception('target can only be {%s} (case insensitive)' % ', '.join(dut_dict.keys()))
dut = dut_dict[target]
try:
# try to config the default behavior of erase nvs
dut.ERASE_NVS = kwargs['erase_nvs']
except AttributeError:
pass
env_config['dut'] = dut
env_inst = Env.Env(**env_config) env_inst = Env.Env(**env_config)
# prepare for xunit test results # prepare for xunit test results
@ -271,5 +227,4 @@ def test_method(**kwargs):
handle_test.case_info = case_info handle_test.case_info = case_info
handle_test.test_method = True handle_test.test_method = True
return handle_test return handle_test
return test return test

Wyświetl plik

@ -45,6 +45,7 @@ Template Config File::
import importlib import importlib
import yaml import yaml
try: try:
from yaml import CLoader as Loader from yaml import CLoader as Loader
except ImportError: except ImportError:
@ -194,12 +195,22 @@ class Parser(object):
try: try:
_target = _filter['target'] _target = _filter['target']
except KeyError: except KeyError:
pass _target = None
else: else:
_overwrite.update({'target': _target}) _overwrite.update({'target': _target})
for test_method in test_methods: for test_method in test_methods:
if _filter_one_case(test_method, _filter): if _filter_one_case(test_method, _filter):
try:
dut_dict = test_method.case_info['dut_dict']
except (AttributeError, KeyError):
dut_dict = None
if dut_dict and _target:
if _target.upper() in dut_dict:
_overwrite.update({'dut': dut_dict[_target.upper()]})
else:
raise ValueError('target {} is not in the specified dut_dict'.format(_target))
test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite)) test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite))
return test_case_list return test_case_list

Wyświetl plik

@ -111,8 +111,6 @@ class Search(object):
# mark the cases with targets not in ci_target # mark the cases with targets not in ci_target
for case in replicated_cases: for case in replicated_cases:
ci_target = case.case_info['ci_target'] ci_target = case.case_info['ci_target']
if isinstance(ci_target, str):
ci_target = [ci_target]
if not ci_target or case.case_info['target'] in ci_target: if not ci_target or case.case_info['target'] in ci_target:
case.case_info['supported_in_ci'] = True case.case_info['supported_in_ci'] = True
else: else:

Wyświetl plik

@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import functools import functools
import json
import logging
import os import os
import re import re
@ -27,8 +29,8 @@ TARGET_DUT_CLS_DICT = {
} }
def format_case_id(chip, case_name): def format_case_id(target, case_name):
return "{}.{}".format(chip, case_name) return "{}.{}".format(target, case_name)
try: try:
@ -37,21 +39,66 @@ except NameError:
string_type = str string_type = str
def upper_list(text): def upper_list_or_str(text):
if not text: """
return text Return the uppercase of list of string or string. Return itself for other
data types
:param text: list or string, other instance will be returned immediately
:return: uppercase of list of string
"""
if isinstance(text, string_type): if isinstance(text, string_type):
res = text.upper() return [text.upper()]
elif isinstance(text, list):
return [item.upper() for item in text]
else: else:
res = [item.upper() for item in text] return text
return res
def local_test_check(decorator_target):
# Try to get the sdkconfig.json to read the IDF_TARGET value.
# If not set, will set to ESP32.
# For CI jobs, this is a fake procedure, the true target and dut will be
# overwritten by the job config YAML file.
idf_target = 'ESP32' # default if sdkconfig not found or not readable
if os.getenv('CI_JOB_ID'): # Only auto-detect target when running locally
return idf_target
expected_json_path = os.path.join('build', 'config', 'sdkconfig.json')
if os.path.exists(expected_json_path):
sdkconfig = json.load(open(expected_json_path))
try:
idf_target = sdkconfig['IDF_TARGET'].upper()
except KeyError:
logging.warning('IDF_TARGET not in {}. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path)))
else:
logging.info('IDF_TARGET: {}'.format(idf_target))
else:
logging.warning('{} not found. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path)))
if isinstance(decorator_target, list):
if idf_target not in decorator_target:
raise ValueError('IDF_TARGET set to {}, not in decorator target value'.format(idf_target))
else:
if idf_target != decorator_target:
raise ValueError('IDF_TARGET set to {}, not equal to decorator target value'.format(idf_target))
return idf_target
def get_dut_class(target, erase_nvs=None):
if target not in TARGET_DUT_CLS_DICT:
raise Exception('target can only be {%s} (case insensitive)' % ', '.join(TARGET_DUT_CLS_DICT.keys()))
dut = TARGET_DUT_CLS_DICT[target.upper()]
if erase_nvs:
dut.ERASE_NVS = 'erase_nvs'
return dut
def ci_target_check(func): def ci_target_check(func):
@functools.wraps(func) @functools.wraps(func)
def wrapper(**kwargs): def wrapper(**kwargs):
target = upper_list(kwargs.get('target', [])) target = upper_list_or_str(kwargs.get('target', []))
ci_target = upper_list(kwargs.get('ci_target', [])) ci_target = upper_list_or_str(kwargs.get('ci_target', []))
if not set(ci_target).issubset(set(target)): if not set(ci_target).issubset(set(target)):
raise ValueError('ci_target must be a subset of target') raise ValueError('ci_target must be a subset of target')
@ -79,9 +126,13 @@ def idf_example_test(app=Example, target="ESP32", ci_target=None, module="exampl
""" """
def test(func): def test(func):
original_method = TinyFW.test_method(app=app, target=upper_list(target), ci_target=upper_list(ci_target), module=module, test_target = local_test_check(target)
execution_time=execution_time, level=level, dut_dict=TARGET_DUT_CLS_DICT, dut = get_dut_class(test_target, erase_nvs)
erase_nvs=erase_nvs, **kwargs) original_method = TinyFW.test_method(
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
)
test_func = original_method(func) test_func = original_method(func)
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
return test_func return test_func
@ -107,9 +158,13 @@ def idf_unit_test(app=UT, target="ESP32", ci_target=None, module="unit-test", ex
""" """
def test(func): def test(func):
original_method = TinyFW.test_method(app=app, target=upper_list(target), ci_target=upper_list(ci_target), module=module, test_target = local_test_check(target)
execution_time=execution_time, level=level, dut_dict=TARGET_DUT_CLS_DICT, dut = get_dut_class(test_target, erase_nvs)
erase_nvs=erase_nvs, **kwargs) original_method = TinyFW.test_method(
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
)
test_func = original_method(func) test_func = original_method(func)
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
return test_func return test_func
@ -137,9 +192,13 @@ def idf_custom_test(app=TestApp, target="ESP32", ci_target=None, module="misc",
""" """
def test(func): def test(func):
original_method = TinyFW.test_method(app=app, target=upper_list(target), ci_target=upper_list(ci_target), module=module, test_target = local_test_check(target)
execution_time=execution_time, level=level, dut_dict=TARGET_DUT_CLS_DICT, dut = get_dut_class(test_target, erase_nvs)
erase_nvs=erase_nvs, **kwargs) original_method = TinyFW.test_method(
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
)
test_func = original_method(func) test_func = original_method(func)
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"]) test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
return test_func return test_func

Wyświetl plik

@ -26,7 +26,6 @@ import threading
from tiny_test_fw import TinyFW, Utility, Env, DUT from tiny_test_fw import TinyFW, Utility, Env, DUT
import ttfw_idf import ttfw_idf
UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests." UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
# matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)" # matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
@ -188,12 +187,11 @@ def reset_dut(dut):
def log_test_case(description, test_case, ut_config): def log_test_case(description, test_case, ut_config):
Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange") Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange")
Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k,v) for (k,v) in test_case.items() if k != "name" and v is not None), color="orange") Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k, v) for (k, v) in test_case.items() if k != "name" and v is not None), color="orange")
def run_one_normal_case(dut, one_case, junit_test_case): def run_one_normal_case(dut, one_case, junit_test_case):
reset_dut(dut) reset_dut(dut)
dut.start_capture_raw_data() dut.start_capture_raw_data()
@ -330,7 +328,6 @@ def run_unit_test_cases(env, extra_data):
class Handler(threading.Thread): class Handler(threading.Thread):
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!') WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!') SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored") FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
@ -691,7 +688,6 @@ def run_multiple_stage_cases(env, extra_data):
def detect_update_unit_test_info(env, extra_data, app_bin): def detect_update_unit_test_info(env, extra_data, app_bin):
case_config = format_test_case_config(extra_data) case_config = format_test_case_config(extra_data)
for ut_config in case_config: for ut_config in case_config:
@ -785,7 +781,7 @@ if __name__ == '__main__':
if len(test_item) == 0: if len(test_item) == 0:
continue continue
pair = test_item.split(r':', 1) pair = test_item.split(r':', 1)
if len(pair) == 1 or pair[0] is 'name': if len(pair) == 1 or pair[0] == 'name':
test_dict['name'] = pair[0] test_dict['name'] = pair[0]
elif len(pair) == 2: elif len(pair) == 2:
if pair[0] == 'timeout' or pair[0] == 'child case num': if pair[0] == 'timeout' or pair[0] == 'child case num':