2018-12-06 20:27:09 +00:00
|
|
|
import attr
|
2018-07-14 10:36:49 +00:00
|
|
|
import matplotlib.pyplot as plt
|
2018-07-15 13:04:50 +00:00
|
|
|
import pytest
|
2018-07-14 10:36:49 +00:00
|
|
|
from matplotlib.axes import Axes
|
|
|
|
from matplotlib.figure import Figure
|
|
|
|
|
2018-07-20 05:38:21 +00:00
|
|
|
from ovgenpy import triggers
|
2018-10-29 02:12:02 +00:00
|
|
|
from ovgenpy.triggers import CorrelationTriggerConfig, CorrelationTrigger, \
|
2018-10-29 09:10:03 +00:00
|
|
|
PerFrameCache, ZeroCrossingTriggerConfig, LocalPostTriggerConfig
|
2018-07-14 10:36:49 +00:00
|
|
|
from ovgenpy.wave import Wave
|
|
|
|
|
|
|
|
# Switch off the trigger module's SHOW_TRIGGER flag as soon as this test module
# is imported (presumably a debug-visualisation toggle -- confirm in
# ovgenpy.triggers).
triggers.SHOW_TRIGGER = False
|
|
|
|
|
|
|
|
|
2018-12-06 20:27:09 +00:00
|
|
|
def cfg_template(**kwargs) -> CorrelationTriggerConfig:
    """Build the baseline correlation-trigger config used by these tests.

    A fixed baseline config is constructed first; any keyword arguments are
    then applied on top of it with ``attr.evolve``, so callers only spell out
    the fields they want to differ.

    Not identical to the default_config() template.
    """
    baseline_fields = dict(
        edge_strength=2,
        responsiveness=1,
        buffer_falloff=0.5,
        use_edge_trigger=False,
    )
    baseline = CorrelationTriggerConfig(**baseline_fields)
    return attr.evolve(baseline, **kwargs)
|
|
|
|
|
|
|
|
|
2018-07-15 13:04:50 +00:00
|
|
|
@pytest.fixture(scope='session', params=[False, True])
def cfg(request):
    """Session fixture: the template config, once per ``use_edge_trigger`` value.

    The parameter list yields the config with the edge trigger disabled first,
    then enabled.
    """
    return cfg_template(use_edge_trigger=request.param)
|
2018-07-15 13:04:50 +00:00
|
|
|
|
|
|
|
|
2018-10-29 09:10:03 +00:00
|
|
|
@pytest.fixture(scope='session', params=[
    None,
    ZeroCrossingTriggerConfig(),
    LocalPostTriggerConfig(strength=1),
])
def post_cfg(request):
    """Session fixture: the template config, once per ``post`` setting.

    The settings are: no post-trigger (``None``), a default
    ``ZeroCrossingTriggerConfig``, and a ``LocalPostTriggerConfig`` with
    ``strength=1``.
    """
    return cfg_template(post=request.param)
|
2018-10-29 02:12:02 +00:00
|
|
|
|
|
|
|
|
2018-08-25 23:35:09 +00:00
|
|
|
# I regret adding the nsamp_frame parameter. It makes unit tests hard.
|
|
|
|
|
2018-08-26 01:51:01 +00:00
|
|
|
# Frame rate handed to the trigger constructors below via their `fps=` argument.
FPS = 60
|
2018-08-25 23:35:09 +00:00
|
|
|
|
2018-07-28 23:13:51 +00:00
|
|
|
def test_trigger(cfg: CorrelationTriggerConfig):
    """Querying 500 samples before ``x0`` must make the trigger report ``x0``.

    Uses tests/impulse24000.wav (presumably a single impulse at sample 24000 --
    confirm against the fixture file). The first loop pass performs no query;
    every later pass asks for a trigger at ``x0 - 500`` with a fresh
    ``PerFrameCache`` and expects exactly ``x0`` back.

    Flip the local ``plot`` flag to True to draw the trigger's buffer after
    each pass with matplotlib.
    """
    impulse = Wave(None, 'tests/impulse24000.wav')

    iters = 5
    plot = False
    x0 = 24000
    x = x0 - 500
    trigger: CorrelationTrigger = cfg(impulse, 4000, stride=1, fps=FPS)

    if not plot:
        # No figure needed: placeholder "axes" only drive the iteration count.
        axes = range(iters)
    else:
        BIG = 0.95
        SMALL = 0.05
        margins = dict(top=BIG, right=BIG, bottom=SMALL, left=SMALL)
        fig, axes = plt.subplots(iters, gridspec_kw=margins)  # type: Figure, Axes
        fig.tight_layout()

    for i, ax in enumerate(axes):
        # The very first pass is skipped; only later passes query the trigger.
        if i > 0:
            offset = trigger.get_trigger(x, PerFrameCache())
            print(offset)
            assert offset == x0
        if plot:
            ax.plot(trigger._buffer, label=str(i))
            ax.grid()

    if plot:
        plt.show()
