kopia lustrzana https://github.com/espressif/esp-idf
91 wiersze
3.8 KiB
Python
91 wiersze
3.8 KiB
Python
from __future__ import unicode_literals
|
|
from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
|
|
from tiny_test_fw import Utility
|
|
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
import threading
|
|
import ttfw_idf
|
|
|
|
|
|
class IDEWSProtocol(WebSocket):
|
|
|
|
def handleMessage(self):
|
|
try:
|
|
j = json.loads(self.data)
|
|
except Exception as e:
|
|
Utility.console_log('Server ignores error: {}'.format(e), 'orange')
|
|
return
|
|
event = j.get('event')
|
|
if event and 'prog' in j and ((event == 'gdb_stub' and 'port' in j) or
|
|
(event == 'coredump' and 'file' in j)):
|
|
payload = {'event': 'debug_finished'}
|
|
self.sendMessage(json.dumps(payload))
|
|
Utility.console_log('Server sent: {}'.format(payload))
|
|
else:
|
|
Utility.console_log('Server received: {}'.format(j), 'orange')
|
|
|
|
def handleConnected(self):
|
|
Utility.console_log('{} connected to server'.format(self.address))
|
|
|
|
def handleClose(self):
|
|
Utility.console_log('{} closed the connection'.format(self.address))
|
|
|
|
|
|
class WebSocketServer(object):
|
|
HOST = '127.0.0.1'
|
|
PORT = 1123
|
|
|
|
def run(self):
|
|
server = SimpleWebSocketServer(self.HOST, self.PORT, IDEWSProtocol)
|
|
while not self.exit_event.is_set():
|
|
server.serveonce()
|
|
|
|
def __init__(self):
|
|
self.exit_event = threading.Event()
|
|
self.thread = threading.Thread(target=self.run)
|
|
self.thread.start()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.exit_event.set()
|
|
self.thread.join(10)
|
|
if self.thread.is_alive():
|
|
Utility.console_log('Thread cannot be joined', 'orange')
|
|
|
|
|
|
@ttfw_idf.idf_custom_test(env_tag='test_jtag_arm', group='test-apps')
|
|
def test_monitor_ide_integration(env, extra_data):
|
|
config_files = glob.glob(os.path.join(os.path.dirname(__file__), 'sdkconfig.ci.*'))
|
|
config_names = [os.path.basename(s).replace('sdkconfig.ci.', '') for s in config_files]
|
|
rel_proj_path = 'tools/test_apps/system/monitor_ide_integration'
|
|
for name in config_names:
|
|
Utility.console_log('Checking config "{}"... '.format(name), 'green', end='')
|
|
dut = env.get_dut('panic', rel_proj_path, app_config_name=name)
|
|
monitor_path = os.path.join(dut.app.idf_path, 'tools/idf_monitor.py')
|
|
elf_path = os.path.join(dut.app.binary_path, 'panic.elf')
|
|
dut.start_app()
|
|
# Closing the DUT because we will reconnect with IDF Monitor
|
|
env.close_dut(dut.name)
|
|
|
|
with WebSocketServer(), ttfw_idf.CustomProcess(' '.join([monitor_path,
|
|
elf_path,
|
|
'--ws', 'ws://{}:{}'.format(WebSocketServer.HOST,
|
|
WebSocketServer.PORT)]),
|
|
logfile='monitor_{}.log'.format(name)) as p:
|
|
p.pexpect_proc.expect(re.compile(r'Guru Meditation Error'), timeout=10)
|
|
p.pexpect_proc.expect_exact('Communicating through WebSocket', timeout=5)
|
|
# "u?" is for Python 2 only in the following regular expressions.
|
|
# The elements of dictionary can be printed in different order depending on the Python version.
|
|
p.pexpect_proc.expect(re.compile(r"WebSocket sent: \{u?.*'event': u?'" + name + "'"), timeout=5)
|
|
p.pexpect_proc.expect_exact('Waiting for debug finished event', timeout=5)
|
|
p.pexpect_proc.expect(re.compile(r"WebSocket received: \{u?'event': u?'debug_finished'\}"), timeout=5)
|
|
p.pexpect_proc.expect_exact('Communications through WebSocket is finished', timeout=5)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
test_monitor_ide_integration()
|