kopia lustrzana https://github.com/corrscope/corrscope
Add separate trigger and render_subsampling configs
rodzic
9a47ca8990
commit
0acea45908
|
@ -17,6 +17,7 @@
|
|||
<option value="E302" />
|
||||
<option value="E203" />
|
||||
<option value="E128" />
|
||||
<option value="E266" />
|
||||
</list>
|
||||
</option>
|
||||
</inspection_tool>
|
||||
|
|
|
@ -30,41 +30,46 @@ class ChannelConfig:
|
|||
|
||||
|
||||
class Channel:
|
||||
# Shared between trigger and renderer.
|
||||
window_samp: int
|
||||
# trigger_samp is unneeded, since __init__ (not Ovgenpy) constructs triggers.
|
||||
render_samp: int
|
||||
# TODO add a "get_around" method for rendering (also helps test_channel_subsampling)
|
||||
# Currently Ovgenpy peeks at Chanel.render_samp and render_stride (bad).
|
||||
|
||||
# Product of ovgen_cfg.subsampling and trigger/render_width.
|
||||
trigger_subsampling: int
|
||||
render_subsampling: int
|
||||
trigger_stride: int
|
||||
render_stride: int
|
||||
|
||||
def __init__(self, cfg: ChannelConfig, ovgen_cfg: 'Config'):
|
||||
self.cfg = cfg
|
||||
subsampling = ovgen_cfg.subsampling
|
||||
|
||||
# Create a Wave object.
|
||||
wcfg = _WaveConfig(amplification=ovgen_cfg.amplification * cfg.ampl_ratio)
|
||||
self.wave = Wave(wcfg, abspath(cfg.wav_path))
|
||||
|
||||
# Compute subsampling (array stride).
|
||||
# `subsampling` increases `stride` and decreases `nsamp`.
|
||||
# `width` increases `stride` without changing `nsamp`.
|
||||
tsub = ovgen_cfg.trigger_subsampling
|
||||
tw = coalesce(cfg.trigger_width, ovgen_cfg.trigger_width)
|
||||
self.trigger_subsampling = subsampling * tw
|
||||
|
||||
rsub = ovgen_cfg.render_subsampling
|
||||
rw = coalesce(cfg.render_width, ovgen_cfg.render_width)
|
||||
self.render_subsampling = subsampling * rw
|
||||
|
||||
# Compute window_samp and tsamp_frame.
|
||||
nsamp = ovgen_cfg.render_width_s * self.wave.smp_s / subsampling
|
||||
self.window_samp = round(nsamp)
|
||||
# nsamp = orig / subsampling
|
||||
# stride = subsampling * width
|
||||
def calculate_nsamp(sub):
|
||||
return round(ovgen_cfg.width_s * self.wave.smp_s / sub)
|
||||
trigger_samp = calculate_nsamp(tsub)
|
||||
self.render_samp = calculate_nsamp(rsub)
|
||||
|
||||
del subsampling
|
||||
del nsamp
|
||||
self.trigger_stride = tsub * tw
|
||||
self.render_stride = rsub * rw
|
||||
|
||||
# Create a Trigger object.
|
||||
tcfg = cfg.trigger or ovgen_cfg.trigger
|
||||
self.trigger = tcfg(
|
||||
wave=self.wave,
|
||||
tsamp=self.window_samp,
|
||||
subsampling=self.trigger_subsampling,
|
||||
tsamp=trigger_samp,
|
||||
subsampling=self.trigger_stride,
|
||||
fps=ovgen_cfg.fps
|
||||
)
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ from ovgenpy.layout import LayoutConfig
|
|||
from ovgenpy.triggers import ITriggerConfig, CorrelationTriggerConfig, PerFrameCache
|
||||
from ovgenpy.util import pushd, coalesce
|
||||
from ovgenpy.utils import keyword_dataclasses as dc
|
||||
from ovgenpy.utils.keyword_dataclasses import field
|
||||
from ovgenpy.utils.keyword_dataclasses import field, InitVar
|
||||
from ovgenpy.wave import Wave
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -39,7 +39,12 @@ class Config:
|
|||
end_time: float = None
|
||||
|
||||
width_ms: int
|
||||
subsampling: int = 1
|
||||
|
||||
# trigger_subsampling and render_subsampling override subsampling.
|
||||
trigger_subsampling: int = None
|
||||
render_subsampling: int = None
|
||||
subsampling: InitVar[int] = 1
|
||||
|
||||
trigger_width: int = 1
|
||||
render_width: int = 1
|
||||
|
||||
|
@ -65,10 +70,11 @@ class Config:
|
|||
# endregion
|
||||
|
||||
@property
|
||||
def render_width_s(self) -> float:
|
||||
def width_s(self) -> float:
|
||||
return self.width_ms / 1000
|
||||
|
||||
def __post_init__(self):
|
||||
def __post_init__(self, subsampling):
|
||||
# Cast benchmark_mode to enum.
|
||||
try:
|
||||
if not isinstance(self.benchmark_mode, BenchmarkMode):
|
||||
self.benchmark_mode = BenchmarkMode[self.benchmark_mode]
|
||||
|
@ -77,6 +83,10 @@ class Config:
|
|||
f'invalid benchmark_mode mode {self.benchmark_mode} not in '
|
||||
f'{[el.name for el in BenchmarkMode]}')
|
||||
|
||||
# Compute trigger_subsampling and render_subsampling.
|
||||
self.trigger_subsampling = coalesce(self.trigger_subsampling, subsampling)
|
||||
self.render_subsampling = coalesce(self.render_subsampling, subsampling)
|
||||
|
||||
|
||||
_FPS = 60 # f_s
|
||||
|
||||
|
@ -87,6 +97,7 @@ def default_config(**kwargs):
|
|||
amplification=1,
|
||||
|
||||
width_ms=40,
|
||||
# FIXME add trigger_subsampling and render_subsampling
|
||||
subsampling=2,
|
||||
trigger=CorrelationTriggerConfig(
|
||||
edge_strength=2,
|
||||
|
@ -212,7 +223,7 @@ class Ovgen:
|
|||
print(rounded)
|
||||
prev = rounded
|
||||
|
||||
datas = []
|
||||
render_datas = []
|
||||
# Get data from each wave
|
||||
for wave, channel in zip(self.waves, self.channels):
|
||||
sample = round(wave.smp_s * time_seconds)
|
||||
|
@ -223,8 +234,8 @@ class Ovgen:
|
|||
else:
|
||||
trigger_sample = sample
|
||||
|
||||
datas.append(wave.get_around(
|
||||
trigger_sample, channel.window_samp, channel.render_subsampling))
|
||||
render_datas.append(wave.get_around(
|
||||
trigger_sample, channel.render_samp, channel.render_stride))
|
||||
|
||||
# region Display buffers, for debugging purposes.
|
||||
if extra_outputs.window:
|
||||
|
@ -240,7 +251,7 @@ class Ovgen:
|
|||
|
||||
if not_benchmarking or benchmark_mode >= BenchmarkMode.RENDER:
|
||||
# Render frame
|
||||
renderer.render_frame(datas)
|
||||
renderer.render_frame(render_datas)
|
||||
frame = renderer.get_frame()
|
||||
|
||||
if not_benchmarking or benchmark_mode == BenchmarkMode.OUTPUT:
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
from hypothesis import given, reproduce_failure
|
||||
from hypothesis.strategies import integers
|
||||
from hypothesis import given
|
||||
import hypothesis.strategies as hs
|
||||
from pytest_mock import MockFixture
|
||||
|
||||
import ovgenpy.channel
|
||||
|
@ -9,21 +11,25 @@ import ovgenpy.ovgenpy
|
|||
from ovgenpy.channel import ChannelConfig, Channel
|
||||
from ovgenpy.ovgenpy import default_config, Ovgen, BenchmarkMode
|
||||
from ovgenpy.triggers import NullTriggerConfig
|
||||
from ovgenpy.util import coalesce
|
||||
|
||||
assert reproduce_failure
|
||||
positive = hs.integers(min_value=1, max_value=100)
|
||||
maybe = hs.one_of(positive, hs.none())
|
||||
|
||||
|
||||
positive = integers(min_value=1, max_value=100)
|
||||
|
||||
@given(subsampling=positive, trigger_width=positive, render_width=positive)
|
||||
@given(subsampling=positive, tsub=maybe, rsub=maybe,
|
||||
trigger_width=positive, render_width=positive)
|
||||
def test_channel_subsampling(
|
||||
subsampling: int,
|
||||
trigger_width: int,
|
||||
render_width: int,
|
||||
mocker: MockFixture
|
||||
subsampling: int,
|
||||
tsub: Optional[int],
|
||||
rsub: Optional[int],
|
||||
trigger_width: int,
|
||||
render_width: int,
|
||||
mocker: MockFixture
|
||||
):
|
||||
""" Ensure window_samp and trigger/render subsampling are computed correctly. """
|
||||
""" Ensure trigger/render_samp and trigger/render subsampling
|
||||
are computed correctly. """
|
||||
|
||||
# region setup test variables
|
||||
ovgenpy.ovgenpy.PRINT_TIMESTAMP = False # Cleanup Hypothesis testing logs
|
||||
|
||||
Wave = mocker.patch.object(ovgenpy.channel, 'Wave')
|
||||
|
@ -44,39 +50,50 @@ def test_channel_subsampling(
|
|||
cfg = default_config(
|
||||
channels=[ccfg],
|
||||
subsampling=subsampling,
|
||||
trigger_subsampling=tsub,
|
||||
render_subsampling=rsub,
|
||||
trigger=NullTriggerConfig(),
|
||||
benchmark_mode=BenchmarkMode.OUTPUT
|
||||
)
|
||||
channel = Channel(ccfg, cfg)
|
||||
# endregion
|
||||
|
||||
# Ensure channel.window_samp, trigger_subsampling, render_subsampling are correct.
|
||||
ideal_nsamp = pytest.approx(
|
||||
round(cfg.render_width_s * channel.wave.smp_s / subsampling), 1)
|
||||
tsub = coalesce(tsub, subsampling)
|
||||
rsub = coalesce(rsub, subsampling)
|
||||
|
||||
assert channel.window_samp == ideal_nsamp
|
||||
assert channel.trigger_subsampling == subsampling * trigger_width
|
||||
assert channel.render_subsampling == subsampling * render_width
|
||||
def ideal_samp(sub):
|
||||
return pytest.approx(
|
||||
round(cfg.width_s * channel.wave.smp_s / sub), abs=1)
|
||||
|
||||
# Ensure trigger uses channel.window_samp and trigger_subsampling.
|
||||
ideal_tsamp = ideal_samp(tsub)
|
||||
ideal_rsamp = ideal_samp(rsub)
|
||||
assert channel.render_samp == ideal_rsamp
|
||||
del subsampling
|
||||
|
||||
assert channel.trigger_stride == tsub * trigger_width
|
||||
assert channel.render_stride == rsub * render_width
|
||||
|
||||
## Ensure trigger uses channel.window_samp and trigger_stride.
|
||||
trigger = channel.trigger
|
||||
assert trigger._tsamp == channel.window_samp
|
||||
assert trigger._subsampling == channel.trigger_subsampling
|
||||
assert trigger._tsamp == ideal_tsamp
|
||||
assert trigger._subsampling == channel.trigger_stride
|
||||
|
||||
# Ensure ovgenpy calls render using channel.window_samp and render_subsampling.
|
||||
## Ensure ovgenpy calls render using channel.render_samp and render_stride.
|
||||
ovgen = Ovgen(cfg, '.', outputs=[])
|
||||
renderer = mocker.patch.object(Ovgen, '_load_renderer').return_value
|
||||
ovgen.play()
|
||||
|
||||
# Inspect arguments to wave.get_around()
|
||||
# Only render (not NullTrigger) calls wave.get_around().
|
||||
(_sample, _region_nsamp, _subsampling), kwargs = wave.get_around.call_args
|
||||
assert _region_nsamp == channel.window_samp
|
||||
assert _subsampling == channel.render_subsampling
|
||||
assert _region_nsamp == channel.render_samp
|
||||
assert _subsampling == channel.render_stride
|
||||
|
||||
# Inspect arguments to renderer.render_frame()
|
||||
# datas: List[np.ndarray]
|
||||
(datas,), kwargs = renderer.render_frame.call_args
|
||||
render_data = datas[0]
|
||||
assert len(render_data) == channel.window_samp
|
||||
assert len(render_data) == channel.render_samp
|
||||
|
||||
|
||||
# line_color is tested in test_renderer.py
|
||||
|
|
Ładowanie…
Reference in New Issue