IonizationChamber/Simulation/mock_serial.py

89 wiersze
3.4 KiB
Python

2025-06-27 06:15:49 +00:00
"""
Mock serial port for tests and simulations.
This module provides :class:`MockSerial`, a lightweight stand-in for
pyserial's ``serial.Serial`` that can be injected where real UART hardware
would otherwise be needed.
"""
from __future__ import annotations
# ─── standard lib ────────────────────────────────────────────────────────
import random
import time
from typing import Callable, Optional
# ─── first-party ─────────────────────────────────────────────────────────
from measurement_acquisition.crc import Crc
# ─── types ───────────────────────────────────────────────────────────────
# Zero-argument callable returning the raw bytes of one frame; ``MockSerial``
# invokes it once per ``read()`` call and hands the result back unchanged.
FrameFn = Callable[[], bytes]
def _default_frame() -> bytes:
    """Build one simulated five-byte frame: START, LEN, MSB, LSB, CRC.

    The call blocks for one second before producing the frame.  MSB and LSB
    are drawn independently from a normal distribution (mean 2.5, standard
    deviation 1.5), rounded, and reduced modulo 55, which also wraps any
    negative draw into the range 0..54.  The trailing byte is whatever
    ``Crc.calculate`` returns for the first four bytes.
    """
    time.sleep(1)  # mimic 1-second sampling cadence

    mean, spread = 2.5, 1.5
    # Two consecutive draws; the first becomes the MSB, the second the LSB.
    high, low = (
        round(random.gauss(mean, spread)) % 55 for _ in range(2)
    )

    payload = [5, 1, high, low]  # [START, LEN, MSB, LSB]
    checksum = Crc.calculate(payload)
    return bytes([*payload, checksum])
class SerialException(Exception):
    """Signal an invalid operation on the virtual port.

    For example, reading from or writing to the port after it was closed.
    """
class MockSerial:  # pylint: disable=too-few-public-methods
    """
    Minimal drop-in replacement for pyserial's ``serial.Serial``.

    Only the subset of functionality required by *measurement_acquisition*
    is implemented: ``is_open``, ``flushInput``, ``read``, ``write``,
    ``close`` and use as a context manager.  Incoming data is produced by an
    injectable frame factory; outgoing data is only recorded, never looped
    back to the receive side.
    """

    # ── construction ────────────────────────────────────────────────────
    def __init__(  # noqa: D401
        self,
        port: str,
        baudrate: int,
        timeout: Optional[float] = None,
        *,
        frame_fn: FrameFn = _default_frame,
        **_ignored,  # keep signature parity  # pylint: disable=unused-argument
    ) -> None:
        """Instantiate a mock serial port with the given parameters.

        Args:
            port: Port name; stored but otherwise unused.
            baudrate: Baud rate; stored but otherwise unused.
            timeout: Read timeout; stored but not enforced by ``read``.
            frame_fn: Zero-argument callable invoked on every ``read`` to
                produce the bytes that call returns.
            **_ignored: Any further keyword arguments are accepted and
                discarded so call sites written for the real class work.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self._is_open = True  # the mock starts out already open
        self._frame_fn = frame_fn
        self._tx_log: list[bytes] = []

    # ── context-manager support ─────────────────────────────────────────
    def __enter__(self) -> MockSerial:
        """Enter a ``with`` block and return the port itself.

        Like pyserial, a port that was closed earlier is opened again.
        """
        self._is_open = True
        return self

    def __exit__(self, *exc_info: object) -> None:
        """Close the port when the ``with`` block ends.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    # ── pyserial-compatible API ─────────────────────────────────────────
    @property
    def is_open(self) -> bool:
        """Return ``True`` while the virtual port is open."""
        return self._is_open

    def flushInput(self) -> None:  # noqa: N802 (keep camelCase for API)
        """Discard unread bytes (no-op in this stub)."""

    def read(self, size: int = 1) -> bytes:  # pylint: disable=unused-argument
        """Return one full frame generated by *frame_fn*.

        *size* is accepted for signature compatibility only; the whole frame
        is returned regardless of its value.

        Raises:
            SerialException: If the port has been closed.
        """
        if not self._is_open:
            raise SerialException("Port is closed")
        return self._frame_fn()

    def write(self, data: bytes) -> int:
        """Record *data* in the internal TX log.

        Nothing is echoed to the receive side; ``read`` keeps returning
        frames from *frame_fn* only.

        Returns:
            ``len(data)``.

        Raises:
            SerialException: If the port has been closed.
        """
        if not self._is_open:
            raise SerialException("Port is closed")
        self._tx_log.append(bytes(data))
        return len(data)

    def close(self) -> None:
        """Mark the port as closed."""
        self._is_open = False

    # ── test helpers (not part of the pyserial API) ─────────────────────
    @property
    def tx_log(self) -> tuple[bytes, ...]:
        """Return an immutable snapshot of every payload passed to ``write``."""
        return tuple(self._tx_log)