openthread: add ci for esp32c6

pull/10691/head
Xu Si Yu 2023-01-11 17:11:32 +08:00
rodzic 57c4508eb3
commit 389d07b9c1
7 zmienionych plików z 169 dodań i 173 usunięć

Wyświetl plik

@ -176,6 +176,7 @@ build:integration_test:
included_in: included_in:
- "build:example_test-esp32h4" - "build:example_test-esp32h4"
- "build:example_test-esp32s3" - "build:example_test-esp32s3"
- "build:example_test-esp32c6"
- "build:example_test" - "build:example_test"
- build:target_test - build:target_test

Wyświetl plik

@ -797,10 +797,11 @@ example_test_pytest_openthread_br:
- .rules:test:example_test-i154 - .rules:test:example_test-i154
needs: needs:
- build_pytest_examples_esp32s3 - build_pytest_examples_esp32s3
- build_pytest_examples_esp32c6
- build_pytest_examples_esp32h4 - build_pytest_examples_esp32h4
tags: tags:
- esp32h4 - esp32h4
- i154_multi_dut - openthread_br
component_ut_pytest_esp32s3_usb_host: component_ut_pytest_esp32s3_usb_host:
extends: extends:

Wyświetl plik

@ -121,7 +121,7 @@ ENV_MARKERS = {
'psramv0': 'Runner with PSRAM version 0', 'psramv0': 'Runner with PSRAM version 0',
# multi-dut markers # multi-dut markers
'ieee802154': 'ieee802154 related tests should run on ieee802154 runners.', 'ieee802154': 'ieee802154 related tests should run on ieee802154 runners.',
'i154_multi_dut': 'tests should be used for i154, such as openthread.', 'openthread_br': 'tests should be used for openthread border router.',
'wifi_two_dut': 'tests should be run on runners which has two wifi duts connected.', 'wifi_two_dut': 'tests should be run on runners which has two wifi duts connected.',
'generic_multi_device': 'generic multiple devices whose corresponding gpio pins are connected to each other.', 'generic_multi_device': 'generic multiple devices whose corresponding gpio pins are connected to each other.',
'twai_network': 'multiple runners form a TWAI network.', 'twai_network': 'multiple runners form a TWAI network.',

Wyświetl plik

@ -13,10 +13,6 @@ examples/openthread/ot_br:
examples/openthread/ot_cli: examples/openthread/ot_cli:
enable: enable:
- if: IDF_TARGET in ["esp32h4", "esp32c6"] - if: IDF_TARGET in ["esp32h4", "esp32c6"]
disable_test:
- if: IDF_TARGET == "esp32c6"
temporary: true
reason: only test on esp32h4
examples/openthread/ot_rcp: examples/openthread/ot_rcp:
enable: enable:

Wyświetl plik

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0 # SPDX-License-Identifier: Unlicense OR CC0-1.0
# !/usr/bin/env python3 # !/usr/bin/env python3
# this file defines some functions for testing cli and br under pytest framework # this file defines some functions for testing cli and br under pytest framework
@ -8,13 +8,103 @@ import socket
import struct import struct
import subprocess import subprocess
import time import time
from typing import Tuple, Union from typing import Tuple
import netifaces import netifaces
import pexpect import pexpect
from pytest_embedded_idf.dut import IdfDut from pytest_embedded_idf.dut import IdfDut
class thread_parameter:
def __init__(self, deviceRole:str='', dataset:str='', channel:str='', exaddr:str='', bbr:bool=False):
self.deviceRole = deviceRole
self.dataset = dataset
self.channel = channel
self.exaddr = exaddr
self.bbr = bbr
class wifi_parameter:
def __init__(self, ssid:str='', psk:str='', retry_times:int=10):
self.ssid = ssid
self.psk = psk
self.retry_times = retry_times
def joinThreadNetwork(dut:IdfDut, thread:thread_parameter) -> None:
if thread.dataset != '':
command = 'dataset set active ' + thread.dataset
dut.write(command)
dut.expect('Done', timeout=5)
else:
dut.write('dataset init new')
dut.expect('Done', timeout=5)
dut.write('dataset commit active')
dut.expect('Done', timeout=5)
if thread.channel != '':
command = 'channel ' + thread.channel
dut.write(command)
dut.expect('Done', timeout=5)
if thread.exaddr != '':
command = 'extaddr ' + thread.exaddr
dut.write(command)
dut.expect('Done', timeout=5)
if thread.bbr:
dut.write('bbr enable')
dut.expect('Done', timeout=5)
dut.write('ifconfig up')
dut.expect('Done', timeout=5)
dut.write('thread start')
dut.expect('Role detached ->', timeout=20)
if thread.deviceRole == 'leader':
assert getDeviceRole(dut) == 'leader'
elif thread.deviceRole == 'router':
if getDeviceRole(dut) != 'router':
changeDeviceRole(dut, 'router')
wait(dut, 10)
assert getDeviceRole(dut) == 'router'
else:
assert False
def joinWiFiNetwork(dut:IdfDut, wifi:wifi_parameter) -> Tuple[str, int]:
clean_buffer(dut)
ip_address = ''
information = ''
for order in range(1, wifi.retry_times):
dut.write('wifi connect -s ' + str(wifi.ssid) + ' -p ' + str(wifi.psk))
tmp = dut.expect(pexpect.TIMEOUT, timeout=10)
if 'sta ip' in str(tmp):
ip_address = re.findall(r'sta ip: (\w+.\w+.\w+.\w+),', str(tmp))[0]
information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=20)[1].decode()
if information == 'is connected successfully':
break
assert information == 'is connected successfully'
return ip_address, order
def getDeviceRole(dut:IdfDut) -> str:
clean_buffer(dut)
dut.write('state')
role = dut.expect(r'state\W+(\w+)\W+Done', timeout=5)[1].decode()
print(role)
return str(role)
def changeDeviceRole(dut:IdfDut, role:str) -> None:
command = 'state ' + role
dut.write(command)
def getDataset(dut:IdfDut) -> str:
clean_buffer(dut)
dut.write('dataset active -x')
dut_data = dut.expect(r'\n(\w{212})\r', timeout=5)[1].decode()
return str(dut_data)
def reset_thread(dut:IdfDut) -> None: def reset_thread(dut:IdfDut) -> None:
dut.write(' ') dut.write(' ')
dut.write('state') dut.write('state')
@ -26,28 +116,6 @@ def reset_thread(dut:IdfDut) -> None:
dut.write('state') dut.write('state')
# config thread
def config_thread(dut:IdfDut, model:str, dataset:str='0') -> Union[str, None]:
if model == 'random':
dut.write('dataset init new')
dut.expect('Done', timeout=5)
dut.write('dataset commit active')
dut.expect('Done', timeout=5)
dut.write('ifconfig up')
dut.expect('Done', timeout=5)
dut.write('dataset active -x') # get dataset
dut_data = dut.expect(r'\n(\w{212})\r', timeout=5)[1].decode()
return str(dut_data)
if model == 'appointed':
tmp = 'dataset set active ' + str(dataset)
dut.write(tmp)
dut.expect('Done', timeout=5)
dut.write('ifconfig up')
dut.expect('Done', timeout=5)
return None
return None
# get the mleid address of the thread # get the mleid address of the thread
def get_mleid_addr(dut:IdfDut) -> str: def get_mleid_addr(dut:IdfDut) -> str:
dut_adress = '' dut_adress = ''
@ -87,69 +155,6 @@ def get_global_unicast_addr(dut:IdfDut, br:IdfDut) -> str:
return dut_adress return dut_adress
# start thread
def start_thread(dut:IdfDut) -> str:
role = ''
dut.write('thread start')
tmp = dut.expect(r'Role detached -> (\w+)\W', timeout=20)[0]
role = re.findall(r'Role detached -> (\w+)\W', str(tmp))[0]
return role
def wait_key_str(leader:IdfDut, child:IdfDut) -> None:
wait(leader, 1)
leader.expect('OpenThread attached to netif', timeout=20)
leader.write(' ')
leader.write('state')
child.expect('OpenThread attached to netif', timeout=20)
child.write(' ')
child.write('state')
def config_network(leader:IdfDut, child:IdfDut, leader_name:str, thread_dataset_model:str,
thread_dataset:str, wifi:IdfDut, wifi_ssid:str, wifi_psk:str) -> str:
wait_key_str(leader, child)
return form_network_using_manual_configuration(leader, child, leader_name, thread_dataset_model,
thread_dataset, wifi, wifi_ssid, wifi_psk)
# config br and cli manually
def form_network_using_manual_configuration(leader:IdfDut, child:IdfDut, leader_name:str, thread_dataset_model:str,
thread_dataset:str, wifi:IdfDut, wifi_ssid:str, wifi_psk:str) -> str:
reset_thread(leader)
clean_buffer(leader)
reset_thread(child)
clean_buffer(child)
leader.write('channel 12')
leader.expect('Done', timeout=5)
child.write('channel 12')
child.expect('Done', timeout=5)
res = '0000'
if wifi_psk != '0000':
res = connect_wifi(wifi, wifi_ssid, wifi_psk, 10)[0]
leader_data = ''
if thread_dataset_model == 'random':
leader_data = str(config_thread(leader, 'random'))
else:
config_thread(leader, 'appointed', thread_dataset)
if leader_name == 'br':
leader.write('bbr enable')
leader.expect('Done', timeout=5)
role = start_thread(leader)
assert role == 'leader'
if thread_dataset_model == 'random':
config_thread(child, 'appointed', leader_data)
else:
config_thread(child, 'appointed', thread_dataset)
if leader_name != 'br':
child.write('bbr enable')
child.expect('Done', timeout=5)
role = start_thread(child)
assert role == 'child'
wait(leader, 10)
return res
# ping of thread # ping of thread
def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]: def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]:
command = 'ping ' + str(target) + ' 0 ' + str(times) command = 'ping ' + str(target) + ' 0 ' + str(times)
@ -161,23 +166,6 @@ def ot_ping(dut:IdfDut, target:str, times:int) -> Tuple[int, int]:
return tx_count, rx_count return tx_count, rx_count
# connect Wi-Fi
def connect_wifi(dut:IdfDut, ssid:str, psk:str, nums:int) -> Tuple[str, int]:
clean_buffer(dut)
ip_address = ''
information = ''
for order in range(1, nums):
dut.write('wifi connect -s ' + str(ssid) + ' -p ' + str(psk))
tmp = dut.expect(pexpect.TIMEOUT, timeout=10)
if 'sta ip' in str(tmp):
ip_address = re.findall(r'sta ip: (\w+.\w+.\w+.\w+),', str(tmp))[0]
information = dut.expect(r'wifi sta (\w+ \w+ \w+)\W', timeout=20)[1].decode()
if information == 'is connected successfully':
break
assert information == 'is connected successfully'
return ip_address, order
def reset_host_interface() -> None: def reset_host_interface() -> None:
interface_name = get_host_interface_name() interface_name = get_host_interface_name()
flag = False flag = False

