""" Mock serial port for tests and simulations. This module provides :class:`MockSerial`, a lightweight stand-in for ``pyserial.Serial`` that can be injected where real UART hardware would otherwise be needed. """ from __future__ import annotations # ─── standard lib ──────────────────────────────────────────────────────── import random import time from typing import Callable, Optional # ─── first-party ───────────────────────────────────────────────────────── from measurement_acquisition.crc import Crc # ─── types ─────────────────────────────────────────────────────────────── FrameFn = Callable[[], bytes] def _default_frame() -> bytes: """Return a pseudo-random frame with a valid CRC byte.""" time.sleep(1) # mimic 1-second sampling cadence mu, sigma = 2.5, 1.5 msb = int(round(random.gauss(mu, sigma))) % 55 lsb = int(round(random.gauss(mu, sigma))) % 55 frame = [5, 1, msb, lsb, 0] # [START, LEN, MSB, LSB, CRC] frame[4] = Crc.calculate(frame[:-1]) return bytes(frame) class SerialException(Exception): """Raised when the virtual port is mis-used (e.g. read after close).""" class MockSerial: # pylint: disable=too-few-public-methods """ Minimal drop-in replacement for ``pyserial.Serial``. Only the subset of functionality required by *measurement_acquisition* is implemented. """ # ── construction ──────────────────────────────────────────────────── def __init__( # noqa: D401 self, port: str, baudrate: int, timeout: Optional[float] = None, *, frame_fn: FrameFn = _default_frame, **_ignored, # keep signature parity # pylint: disable=unused-argument ) -> None: """Instantiate a mock serial port with the given parameters.""" self.port = port self.baudrate = baudrate self.timeout = timeout self._is_open = True self._frame_fn = frame_fn self._tx_log: list[bytes] = [] # ── pyserial-compatible API ───────────────────────────────────────── @property def is_open(self) -> bool: """Return ``True`` while the virtual port is open.""" return self._is_open def flushInput(self) -> None: # noqa: N802 (keep camelCase for API) """Discard unread bytes (no-op in this stub).""" def read(self, size: int = 1) -> bytes: # pylint: disable=unused-argument """Return one full frame generated by *frame_fn*.""" if not self._is_open: raise SerialException("Port is closed") return self._frame_fn() def write(self, data: bytes) -> int: """Record *data* in the internal TX log and echo it to RX.""" if not self._is_open: raise SerialException("Port is closed") self._tx_log.append(bytes(data)) return len(data) def close(self) -> None: """Mark the port as closed.""" self._is_open = False