|
2018-07-29 09:07:00 +00:00
|
|
|
|
|
|
|
|
2018-11-17 23:47:32 +00:00
|
|
|
def test_trigger_stride(cfg: CorrelationTriggerConfig):
    """Check how far the trigger lands from ``x0`` when ``stride=4``.

    Without the edge trigger, the offset must sit on the stride grid relative
    to ``x0`` and within 10 samples of it. With the edge trigger, the offset is
    expected to leave the stride grid and stay within 2 samples of ``x0``.
    """
    wave = Wave(None, 'tests/sine440.wav')
    # period = 48000 / 440 = 109.(09)*

    x0 = 24000
    stride = 4
    iters = 5
    trigger = cfg(wave, tsamp=100, stride=stride, fps=FPS)
    # real window_samp = window_samp*stride
    # period = 109

    cache = PerFrameCache()

    for i in range(1, iters):
        offset = trigger.get_trigger(x0, cache)
        delta = offset - x0

        # Debugging CorrelationTrigger.get_trigger:
        # from matplotlib import pyplot as plt
        # plt.plot(data)
        # plt.plot(prev_buffer)
        # plt.plot(corr)

        # When i=0, the data has 3 peaks, the rightmost taller than the center.
        # The *tips* of the outer peaks are truncated between `left` and
        # `right`. After truncation, corr[mid+1] is almost identical to
        # corr[mid], for reasons I don't understand (mid+1 > mid because
        # dithering?).
        if cfg.use_edge_trigger:
            # The edge trigger activates at x0+1=24001. Likely related: it
            # triggers when moving from <=0 to >0. This is a necessary evil, in
            # order to recognize 0-to-positive edges while testing
            # tests/impulse24000.wav .

            # If assertion fails, remove it.
            assert delta % stride != 0, f'iteration {i}'
            assert abs(delta) <= 2, f'iteration {i}'
        else:
            assert delta % stride == 0, f'iteration {i}'
            assert abs(delta) < 10, f'iteration {i}'
|
|
|
|
|
|
|
|
|
2018-11-17 23:47:32 +00:00
|
|
|
def test_post_trigger_stride(post_cfg: CorrelationTriggerConfig):
    """Check how a post-trigger changes the offset returned with ``stride=4``.

    With no post-trigger configured, the offset must sit on the stride grid
    relative to ``x0`` and within 10 samples of it. With a post-trigger, the
    offset is expected to leave the stride grid and stay within 2 samples.
    """
    wave = Wave(None, 'tests/sine440.wav')
    x0 = 24000
    stride = 4
    iters = 5
    trigger = post_cfg(wave, tsamp=100, stride=stride, fps=FPS)

    cache = PerFrameCache()
    for i in range(1, iters):
        offset = trigger.get_trigger(x0, cache)
        delta = offset - x0

        if post_cfg.post:
            # If assertion fails, remove it.
            assert delta % stride != 0, f'iteration {i}'
            assert abs(delta) <= 2, f'iteration {i}'
        else:
            assert delta % stride == 0, f'iteration {i}'
            assert abs(delta) < 10, f'iteration {i}'
|
2018-07-29 09:07:00 +00:00
|
|
|
|
|
|
|
|
2018-11-17 23:47:32 +00:00
|
|
|
def test_trigger_stride_edges(cfg: CorrelationTriggerConfig):
    """Smoke test: ``get_trigger`` must not raise at extreme sample indices.

    Queries index 0, a negative index and index 50000 (presumably at or beyond
    the end of tests/sine440.wav -- confirm against the fixture file), each
    with a fresh ``PerFrameCache``. Nothing is asserted about the results.
    """
    wave = Wave(None, 'tests/sine440.wav')
    # period = 48000 / 440 = 109.(09)*

    stride = 4
    trigger = cfg(wave, tsamp=100, stride=stride, fps=FPS)
    # real window_samp = window_samp*stride
    # period = 109

    for index in (0, -1000, 50000):
        trigger.get_trigger(index, PerFrameCache())
|
2018-07-29 09:07:00 +00:00
|
|
|
|
|
|
|
|
2018-08-27 01:35:54 +00:00
|
|
|
def test_trigger_should_recalc_window():
    """Exercise ``CorrelationTrigger._is_window_invalid`` with recalc_semitones=1.

    Three phases, distinguished by the trigger's ``_prev_period``:
    as constructed, after setting it to 100, and after setting it to 0.
    """
    cfg = cfg_template(recalc_semitones=1.0)
    wave = Wave(None, 'tests/sine440.wav')
    trigger: CorrelationTrigger = cfg(wave, tsamp=1000, stride=1, fps=FPS)

    # Freshly constructed trigger: every value checked is reported invalid.
    for value in (0, 1, 1000):
        assert trigger._is_window_invalid(value), value

    # Previous period of 100: close neighbours are accepted, distant values
    # (and zero) are rejected.
    trigger._prev_period = 100
    for value in (99, 101):
        assert not trigger._is_window_invalid(value), value
    for value in (0, 80, 120):
        assert trigger._is_window_invalid(value), value

    # Previous period of 0: only 0 itself is accepted.
    trigger._prev_period = 0
    assert not trigger._is_window_invalid(0), 0
    for value in (1, 100):
        assert trigger._is_window_invalid(value), value
|
|
|
|
|
|
|
|
|
2018-08-26 05:44:42 +00:00
|
|
|
# Test the ability to load legacy TriggerConfig
|
|
|
|
|
|
|
|
def test_load_trigger_config():
    """Load a ``!CorrelationTriggerConfig`` YAML document and expect no error.

    The document uses the keys ``trigger_strength`` and ``falloff_width``,
    which differ from the field names passed to ``CorrelationTriggerConfig``
    elsewhere in this file (presumably the legacy spellings -- confirm in
    ovgenpy.triggers).
    """
    # Imported locally; `yaml` here is the project's loader object from
    # ovgenpy.config, not a top-level YAML package.
    from ovgenpy.config import yaml

    # Ensure no exceptions
    yaml.load('''\
!CorrelationTriggerConfig
trigger_strength: 3
use_edge_trigger: false
responsiveness: 0.2
falloff_width: 2
''')
|
|
|
|
|
2018-07-29 09:07:00 +00:00
|
|
|
# TODO: add a test_period() test covering get_period().
|