Wyświetl plik

@ -0,0 +1,2 @@
CONFIG_IDF_TARGET="esp32c6"
CONFIG_IDF_TARGET_ESP32C6=y

Wyświetl plik

@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Unlicense OR CC0-1.0 # SPDX-License-Identifier: Unlicense OR CC0-1.0
# !/usr/bin/env python3 # !/usr/bin/env python3
@ -18,7 +18,7 @@ from pytest_embedded_idf.dut import IdfDut
# This file contains the test scripts for Thread: # This file contains the test scripts for Thread:
# Case 1: Thread network formation and attaching # Case 1: Thread network formation and attaching
# A Thread Border Router forms a Thread network, a Thread device attaches to it, then test ping connection between them. # A Thread Border Router forms a Thread network, Thread devices attache to it, then test ping connection between them.
# Case 2: Bidirectional IPv6 connectivity # Case 2: Bidirectional IPv6 connectivity
# Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router). # Test IPv6 ping connection between Thread device and Linux Host (via Thread Border Router).
@ -63,52 +63,85 @@ def fixture_Init_interface() -> bool:
return True return True
wifi_ssid = 'OTCITE' default_br_ot_para = ocf.thread_parameter('leader', '', '12', '7766554433221100', True)
wifi_psk = 'otcitest888' default_br_wifi_para = ocf.wifi_parameter('OTCITE', 'otcitest888', 10)
default_cli_ot_para = ocf.thread_parameter('router', '', '12', '', False)
# Case 1: Thread network formation and attaching # Case 1: Thread network formation and attaching
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.esp32c6
@pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR', 'rcp|cli|br', 3, ('/dev/USB_RCP|/dev/USB_CLI|/dev/USB_BR|/dev/USB_CLI_C6', 'rcp|cli|br|cli_c6', 4,
f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}' f'{os.path.join(os.path.dirname(__file__), "ot_rcp")}'
f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}' f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}'
f'|{os.path.join(os.path.dirname(__file__), "ot_br")}', f'|{os.path.join(os.path.dirname(__file__), "ot_br")}'
'esp32h2beta2|esp32h2beta2|esp32s3', 'esp32h4|esp32h4|esp32s3'), # No need to rename beta_target as it is still called h2 in esptool f'|{os.path.join(os.path.dirname(__file__), "ot_cli")}',
'esp32h2beta2|esp32h2beta2|esp32s3|esp32c6', 'esp32h4|esp32h4|esp32s3|esp32c6'),
], ],
indirect=True, indirect=True,
) )
def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut]) -> None: def test_thread_connect(dut:Tuple[IdfDut, IdfDut, IdfDut, IdfDut]) -> None:
br = dut[2] br = dut[2]
cli = dut[1] cli_h2 = dut[1]
cli_c6 = dut[3]
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
cli_list = [cli_h2, cli_c6]
router_extaddr_list = ['7766554433221101', '7766554433221102']
dataset = '-1' ocf.reset_thread(br)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, '0000') for cli in cli_list:
flag = False ocf.reset_thread(cli)
br_ot_para = default_br_ot_para
ocf.joinThreadNetwork(br, br_ot_para)
cli_ot_para = default_cli_ot_para
cli_ot_para.dataset = ocf.getDataset(br)
try: try:
cli_mleid_addr = ocf.get_mleid_addr(cli) order = 0
br_mleid_addr = ocf.get_mleid_addr(br) for cli in cli_list:
rx_nums = ocf.ot_ping(cli, br_mleid_addr, 5)[1] cli_ot_para.exaddr = router_extaddr_list[order]
assert rx_nums != 0 order = order + 1
rx_nums = ocf.ot_ping(br, cli_mleid_addr, 5)[1] ocf.joinThreadNetwork(cli, cli_ot_para)
assert rx_nums != 0 for cli in cli_list:
flag = True cli_mleid_addr = ocf.get_mleid_addr(cli)
br_mleid_addr = ocf.get_mleid_addr(br)
rx_nums = ocf.ot_ping(cli, br_mleid_addr, 5)[1]
assert rx_nums == 5
rx_nums = ocf.ot_ping(br, cli_mleid_addr, 5)[1]
assert rx_nums == 5
finally: finally:
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') for cli in cli_list:
cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Form a Wi-Fi/Thread network with a Wi-Fi host, a border router and a Thread end device
# Topology:
# Border_Router
# / \
# / \
# Wi-FI_Host Thread_End_Device
def formBasicWiFiThreadNetwork(br:IdfDut, cli:IdfDut) -> None:
ocf.reset_thread(br)
ocf.reset_thread(cli)
ocf.joinWiFiNetwork(br, default_br_wifi_para)
ocf.joinThreadNetwork(br, default_br_ot_para)
ot_para = default_cli_ot_para
ot_para.dataset = ocf.getDataset(br)
ot_para.exaddr = '7766554433221101'
ocf.joinThreadNetwork(cli, ot_para)
ocf.wait(cli,10)
# Case 2: Bidirectional IPv6 connectivity # Case 2: Bidirectional IPv6 connectivity
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -126,9 +159,7 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut,
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
flag = False
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br) cli_global_unicast_addr = ocf.get_global_unicast_addr(cli, br)
@ -148,18 +179,16 @@ def test_Bidirectional_IPv6_connectivity(Init_interface:bool, dut: Tuple[IdfDut,
txrx_nums = ocf.ot_ping(cli, str(ip_addr), 5) txrx_nums = ocf.ot_ping(cli, str(ip_addr), 5)
rx_nums = rx_nums + int(txrx_nums[1]) rx_nums = rx_nums + int(txrx_nums[1])
assert rx_nums != 0 assert rx_nums != 0
flag = True
finally: finally:
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Case 3: Multicast forwarding from Wi-Fi to Thread network # Case 3: Multicast forwarding from Wi-Fi to Thread network
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -177,9 +206,7 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut,
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
flag = False
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
br.write('bbr') br.write('bbr')
@ -191,18 +218,16 @@ def test_multicast_forwarding_A(Init_interface:bool, dut: Tuple[IdfDut, IdfDut,
print('ping result:\n', str(out_str)) print('ping result:\n', str(out_str))
role = re.findall(r' (\d+)%', str(out_str))[0] role = re.findall(r' (\d+)%', str(out_str))[0]
assert role != '100' assert role != '100'
flag = True
finally: finally:
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Case 4: Multicast forwarding from Thread to Wi-Fi network # Case 4: Multicast forwarding from Thread to Wi-Fi network
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -220,8 +245,7 @@ def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut,
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
br.write('bbr') br.write('bbr')
@ -254,7 +278,7 @@ def test_multicast_forwarding_B(Init_interface:bool, dut: Tuple[IdfDut, IdfDut,
# Case 5: discover dervice published by Thread device # Case 5: discover dervice published by Thread device
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -273,10 +297,7 @@ def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool
assert Init_avahi assert Init_avahi
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
ocf.wait(cli, 3)
flag = False
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
command = 'avahi-browse -rt _testyyy._udp' command = 'avahi-browse -rt _testyyy._udp'
@ -303,18 +324,16 @@ def test_service_discovery_of_Thread_device(Init_interface:bool, Init_avahi:bool
out_str = subprocess.getoutput(command) out_str = subprocess.getoutput(command)
print('avahi-browse:\n', str(out_str)) print('avahi-browse:\n', str(out_str))
assert 'myTest' in str(out_str) assert 'myTest' in str(out_str)
flag = True
finally: finally:
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Case 6: discover dervice published by Wi-Fi device # Case 6: discover dervice published by Wi-Fi device
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -333,10 +352,7 @@ def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool,
assert Init_avahi assert Init_avahi
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
ocf.wait(cli, 3)
flag = False
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
br_global_unicast_addr = ocf.get_global_unicast_addr(br, br) br_global_unicast_addr = ocf.get_global_unicast_addr(br, br)
@ -372,19 +388,17 @@ def test_service_discovery_of_WiFi_device(Init_interface:bool, Init_avahi:bool,
tmp = cli.expect(pexpect.TIMEOUT, timeout=5) tmp = cli.expect(pexpect.TIMEOUT, timeout=5)
assert 'response for testxxx' in str(tmp) assert 'response for testxxx' in str(tmp)
assert 'Port:12347' in str(tmp) assert 'Port:12347' in str(tmp)
flag = True
finally: finally:
ocf.host_close_service() ocf.host_close_service()
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Case 7: ICMP communication via NAT64 # Case 7: ICMP communication via NAT64
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -402,27 +416,23 @@ def test_ICMP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) ->
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
flag = False
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
host_ipv4_address = ocf.get_host_ipv4_address() host_ipv4_address = ocf.get_host_ipv4_address()
print('host_ipv4_address: ', host_ipv4_address) print('host_ipv4_address: ', host_ipv4_address)
rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1] rx_nums = ocf.ot_ping(cli, str(host_ipv4_address), 5)[1]
assert rx_nums != 0 assert rx_nums != 0
flag = True
finally: finally:
br.write('factoryreset') br.write('factoryreset')
cli.write('factoryreset') cli.write('factoryreset')
time.sleep(3) time.sleep(3)
assert flag
# Case 8: UDP communication via NAT64 # Case 8: UDP communication via NAT64
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -440,8 +450,7 @@ def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
br.write('bbr') br.write('bbr')
@ -475,7 +484,7 @@ def test_UDP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N
# Case 9: TCP communication via NAT64 # Case 9: TCP communication via NAT64
@pytest.mark.esp32s3 @pytest.mark.esp32s3
@pytest.mark.esp32h4 @pytest.mark.esp32h4
@pytest.mark.i154_multi_dut @pytest.mark.openthread_br
@pytest.mark.flaky(reruns=1, reruns_delay=1) @pytest.mark.flaky(reruns=1, reruns_delay=1)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'port, config, count, app_path, beta_target, target', [ 'port, config, count, app_path, beta_target, target', [
@ -493,8 +502,7 @@ def test_TCP_NAT64(Init_interface:bool, dut: Tuple[IdfDut, IdfDut, IdfDut]) -> N
assert Init_interface assert Init_interface
dut[0].serial.stop_redirect_thread() dut[0].serial.stop_redirect_thread()
dataset = '-1' formBasicWiFiThreadNetwork(br, cli)
ocf.config_network(br, cli, 'br', 'random', dataset, br, wifi_ssid, wifi_psk)
try: try:
assert ocf.is_joined_wifi_network(br) assert ocf.is_joined_wifi_network(br)
br.write('bbr') br.write('bbr')