IonizationChamber/Software/MeasurementAcquisition/test_component/test_application.py

178 wiersze
7.4 KiB
Python
Executable File

"""StateMachine integration tests (CSV backend injected via IMeasurementStorage)."""
from __future__ import annotations
import csv
import itertools
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import ClassVar, Final
from measurement_acquisition.config import (
UartConfig,
CsvWriterConfig,
DataFrameConfig,
)
from measurement_acquisition.measurement_storage import MeasurementStorage
from measurement_acquisition.physical_layer import PhysicalLayer
from measurement_acquisition.transport_layer import TransportLayer
from measurement_acquisition.application_layer import ApplicationLayer
from measurement_acquisition.state_machine import StateMachine
from measurement_acquisition.mcp3425_converter import MCP3425Converter
from measurement_acquisition.crc import Crc
# ─────────────────────────────────────────────────────────────────────────────
# Fake serial back-ends
# ─────────────────────────────────────────────────────────────────────────────
class _FakeSerial: # pylint: disable=too-few-public-methods
"""Ultra-light stand-in for :class:`serial.Serial` that emits zero frames."""
def __init__(self, *_, **__):
self.is_open = True
self._frames = itertools.repeat(b"\x00\x00\x00\x00\x00")
def flushInput(self): # noqa: N802 – Serial API is camelCase
pass
def read(self, size: int, *_, **__) -> bytes: # noqa: D401
return next(self._frames)[:size]
class _SequenceFakeSerial(_FakeSerial):
"""Cycles through a predefined list of frames, then repeats the last one."""
def __init__(self, frames: list[bytes]):
super().__init__()
self._frames_iter = iter(frames)
self._last = frames[-1]
def read(self, size: int, *_, **__) -> bytes: # noqa: D401
frame = next(self._frames_iter, self._last)
return frame[:size]
# ─────────────────────────────────────────────────────────────────────────────
# Helper CsvWriterCfgHelper subclass – override only what the tests need
# ─────────────────────────────────────────────────────────────────────────────
class CsvWriterCfgHelper(CsvWriterConfig): # pylint: disable=too-few-public-methods
"""Minimal CsvWriterConfig override used by integration tests."""
PATH: ClassVar[Final[Path]] = Path("data.csv")
DELIMITER: ClassVar[Final[str]] = ","
TIMESTAMP_LABEL: ClassVar[Final[str]] = "time"
VOLTAGE_LABEL: ClassVar[Final[str]] = "voltage"
# ─────────────────────────────────────────────────────────────────────────────
# Shared helpers
# ─────────────────────────────────────────────────────────────────────────────
MEASUREMENT_COUNT = 10
TOTAL_TICKS = 2 + MEASUREMENT_COUNT * 3 # state-machine cadence
def _build_state_machine() -> tuple[StateMachine, CsvWriterCfgHelper]:
"""Assemble the full stack with the *current* `_FakeSerial` implementation."""
# Physical layer with fake UART
uart_cfg = UartConfig()
physical_layer = PhysicalLayer(uart_cfg, serial_factory=_FakeSerial)
# Transport layer (frame layout comes first, then physical layer)
frame_cfg = DataFrameConfig()
transport_layer = TransportLayer(physical_layer, frame_cfg)
# Application layer
application_layer = ApplicationLayer(transport_layer=transport_layer)
# Storage back-end
csv_cfg = CsvWriterCfgHelper()
storage = MeasurementStorage(csv_cfg)
# Fully wired state machine
return StateMachine(application_layer, storage), csv_cfg
def _voltage_to_bytes(voltage: float) -> tuple[int, int]:
"""Return (MSB, LSB) representing *voltage* for an MCP3425 frame."""
raw = int(round(voltage / MCP3425Converter.BASE_LSB)) & 0xFFFF
return (raw >> 8) & 0xFF, raw & 0xFF
def _make_frame(msb: int, lsb: int) -> bytes:
"""Return a 5-byte frame [B0, B1, MSB, LSB, CRC]."""
payload = [0x00, 0x00, msb, lsb]
crc = Crc.calculate(bytes(payload))
return bytes(payload + [crc])
# ─────────────────────────────────────────────────────────────────────────────
# Tests
# ─────────────────────────────────────────────────────────────────────────────
def test_application(tmp_path, monkeypatch):
"""Full-stack run with zero-filled frames."""
monkeypatch.chdir(tmp_path)
state_machine, csv_cfg = _build_state_machine()
for _ in range(TOTAL_TICKS):
state_machine.tick()
state_machine.measurement_storage.disconnect()
rows = list(
csv.reader(
csv_cfg.PATH.read_text(encoding="utf-8").splitlines(),
delimiter=csv_cfg.DELIMITER,
)
)
header, *data_rows = rows
assert header == [csv_cfg.TIMESTAMP_LABEL, csv_cfg.VOLTAGE_LABEL]
assert len(data_rows) == MEASUREMENT_COUNT
assert all(float(v) == 0.0 for _, v in data_rows)
# pylint: disable=too-many-locals
def test_application_non_zero_values(tmp_path, monkeypatch):
"""Full-stack run with custom voltages encoded in sequence frames."""
monkeypatch.chdir(tmp_path)
sample_voltages = [0.125, 0.25, 0.5]
frames = [_make_frame(*_voltage_to_bytes(v)) for v in sample_voltages]
# Replace the serial implementation only for this test
monkeypatch.setattr(
sys.modules[__name__],
"_FakeSerial",
lambda *a, **kw: _SequenceFakeSerial(frames),
)
state_machine, csv_cfg = _build_state_machine()
# Deterministic timestamps
t0 = datetime(2025, 7, 2, 13, 0, 0)
times = [t0 + timedelta(milliseconds=i) for i in range(len(sample_voltages))]
time_iter = iter(times)
state_machine.now_func = lambda: next(time_iter)
total_ticks = 2 + len(sample_voltages) * 3
for _ in range(total_ticks):
state_machine.tick()
state_machine.measurement_storage.disconnect()
rows = list(
csv.reader(
csv_cfg.PATH.read_text(encoding="utf-8").splitlines(),
delimiter=csv_cfg.DELIMITER,
)
)
header, *data_rows = rows
assert header == [csv_cfg.TIMESTAMP_LABEL, csv_cfg.VOLTAGE_LABEL]
assert len(data_rows) == len(sample_voltages)
for (ts, volt), expected_time, expected_volt in zip(
data_rows, times, sample_voltages, strict=True
):
assert ts == expected_time.isoformat(sep=" ", timespec="milliseconds")
assert float(volt) == expected_volt