kopia lustrzana https://github.com/espressif/esp-idf
Merge branch 'feat/add_multi_target_for_example_test' into 'master'
example test: Add multi target support Closes IDF-1531 and IDF-807 See merge request espressif/esp-idf!8242pull/5452/head
commit
4f3624bbe9
|
@ -44,6 +44,7 @@ import re
|
|||
import json
|
||||
|
||||
import yaml
|
||||
|
||||
try:
|
||||
from yaml import CLoader as Loader
|
||||
except ImportError:
|
||||
|
@ -67,7 +68,7 @@ class Group(object):
|
|||
self.case_list = [case]
|
||||
self.filters = dict(zip(self.SORT_KEYS, [self._get_case_attr(case, x) for x in self.SORT_KEYS]))
|
||||
# we use ci_job_match_keys to match CI job tags. It's a set of required tags.
|
||||
self.ci_job_match_keys = set([self._get_case_attr(case, x) for x in self.CI_JOB_MATCH_KEYS])
|
||||
self.ci_job_match_keys = self._get_match_keys(case)
|
||||
|
||||
@staticmethod
|
||||
def _get_case_attr(case, attr):
|
||||
|
@ -75,6 +76,16 @@ class Group(object):
|
|||
# this method will do get attribute form cases
|
||||
return case.case_info[attr]
|
||||
|
||||
def _get_match_keys(self, case):
|
||||
keys = []
|
||||
for attr in self.CI_JOB_MATCH_KEYS:
|
||||
val = self._get_case_attr(case, attr)
|
||||
if isinstance(val, list):
|
||||
keys.extend(val)
|
||||
else:
|
||||
keys.append(val)
|
||||
return set(keys)
|
||||
|
||||
def accept_new_case(self):
|
||||
"""
|
||||
check if allowed to add any case to this group
|
||||
|
@ -143,6 +154,7 @@ class AssignTest(object):
|
|||
DEFAULT_FILTER = {
|
||||
"category": "function",
|
||||
"ignore": False,
|
||||
"supported_in_ci": True,
|
||||
}
|
||||
|
||||
def __init__(self, test_case_path, ci_config_file, case_group=Group):
|
||||
|
@ -234,12 +246,11 @@ class AssignTest(object):
|
|||
|
||||
:return: filter for search test cases
|
||||
"""
|
||||
bot_filter = os.getenv("BOT_CASE_FILTER")
|
||||
if bot_filter:
|
||||
bot_filter = json.loads(bot_filter)
|
||||
else:
|
||||
bot_filter = dict()
|
||||
return bot_filter
|
||||
res = dict()
|
||||
for bot_filter in [os.getenv('BOT_CASE_FILTER'), os.getenv('BOT_TARGET_FILTER')]:
|
||||
if bot_filter:
|
||||
res.update(json.loads(bot_filter))
|
||||
return res
|
||||
|
||||
def _apply_bot_test_count(self):
|
||||
"""
|
||||
|
|
|
@ -45,6 +45,7 @@ Template Config File::
|
|||
import importlib
|
||||
|
||||
import yaml
|
||||
|
||||
try:
|
||||
from yaml import CLoader as Loader
|
||||
except ImportError:
|
||||
|
@ -127,7 +128,6 @@ def filter_test_cases(test_methods, case_filter):
|
|||
* user case filter is ``chip: ["esp32", "esp32c"]``, case attribute is ``chip: "esp32"``
|
||||
* user case filter is ``chip: "esp32"``, case attribute is ``chip: "esp32"``
|
||||
|
||||
|
||||
:param test_methods: a list of test methods functions
|
||||
:param case_filter: case filter
|
||||
:return: filtered test methods
|
||||
|
@ -190,8 +190,29 @@ class Parser(object):
|
|||
_overwrite = cls.handle_overwrite_args(_config.pop("overwrite", dict()))
|
||||
_extra_data = _config.pop("extra_data", None)
|
||||
_filter.update(_config)
|
||||
|
||||
# Try get target from yml
|
||||
try:
|
||||
_target = _filter['target']
|
||||
except KeyError:
|
||||
_target = None
|
||||
else:
|
||||
_overwrite.update({'target': _target})
|
||||
|
||||
for test_method in test_methods:
|
||||
if _filter_one_case(test_method, _filter):
|
||||
try:
|
||||
dut_dict = test_method.case_info['dut_dict']
|
||||
except (AttributeError, KeyError):
|
||||
dut_dict = None
|
||||
|
||||
if dut_dict and _target:
|
||||
dut = test_method.case_info.get('dut')
|
||||
if _target.upper() in dut_dict:
|
||||
if dut and dut in dut_dict.values(): # don't overwrite special cases
|
||||
_overwrite.update({'dut': dut_dict[_target.upper()]})
|
||||
else:
|
||||
raise ValueError('target {} is not in the specified dut_dict'.format(_target))
|
||||
test_case_list.append(TestCase.TestCase(test_method, _extra_data, **_overwrite))
|
||||
return test_case_list
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ from . import load_source
|
|||
|
||||
class Search(object):
|
||||
TEST_CASE_FILE_PATTERN = "*_test.py"
|
||||
SUPPORT_REPLICATE_CASES_KEY = ['target']
|
||||
|
||||
@classmethod
|
||||
def _search_cases_from_file(cls, file_name):
|
||||
|
@ -42,9 +43,14 @@ class Search(object):
|
|||
continue
|
||||
except ImportError as e:
|
||||
print("ImportError: \r\n\tFile:" + file_name + "\r\n\tError:" + str(e))
|
||||
for i, test_function in enumerate(test_functions):
|
||||
|
||||
test_functions_out = []
|
||||
for case in test_functions:
|
||||
test_functions_out += cls.replicate_case(case)
|
||||
|
||||
for i, test_function in enumerate(test_functions_out):
|
||||
print("\t{}. ".format(i + 1) + test_function.case_info["name"])
|
||||
return test_functions
|
||||
return test_functions_out
|
||||
|
||||
@classmethod
|
||||
def _search_test_case_files(cls, test_case, file_pattern):
|
||||
|
@ -74,21 +80,41 @@ class Search(object):
|
|||
"""
|
||||
replicate_config = []
|
||||
for key in case.case_info:
|
||||
if key == 'ci_target': # ci_target is used to filter target, should not be duplicated.
|
||||
continue
|
||||
if isinstance(case.case_info[key], (list, tuple)):
|
||||
replicate_config.append(key)
|
||||
|
||||
def _replicate_for_key(case_list, replicate_key, replicate_list):
|
||||
def _replicate_for_key(cases, replicate_key, replicate_list):
|
||||
def deepcopy_func(f, name=None):
|
||||
fn = types.FunctionType(f.__code__, f.__globals__, name if name else f.__name__,
|
||||
f.__defaults__, f.__closure__)
|
||||
fn.__dict__.update(copy.deepcopy(f.__dict__))
|
||||
return fn
|
||||
|
||||
case_out = []
|
||||
for _case in case_list:
|
||||
for inner_case in cases:
|
||||
for value in replicate_list:
|
||||
new_case = copy.deepcopy(_case)
|
||||
new_case = deepcopy_func(inner_case)
|
||||
new_case.case_info[replicate_key] = value
|
||||
case_out.append(new_case)
|
||||
return case_out
|
||||
|
||||
replicated_cases = [case]
|
||||
for key in replicate_config:
|
||||
replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key])
|
||||
while replicate_config:
|
||||
if not replicate_config:
|
||||
break
|
||||
key = replicate_config.pop()
|
||||
if key in cls.SUPPORT_REPLICATE_CASES_KEY:
|
||||
replicated_cases = _replicate_for_key(replicated_cases, key, case.case_info[key])
|
||||
|
||||
# mark the cases with targets not in ci_target
|
||||
for case in replicated_cases:
|
||||
ci_target = case.case_info['ci_target']
|
||||
if not ci_target or case.case_info['target'] in ci_target:
|
||||
case.case_info['supported_in_ci'] = True
|
||||
else:
|
||||
case.case_info['supported_in_ci'] = False
|
||||
|
||||
return replicated_cases
|
||||
|
||||
|
@ -104,8 +130,4 @@ class Search(object):
|
|||
test_cases = []
|
||||
for test_case_file in test_case_files:
|
||||
test_cases += cls._search_cases_from_file(test_case_file)
|
||||
# handle replicate cases
|
||||
test_case_out = []
|
||||
for case in test_cases:
|
||||
test_case_out += cls.replicate_case(case)
|
||||
return test_case_out
|
||||
return test_cases
|
||||
|
|
|
@ -29,7 +29,7 @@ IDF_PATH_FROM_ENV = os.getenv("IDF_PATH")
|
|||
|
||||
|
||||
class ExampleGroup(CIAssignTest.Group):
|
||||
SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "chip"]
|
||||
SORT_KEYS = CI_JOB_MATCH_KEYS = ["env_tag", "target"]
|
||||
BUILD_LOCAL_DIR = "build_examples"
|
||||
BUILD_JOB_NAMES = ["build_examples_cmake_esp32", "build_examples_cmake_esp32s2"]
|
||||
|
||||
|
@ -61,7 +61,7 @@ def create_artifact_index_file(project_id=None, pipeline_id=None, case_group=Exa
|
|||
artifact_index_list = []
|
||||
|
||||
def format_build_log_path():
|
||||
parallel = job_info["parallel_num"] # Could be None if "parallel_num" not defined for the job
|
||||
parallel = job_info["parallel_num"] # Could be None if "parallel_num" not defined for the job
|
||||
return "{}/list_job_{}.json".format(case_group.BUILD_LOCAL_DIR, parallel or 1)
|
||||
|
||||
for build_job_name in case_group.BUILD_JOB_NAMES:
|
||||
|
@ -99,7 +99,7 @@ if __name__ == '__main__':
|
|||
help="file name pattern used to find Python test case files")
|
||||
parser.add_argument('--custom-group',
|
||||
help='select custom-group for the test cases, if other than ExampleTest',
|
||||
choices=['example','test-apps'], default='example')
|
||||
choices=['example', 'test-apps'], default='example')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
|
|
@ -11,6 +11,9 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import functools
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
|
@ -19,19 +22,100 @@ from .IDFApp import IDFApp, Example, LoadableElfTestApp, UT, TestApp # noqa: ex
|
|||
from .IDFDUT import IDFDUT, ESP32DUT, ESP32S2DUT, ESP8266DUT, ESP32QEMUDUT # noqa: export DUTs for users
|
||||
from .DebugUtils import OCDProcess, GDBProcess, TelnetProcess, CustomProcess # noqa: export DebugUtils for users
|
||||
|
||||
|
||||
def format_case_id(chip, case_name):
|
||||
return "{}.{}".format(chip, case_name)
|
||||
# pass TARGET_DUT_CLS_DICT to Env.py to avoid circular dependency issue.
|
||||
TARGET_DUT_CLS_DICT = {
|
||||
'ESP32': ESP32DUT,
|
||||
'ESP32S2': ESP32S2DUT,
|
||||
}
|
||||
|
||||
|
||||
def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", execution_time=1,
|
||||
def format_case_id(target, case_name):
|
||||
return "{}.{}".format(target, case_name)
|
||||
|
||||
|
||||
try:
|
||||
string_type = basestring
|
||||
except NameError:
|
||||
string_type = str
|
||||
|
||||
|
||||
def upper_list_or_str(text):
|
||||
"""
|
||||
Return the uppercase of list of string or string. Return itself for other
|
||||
data types
|
||||
:param text: list or string, other instance will be returned immediately
|
||||
:return: uppercase of list of string
|
||||
"""
|
||||
if isinstance(text, string_type):
|
||||
return [text.upper()]
|
||||
elif isinstance(text, list):
|
||||
return [item.upper() for item in text]
|
||||
else:
|
||||
return text
|
||||
|
||||
|
||||
def local_test_check(decorator_target):
|
||||
# Try to get the sdkconfig.json to read the IDF_TARGET value.
|
||||
# If not set, will set to ESP32.
|
||||
# For CI jobs, this is a fake procedure, the true target and dut will be
|
||||
# overwritten by the job config YAML file.
|
||||
idf_target = 'ESP32' # default if sdkconfig not found or not readable
|
||||
if os.getenv('CI_JOB_ID'): # Only auto-detect target when running locally
|
||||
return idf_target
|
||||
|
||||
expected_json_path = os.path.join('build', 'config', 'sdkconfig.json')
|
||||
if os.path.exists(expected_json_path):
|
||||
sdkconfig = json.load(open(expected_json_path))
|
||||
try:
|
||||
idf_target = sdkconfig['IDF_TARGET'].upper()
|
||||
except KeyError:
|
||||
logging.warning('IDF_TARGET not in {}. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path)))
|
||||
else:
|
||||
logging.info('IDF_TARGET: {}'.format(idf_target))
|
||||
else:
|
||||
logging.warning('{} not found. IDF_TARGET set to esp32'.format(os.path.abspath(expected_json_path)))
|
||||
|
||||
if isinstance(decorator_target, list):
|
||||
if idf_target not in decorator_target:
|
||||
raise ValueError('IDF_TARGET set to {}, not in decorator target value'.format(idf_target))
|
||||
else:
|
||||
if idf_target != decorator_target:
|
||||
raise ValueError('IDF_TARGET set to {}, not equal to decorator target value'.format(idf_target))
|
||||
return idf_target
|
||||
|
||||
|
||||
def get_dut_class(target, erase_nvs=None):
|
||||
if target not in TARGET_DUT_CLS_DICT:
|
||||
raise Exception('target can only be {%s} (case insensitive)' % ', '.join(TARGET_DUT_CLS_DICT.keys()))
|
||||
|
||||
dut = TARGET_DUT_CLS_DICT[target.upper()]
|
||||
if erase_nvs:
|
||||
dut.ERASE_NVS = 'erase_nvs'
|
||||
return dut
|
||||
|
||||
|
||||
def ci_target_check(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(**kwargs):
|
||||
target = upper_list_or_str(kwargs.get('target', []))
|
||||
ci_target = upper_list_or_str(kwargs.get('ci_target', []))
|
||||
if not set(ci_target).issubset(set(target)):
|
||||
raise ValueError('ci_target must be a subset of target')
|
||||
|
||||
return func(**kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@ci_target_check
|
||||
def idf_example_test(app=Example, target="ESP32", ci_target=None, module="examples", execution_time=1,
|
||||
level="example", erase_nvs=True, config_name=None, **kwargs):
|
||||
"""
|
||||
decorator for testing idf examples (with default values for some keyword args).
|
||||
|
||||
:param app: test application class
|
||||
:param dut: dut class
|
||||
:param chip: chip supported, string or tuple
|
||||
:param target: target supported, string or list
|
||||
:param ci_target: target auto run in CI, if None than all target will be tested, None, string or list
|
||||
:param module: module, string
|
||||
:param execution_time: execution time in minutes, int
|
||||
:param level: test level, could be used to filter test cases, string
|
||||
|
@ -40,31 +124,31 @@ def idf_example_test(app=Example, dut=IDFDUT, chip="ESP32", module="examples", e
|
|||
:param kwargs: other keyword args
|
||||
:return: test method
|
||||
"""
|
||||
try:
|
||||
# try to config the default behavior of erase nvs
|
||||
dut.ERASE_NVS = erase_nvs
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
|
||||
execution_time=execution_time, level=level, **kwargs)
|
||||
|
||||
def test(func):
|
||||
test_target = local_test_check(target)
|
||||
dut = get_dut_class(test_target, erase_nvs)
|
||||
original_method = TinyFW.test_method(
|
||||
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
|
||||
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
|
||||
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
|
||||
)
|
||||
test_func = original_method(func)
|
||||
test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"])
|
||||
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
|
||||
return test_func
|
||||
|
||||
return test
|
||||
|
||||
|
||||
def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", execution_time=1,
|
||||
@ci_target_check
|
||||
def idf_unit_test(app=UT, target="ESP32", ci_target=None, module="unit-test", execution_time=1,
|
||||
level="unit", erase_nvs=True, **kwargs):
|
||||
"""
|
||||
decorator for testing idf unit tests (with default values for some keyword args).
|
||||
|
||||
:param app: test application class
|
||||
:param dut: dut class
|
||||
:param chip: chip supported, string or tuple
|
||||
:param target: target supported, string or list
|
||||
:param ci_target: target auto run in CI, if None than all target will be tested, None, string or list
|
||||
:param module: module, string
|
||||
:param execution_time: execution time in minutes, int
|
||||
:param level: test level, could be used to filter test cases, string
|
||||
|
@ -72,32 +156,31 @@ def idf_unit_test(app=UT, dut=IDFDUT, chip="ESP32", module="unit-test", executio
|
|||
:param kwargs: other keyword args
|
||||
:return: test method
|
||||
"""
|
||||
try:
|
||||
# try to config the default behavior of erase nvs
|
||||
dut.ERASE_NVS = erase_nvs
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
|
||||
execution_time=execution_time, level=level, **kwargs)
|
||||
|
||||
def test(func):
|
||||
test_target = local_test_check(target)
|
||||
dut = get_dut_class(test_target, erase_nvs)
|
||||
original_method = TinyFW.test_method(
|
||||
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
|
||||
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
|
||||
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
|
||||
)
|
||||
test_func = original_method(func)
|
||||
test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"])
|
||||
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
|
||||
return test_func
|
||||
|
||||
return test
|
||||
|
||||
|
||||
def idf_custom_test(app=TestApp, dut=IDFDUT, chip="ESP32", module="misc", execution_time=1,
|
||||
@ci_target_check
|
||||
def idf_custom_test(app=TestApp, target="ESP32", ci_target=None, module="misc", execution_time=1,
|
||||
level="integration", erase_nvs=True, config_name=None, group="test-apps", **kwargs):
|
||||
|
||||
"""
|
||||
decorator for idf custom tests (with default values for some keyword args).
|
||||
|
||||
:param app: test application class
|
||||
:param dut: dut class
|
||||
:param chip: chip supported, string or tuple
|
||||
:param target: target supported, string or list
|
||||
:param ci_target: target auto run in CI, if None than all target will be tested, None, string or list
|
||||
:param module: module, string
|
||||
:param execution_time: execution time in minutes, int
|
||||
:param level: test level, could be used to filter test cases, string
|
||||
|
@ -107,18 +190,20 @@ def idf_custom_test(app=TestApp, dut=IDFDUT, chip="ESP32", module="misc", execut
|
|||
:param kwargs: other keyword args
|
||||
:return: test method
|
||||
"""
|
||||
try:
|
||||
# try to config the default behavior of erase nvs
|
||||
dut.ERASE_NVS = erase_nvs
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
original_method = TinyFW.test_method(app=app, dut=dut, chip=chip, module=module,
|
||||
execution_time=execution_time, level=level, **kwargs)
|
||||
|
||||
def test(func):
|
||||
test_target = local_test_check(target)
|
||||
dut = get_dut_class(test_target, erase_nvs)
|
||||
if 'dut' in kwargs: # panic_test() will inject dut, resolve conflicts here
|
||||
dut = kwargs['dut']
|
||||
del kwargs['dut']
|
||||
original_method = TinyFW.test_method(
|
||||
app=app, dut=dut, target=upper_list_or_str(target), ci_target=upper_list_or_str(ci_target),
|
||||
module=module, execution_time=execution_time, level=level, erase_nvs=erase_nvs,
|
||||
dut_dict=TARGET_DUT_CLS_DICT, **kwargs
|
||||
)
|
||||
test_func = original_method(func)
|
||||
test_func.case_info["ID"] = format_case_id(chip, test_func.case_info["name"])
|
||||
test_func.case_info["ID"] = format_case_id(target, test_func.case_info["name"])
|
||||
return test_func
|
||||
|
||||
return test
|
||||
|
|
|
@ -26,7 +26,6 @@ import threading
|
|||
from tiny_test_fw import TinyFW, Utility, Env, DUT
|
||||
import ttfw_idf
|
||||
|
||||
|
||||
UT_APP_BOOT_UP_DONE = "Press ENTER to see the list of tests."
|
||||
|
||||
# matches e.g.: "rst:0xc (SW_CPU_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT)"
|
||||
|
@ -188,12 +187,11 @@ def reset_dut(dut):
|
|||
|
||||
|
||||
def log_test_case(description, test_case, ut_config):
|
||||
Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange")
|
||||
Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k,v) for (k,v) in test_case.items() if k != "name" and v is not None), color="orange")
|
||||
Utility.console_log("Running {} '{}' (config {})".format(description, test_case["name"], ut_config), color="orange")
|
||||
Utility.console_log("Tags: %s" % ", ".join("%s=%s" % (k, v) for (k, v) in test_case.items() if k != "name" and v is not None), color="orange")
|
||||
|
||||
|
||||
def run_one_normal_case(dut, one_case, junit_test_case):
|
||||
|
||||
reset_dut(dut)
|
||||
|
||||
dut.start_capture_raw_data()
|
||||
|
@ -330,7 +328,6 @@ def run_unit_test_cases(env, extra_data):
|
|||
|
||||
|
||||
class Handler(threading.Thread):
|
||||
|
||||
WAIT_SIGNAL_PATTERN = re.compile(r'Waiting for signal: \[(.+)]!')
|
||||
SEND_SIGNAL_PATTERN = re.compile(r'Send signal: \[([^]]+)](\[([^]]+)])?!')
|
||||
FINISH_PATTERN = re.compile(r"1 Tests (\d) Failures (\d) Ignored")
|
||||
|
@ -691,7 +688,6 @@ def run_multiple_stage_cases(env, extra_data):
|
|||
|
||||
|
||||
def detect_update_unit_test_info(env, extra_data, app_bin):
|
||||
|
||||
case_config = format_test_case_config(extra_data)
|
||||
|
||||
for ut_config in case_config:
|
||||
|
@ -785,7 +781,7 @@ if __name__ == '__main__':
|
|||
if len(test_item) == 0:
|
||||
continue
|
||||
pair = test_item.split(r':', 1)
|
||||
if len(pair) == 1 or pair[0] is 'name':
|
||||
if len(pair) == 1 or pair[0] == 'name':
|
||||
test_dict['name'] = pair[0]
|
||||
elif len(pair) == 2:
|
||||
if pair[0] == 'timeout' or pair[0] == 'child case num':
|
||||
|
|
Ładowanie…
Reference in New Issue