esp-idf/components/lwip/weekend_test/net_suite_test.py

152 wiersze
5.6 KiB
Python

import os
import re
import socket
import subprocess
import time
from shutil import copyfile
from threading import Event, Thread
import ttfw_idf
from tiny_test_fw import DUT, Utility
stop_sock_listener = Event()
stop_io_listener = Event()
sock = None
client_address = None
manual_test = False
def io_listener(dut1):
global sock
global client_address
data = b''
while not stop_io_listener.is_set():
try:
data = dut1.expect(re.compile(r'PacketOut:\[([a-fA-F0-9]+)\]'), timeout=5)
except DUT.ExpectTimeout:
continue
if data != () and data[0] != b'':
packet_data = data[0]
print('Packet_data>{}<'.format(packet_data))
response = bytearray.fromhex(packet_data.decode())
print('Sending to socket:')
packet = ' '.join(format(x, '02x') for x in bytearray(response))
print('Packet>{}<'.format(packet))
if client_address is not None:
sock.sendto(response, ('127.0.0.1', 7777))
def sock_listener(dut1):
global sock
global client_address
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5)
server_address = '0.0.0.0'
server_port = 7771
server = (server_address, server_port)
sock.bind(server)
try:
while not stop_sock_listener.is_set():
try:
payload, client_address = sock.recvfrom(1024)
packet = ' '.join(format(x, '02x') for x in bytearray(payload))
print('Received from address {}, data {}'.format(client_address, packet))
dut1.write(str.encode(packet))
except socket.timeout:
pass
finally:
sock.close()
sock = None
@ttfw_idf.idf_example_test(env_tag='Example_WIFI')
def lwip_test_suite(env, extra_data):
global stop_io_listener
global stop_sock_listener
"""
steps: |
1. Rebuilds test suite with esp32_netsuite.ttcn
2. Starts listeners on stdout and socket
3. Execute ttcn3 test suite
4. Collect result from ttcn3
"""
dut1 = env.get_dut('net_suite', 'examples/system/network_tests', dut_class=ttfw_idf.ESP32DUT)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path, 'net_suite.bin')
bin_size = os.path.getsize(binary_file)
ttfw_idf.log_performance('net_suite', '{}KB'.format(bin_size // 1024))
ttfw_idf.check_performance('net_suite', bin_size // 1024, dut1.TARGET)
dut1.start_app()
thread1 = Thread(target=sock_listener, args=(dut1, ))
thread2 = Thread(target=io_listener, args=(dut1, ))
if not manual_test:
# Variables refering to esp32 ttcn test suite
TTCN_SRC = 'esp32_netsuite.ttcn'
TTCN_CFG = 'esp32_netsuite.cfg'
# System Paths
netsuite_path = os.getenv('NETSUITE_PATH')
netsuite_src_path = os.path.join(netsuite_path, 'src')
test_dir = os.path.dirname(os.path.realpath(__file__))
# Building the suite
print('Rebuilding the test suite')
print('-------------------------')
# copy esp32 specific files to ttcn net-suite dir
copyfile(os.path.join(test_dir, TTCN_SRC), os.path.join(netsuite_src_path, TTCN_SRC))
copyfile(os.path.join(test_dir, TTCN_CFG), os.path.join(netsuite_src_path, TTCN_CFG))
proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && source make.sh'],
cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output = proc.stdout.read()
print('Note: First build step we expect failure (titan/net_suite build system not suitable for multijob make)')
print(output)
proc = subprocess.Popen(['bash', '-c', 'cd ' + netsuite_src_path + ' && make'],
cwd=netsuite_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print('Note: This time all dependencies shall be generated -- multijob make shall pass')
output = proc.stdout.read()
print(output)
# Executing the test suite
thread1.start()
thread2.start()
time.sleep(2)
print('Executing the test suite')
print('------------------------')
proc = subprocess.Popen(['ttcn3_start', os.path.join(netsuite_src_path,'test_suite'), os.path.join(netsuite_src_path, TTCN_CFG)],
stdout=subprocess.PIPE)
output = proc.stdout.read()
print(output)
print('Collecting results')
print('------------------')
verdict_stats = re.search('(Verdict statistics:.*)', output)
if verdict_stats:
verdict_stats = verdict_stats.group(1)
else:
verdict_stats = b''
verdict = re.search('Overall verdict: pass', output)
if verdict:
print('Test passed!')
Utility.console_log(verdict_stats, 'green')
else:
Utility.console_log(verdict_stats, 'red')
raise ValueError('Test failed with: {}'.format(verdict_stats))
else:
try:
# Executing the test suite
thread1.start()
thread2.start()
time.sleep(2)
while True:
time.sleep(0.5)
except KeyboardInterrupt:
pass
print('Executing done, waiting for tests to finish')
print('-------------------------------------------')
stop_io_listener.set()
stop_sock_listener.set()
thread1.join()
thread2.join()
if __name__ == '__main__':
print('Manual execution, please build and start ttcn in a separate console')
manual_test = True
lwip_test_suite()