# Mirror of https://github.com/espressif/esp-idf (99 lines, 3.3 KiB, Python)
# SPDX-FileCopyrightText: 2015-2022 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
|
|
import json
|
|
import time
|
|
|
|
from .output_helpers import red_print, yellow_print
|
|
|
|
try:
|
|
import websocket
|
|
except ImportError:
|
|
# This is needed for IDE integration only.
|
|
pass
|
|
|
|
|
|
class WebSocketClient(object):
    """
    WebSocket client used to advertise debug events to WebSocket server by sending and receiving JSON-serialized
    dictionaries.

    Advertisement of debug event:
    {'event': 'gdb_stub', 'port': '/dev/ttyUSB1', 'prog': 'build/elf_file'} for GDB Stub, or
    {'event': 'coredump', 'file': '/tmp/xy', 'prog': 'build/elf_file'} for coredump,
    where 'port' is the port for the connected device, 'prog' is the full path to the ELF file and 'file' is the
    generated coredump file.

    Expected end of external debugging:
    {'event': 'debug_finished'}
    """

    # Number of attempts made by _connect(), send() and wait() before giving up.
    RETRIES = 3
    # Argument passed to time.sleep() after each failed connection attempt.
    CONNECTION_RETRY_DELAY = 1

    def __init__(self, url):  # type: (str) -> None
        """
        Remember the server url and connect to it immediately.

        Raises RuntimeError if the connection cannot be established.
        """
        self.url = url
        self._connect()

    def _connect(self):  # type: () -> None
        """
        Connect to WebSocket server at url

        Any existing connection is closed first. Up to RETRIES attempts are made, sleeping CONNECTION_RETRY_DELAY
        after each failed one. Raises RuntimeError when every attempt fails or when the websocket module is not
        available.
        """

        self.close()
        for _ in range(self.RETRIES):
            try:
                self.ws = websocket.create_connection(self.url)
                break  # success
            except NameError:
                # The module-level "import websocket" is optional; a missing name here means it failed.
                raise RuntimeError('Please install the websocket_client package for IDE integration!')
            except Exception as e:  # noqa
                red_print('WebSocket connection error: {}'.format(e))
            time.sleep(self.CONNECTION_RETRY_DELAY)
        else:
            raise RuntimeError('Cannot connect to WebSocket server')

    def close(self):  # type: () -> None
        """
        Close the connection if there is one. Errors are reported with red_print and never raised.
        """
        try:
            self.ws.close()
        except AttributeError:
            # Not yet connected
            pass
        except Exception as e:  # noqa
            red_print('WebSocket close error: {}'.format(e))

    def send(self, payload_dict):  # type: (dict) -> None
        """
        Serialize payload_dict in JSON format and send it to the server

        On failure the connection is re-established and the send is retried, up to RETRIES attempts in total.
        Raises RuntimeError when all attempts fail (or when reconnecting fails).
        """
        for _ in range(self.RETRIES):
            try:
                self.ws.send(json.dumps(payload_dict))
                yellow_print('WebSocket sent: {}'.format(payload_dict))
                break
            except Exception as e:  # noqa
                red_print('WebSocket send error: {}'.format(e))
                self._connect()
        else:
            raise RuntimeError('Cannot send to WebSocket server')

    def wait(self, expect_iterable):  # type: (list) -> None
        """
        Wait until a dictionary in JSON format is received from the server with all (key, value) tuples from
        expect_iterable.

        At most RETRIES messages are examined. A receive error triggers a reconnect; a message which is not valid
        JSON, is not a dictionary or does not contain the expected pairs is reported and uses up one attempt.
        Raises RuntimeError when no matching message was received within RETRIES attempts.
        """
        # Materialize once: the pairs are checked on every attempt and printed on mismatch, so a one-shot
        # iterator would otherwise be empty from the second attempt on and match any message.
        expected = tuple(expect_iterable)

        for _ in range(self.RETRIES):
            try:
                r = self.ws.recv()
            except Exception as e:  # noqa
                red_print('WebSocket receive error: {}'.format(e))
                self._connect()
                continue

            try:
                obj = json.loads(r)
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
                red_print('WebSocket received malformed JSON: {}'.format(e))
                continue

            # Only a JSON object can carry the expected pairs; other JSON values count as a mismatch.
            if isinstance(obj, dict) and all(k in obj and obj[k] == v for k, v in expected):
                yellow_print('WebSocket received: {}'.format(obj))
                break
            red_print('WebSocket expected: {}, received: {}'.format(dict(expected), obj))
        else:
            raise RuntimeError('Cannot receive from WebSocket server')
|