kopia lustrzana https://github.com/corrscope/corrscope
Add support for subsampling in CorrelationTrigger and Wave
rodzic
17ef07fb93
commit
8cce1d1414
|
@ -4,7 +4,7 @@ from typing import TYPE_CHECKING, Type
|
|||
import numpy as np
|
||||
from scipy import signal
|
||||
|
||||
from ovgenpy.config import register_config
|
||||
from ovgenpy.config import register_config, OvgenError
|
||||
from ovgenpy.util import find
|
||||
from ovgenpy.wave import FLOAT
|
||||
|
||||
|
@ -36,8 +36,8 @@ class Trigger(ABC):
|
|||
self.cfg = cfg
|
||||
self._wave = wave
|
||||
|
||||
self._trigger_nsamp = nsamp
|
||||
self._trigger_subsampling = subsampling
|
||||
self._nsamp = nsamp
|
||||
self._subsampling = subsampling
|
||||
|
||||
@abstractmethod
|
||||
def get_trigger(self, index: int) -> int:
|
||||
|
@ -75,7 +75,7 @@ class CorrelationTrigger(Trigger):
|
|||
it's complicated
|
||||
"""
|
||||
Trigger.__init__(self, *args, **kwargs)
|
||||
self._buffer_nsamp = self._trigger_nsamp
|
||||
self._buffer_nsamp = self._nsamp
|
||||
|
||||
# Create correlation buffer (containing a series of old data)
|
||||
self._buffer = np.zeros(self._buffer_nsamp, dtype=FLOAT) # type: np.ndarray[FLOAT]
|
||||
|
@ -97,7 +97,7 @@ class CorrelationTrigger(Trigger):
|
|||
use_edge_trigger = self.cfg.use_edge_trigger
|
||||
|
||||
N = self._buffer_nsamp
|
||||
data = self._wave.get_around(index, N)
|
||||
data = self._wave.get_around(index, N, self._subsampling)
|
||||
|
||||
# prev_buffer = windowed step function + self._buffer
|
||||
halfN = N // 2
|
||||
|
@ -110,10 +110,6 @@ class CorrelationTrigger(Trigger):
|
|||
|
||||
prev_buffer = self._buffer + step
|
||||
|
||||
# Find optimal offset (within ±N//4)
|
||||
mid = N-1
|
||||
radius = N//4
|
||||
|
||||
# Calculate correlation
|
||||
"""
|
||||
If offset < optimal, we need to `offset += positive`.
|
||||
|
@ -130,6 +126,10 @@ class CorrelationTrigger(Trigger):
|
|||
corr = signal.correlate(data, prev_buffer)
|
||||
assert len(corr) == 2*N - 1
|
||||
|
||||
# Find optimal offset (within ±N//4)
|
||||
mid = N-1
|
||||
radius = N//4
|
||||
|
||||
left = mid - radius
|
||||
right = mid + radius + 1
|
||||
|
||||
|
@ -139,10 +139,10 @@ class CorrelationTrigger(Trigger):
|
|||
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
|
||||
# peak_offset == argmax(corr) - mid
|
||||
peak_offset = np.argmax(corr) - mid # type: int
|
||||
trigger = index + peak_offset
|
||||
trigger = index + (self._subsampling * peak_offset)
|
||||
|
||||
# Update correlation buffer (distinct from visible area)
|
||||
aligned = self._wave.get_around(trigger, self._buffer_nsamp)
|
||||
aligned = self._wave.get_around(trigger, self._buffer_nsamp, self._subsampling)
|
||||
self._update_buffer(aligned)
|
||||
|
||||
if use_edge_trigger:
|
||||
|
@ -206,8 +206,13 @@ def get_period(data: np.ndarray) -> int:
|
|||
|
||||
|
||||
class ZeroCrossingTrigger(Trigger):
|
||||
# TODO support subsampling
|
||||
def get_trigger(self, index: int):
|
||||
trigger_nsamp = self._trigger_nsamp
|
||||
if self._subsampling != 1:
|
||||
raise OvgenError(
|
||||
f'ZeroCrossingTrigger with subsampling != 1 is not implemented '
|
||||
f'(supplied {self._subsampling})')
|
||||
nsamp = self._nsamp
|
||||
|
||||
if not 0 <= index < self._wave.nsamp:
|
||||
return index
|
||||
|
@ -223,7 +228,7 @@ class ZeroCrossingTrigger(Trigger):
|
|||
else: # self._wave[sample] == 0
|
||||
return index + 1
|
||||
|
||||
data = self._wave[index : index + (direction * trigger_nsamp) : direction]
|
||||
data = self._wave[index : index + (direction * nsamp) : direction]
|
||||
intercepts = find(data, test)
|
||||
try:
|
||||
(delta,), value = next(intercepts)
|
||||
|
|
|
@ -58,10 +58,10 @@ class Wave:
|
|||
data *= self.cfg.amplification / self.max_val
|
||||
return data
|
||||
|
||||
def get(self, begin: int, end: int) -> 'np.ndarray[FLOAT]':
|
||||
def _get(self, begin: int, end: int, subsampling: int) -> 'np.ndarray[FLOAT]':
|
||||
""" Copies self.data[begin:end] with zero-padding. """
|
||||
if 0 <= begin and end <= self.nsamp:
|
||||
return self[begin:end]
|
||||
return self[begin:end:subsampling]
|
||||
|
||||
region_len = end - begin
|
||||
|
||||
|
@ -75,23 +75,31 @@ class Wave:
|
|||
delta = self.nsamp - idx # delta < 0
|
||||
assert idx + delta == self.nsamp
|
||||
|
||||
return delta, idx
|
||||
return delta
|
||||
|
||||
delta_begin, begin = constrain(begin)
|
||||
delta_end, end = constrain(end)
|
||||
begin_index = constrain(begin)
|
||||
end_index = region_len + constrain(end)
|
||||
del end
|
||||
data = self[begin+begin_index : begin+end_index : subsampling]
|
||||
|
||||
out = np.zeros(region_len, dtype=FLOAT)
|
||||
# Compute subsampled output ranges
|
||||
out_len = region_len // subsampling
|
||||
out_begin = begin_index // subsampling
|
||||
out_end = out_begin + len(data)
|
||||
# len(data) == ceil((end_index - begin_index) / subsampling)
|
||||
|
||||
out = np.zeros(out_len, dtype=FLOAT)
|
||||
|
||||
out[out_begin : out_end] = data
|
||||
|
||||
# out[0 : region_len]. == self[begin: end]
|
||||
# out[Δbegin : region_len+Δend] == self[begin + Δbegin: end + Δend]
|
||||
out[delta_begin : region_len+delta_end] = self[begin+delta_begin : end+delta_end]
|
||||
return out
|
||||
|
||||
def get_around(self, sample: int, region_len: int):
|
||||
def get_around(self, sample: int, region_len: int, subsampling: int):
|
||||
"""" Copies self.data[...] """
|
||||
region_len *= subsampling
|
||||
end = sample + region_len // 2
|
||||
begin = end - region_len
|
||||
return self.get(begin, end)
|
||||
return self._get(begin, end, subsampling)
|
||||
|
||||
def get_s(self) -> float:
|
||||
"""
|
||||
|
|
|
@ -24,12 +24,12 @@ def cfg(request):
|
|||
|
||||
|
||||
def test_trigger(cfg: CorrelationTriggerConfig):
|
||||
# wave = Wave(None, 'tests/sine440.wav')
|
||||
wave = Wave(None, 'tests/impulse24000.wav')
|
||||
|
||||
iters = 5
|
||||
plot = False
|
||||
x = 24000 - 500
|
||||
x0 = 24000
|
||||
x = x0 - 500
|
||||
trigger = cfg(wave, 4000, subsampling=1)
|
||||
|
||||
if plot:
|
||||
|
@ -45,12 +45,67 @@ def test_trigger(cfg: CorrelationTriggerConfig):
|
|||
|
||||
for i, ax in enumerate(axes):
|
||||
if i:
|
||||
offset2 = trigger.get_trigger(x)
|
||||
print(offset2)
|
||||
assert offset2 == 24000
|
||||
offset = trigger.get_trigger(x)
|
||||
print(offset)
|
||||
assert offset == x0
|
||||
if plot:
|
||||
ax.plot(trigger._buffer, label=str(i))
|
||||
ax.grid()
|
||||
|
||||
if plot:
|
||||
plt.show()
|
||||
|
||||
|
||||
def test_trigger_subsampling(cfg: CorrelationTriggerConfig):
|
||||
wave = Wave(None, 'tests/sine440.wav')
|
||||
# period = 48000 / 440 = 109.(09)*
|
||||
|
||||
iters = 5
|
||||
x0 = 24000
|
||||
subsampling = 4
|
||||
trigger = cfg(wave, nsamp=100, subsampling=subsampling)
|
||||
# real nsamp = nsamp*subsampling
|
||||
# period = 109
|
||||
|
||||
for i in range(1, iters):
|
||||
offset = trigger.get_trigger(x0)
|
||||
print(offset)
|
||||
|
||||
# Debugging CorrelationTrigger.get_trigger:
|
||||
# from matplotlib import pyplot as plt
|
||||
# plt.plot(data)
|
||||
# plt.plot(prev_buffer)
|
||||
# plt.plot(corr)
|
||||
|
||||
# When i=0, the data has 3 peaks, the rightmost taller than the center. The
|
||||
# *tips* of the outer peaks are truncated between `left` and `right`.
|
||||
# After truncation, corr[mid+1] is almost identical to corr[mid], for
|
||||
# reasons I don't understand (mid+1 > mid because dithering?).
|
||||
if not cfg.use_edge_trigger:
|
||||
assert (offset - x0) % subsampling == 0
|
||||
assert abs(offset - x0) < 10
|
||||
|
||||
# The edge trigger activates at x0+1=24001. Likely related: it triggers
|
||||
# when moving from <=0 to >0. This is a necessary evil, in order to
|
||||
# recognize 0-to-positive edges while testing tests/impulse24000.wav .
|
||||
|
||||
else:
|
||||
assert abs(offset - x0) <= 2
|
||||
|
||||
|
||||
def test_trigger_subsampling_edges(cfg: CorrelationTriggerConfig):
|
||||
wave = Wave(None, 'tests/sine440.wav')
|
||||
# period = 48000 / 440 = 109.(09)*
|
||||
|
||||
iters = 5
|
||||
subsampling = 4
|
||||
trigger = cfg(wave, nsamp=100, subsampling=subsampling)
|
||||
# real nsamp = nsamp*subsampling
|
||||
# period = 109
|
||||
|
||||
trigger.get_trigger(0)
|
||||
trigger.get_trigger(-1000)
|
||||
trigger.get_trigger(50000)
|
||||
|
||||
|
||||
# TODO test_period get_period()
|
||||
|
|
|
@ -35,3 +35,18 @@ def test_wave(wave_path):
|
|||
# check for FutureWarning (raised when determining wavfile type)
|
||||
warns = [o for o in w if issubclass(o.category, FutureWarning)]
|
||||
assert not [str(w) for w in warns]
|
||||
|
||||
|
||||
def test_wave_subsampling():
|
||||
wave = Wave(None, 'tests/sine440.wav')
|
||||
# period = 48000 / 440 = 109.(09)*
|
||||
|
||||
wave.get_around(1000, region_len=501, subsampling=4)
|
||||
# len([:region_len:subsampling]) == ceil(region_len / subsampling)
|
||||
# If region_len % subsampling != 0, len() != region_len // subsampling.
|
||||
|
||||
subsampling = 4
|
||||
region = 100 # diameter = region * subsampling
|
||||
for i in [-1000, 50000]:
|
||||
data = wave.get_around(i, region, subsampling)
|
||||
assert (data == 0).all()
|
||||
|
|
Ładowanie…
Reference in New Issue