kopia lustrzana https://github.com/RobertGawron/IonizationChamber
89 wiersze
3.4 KiB
Python
Executable File
89 wiersze
3.4 KiB
Python
Executable File
"""
|
|
Mock serial port for tests and simulations.
|
|
|
|
This module provides :class:`MockSerial`, a lightweight stand-in for
|
|
``pyserial.Serial`` that can be injected where real UART hardware would
|
|
otherwise be needed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
# ─── standard lib ────────────────────────────────────────────────────────
|
|
import random
|
|
import time
|
|
from typing import Callable, Optional
|
|
|
|
# ─── first-party ─────────────────────────────────────────────────────────
|
|
from measurement_acquisition.crc import Crc
|
|
|
|
# ─── types ───────────────────────────────────────────────────────────────
|
|
# A frame factory: called with no arguments, returns the frame as bytes.
FrameFn = Callable[[], bytes]
|
|
|
|
|
|
def _default_frame() -> bytes:
    """
    Build one five-byte frame from two Gaussian-distributed data bytes.

    Layout is ``[START, LEN, MSB, LSB, CRC]`` with START fixed at 5 and LEN
    fixed at 1.  The CRC byte is whatever ``Crc.calculate`` returns for the
    four bytes that precede it.  The call blocks for one second before the
    frame is generated.
    """
    time.sleep(1)  # mimic 1-second sampling cadence

    mean, spread = 2.5, 1.5
    # Draw order matters for reproducibility under a seeded RNG: MSB first.
    # The modulo folds negative or large draws back into the range 0..54.
    high_byte = round(random.gauss(mean, spread)) % 55
    low_byte = round(random.gauss(mean, spread)) % 55

    body = [5, 1, high_byte, low_byte]  # START, LEN, MSB, LSB
    checksum = Crc.calculate(list(body))  # hand over a copy, keep body intact
    return bytes(body + [checksum])
|
|
|
|
|
|
class SerialException(Exception):
    """Signal misuse of the virtual port, e.g. reading after it was closed."""
|
|
|
|
|
|
class MockSerial:  # pylint: disable=too-few-public-methods
    """
    In-memory stand-in for pyserial's ``serial.Serial``.

    Implements only the pieces *measurement_acquisition* relies on:
    ``is_open``, ``flushInput``, ``read``, ``write`` and ``close``.
    Incoming data is produced by an injectable frame factory; outgoing data
    is appended to a private log.
    """

    # ── construction ────────────────────────────────────────────────────
    def __init__(  # noqa: D401
        self,
        port: str,
        baudrate: int,
        timeout: Optional[float] = None,
        *,
        frame_fn: FrameFn = _default_frame,
        **_ignored,  # keep signature parity  # pylint: disable=unused-argument
    ) -> None:
        """
        Create a virtual port that starts out open.

        ``port``, ``baudrate`` and ``timeout`` are stored as plain attributes
        and not otherwise used.  ``frame_fn`` is the zero-argument callable
        invoked on every :meth:`read`.  Any additional keyword arguments are
        accepted and discarded.
        """
        self._frame_fn = frame_fn
        self._tx_log: list[bytes] = []
        self._is_open = True
        self.port, self.baudrate, self.timeout = port, baudrate, timeout

    # ── pyserial-compatible API ─────────────────────────────────────────
    @property
    def is_open(self) -> bool:
        """Tell whether the port is still open (``False`` after ``close``)."""
        return self._is_open

    def flushInput(self) -> None:  # noqa: N802 (keep camelCase for API)
        """Do nothing; there is no input buffer to discard in this stub."""

    def read(self, size: int = 1) -> bytes:  # pylint: disable=unused-argument
        """
        Return whatever *frame_fn* produces for this call.

        *size* is accepted for API compatibility but ignored.

        Raises:
            SerialException: if the port has been closed.
        """
        self._require_open()
        return self._frame_fn()

    def write(self, data: bytes) -> int:
        """
        Append a ``bytes`` copy of *data* to the internal TX log.

        Returns:
            ``len(data)``.

        Raises:
            SerialException: if the port has been closed.
        """
        self._require_open()
        self._tx_log.append(bytes(data))
        return len(data)

    def close(self) -> None:
        """Flag the port as closed; later reads and writes will raise."""
        self._is_open = False

    # ── internals ───────────────────────────────────────────────────────
    def _require_open(self) -> None:
        """Raise :class:`SerialException` unless the port is open."""
        if self._is_open:
            return
        raise SerialException("Port is closed")
|