kopia lustrzana https://github.com/corrscope/corrscope
Rewrite correlation trigger using overlap-save on unwindowed data
The data read from the input _wave (length A+B+_trigger_diameter) is longer than the correlation buffer (length A+B), to ensure that when sliding the buffer across the wave, no edge effects are encountered. This is inspired by overlap-save convolution, but adapted to cross-correlation.pull/403/head
rodzic
9921ad900e
commit
ef2a6b9b98
|
@ -6,6 +6,11 @@
|
|||
- Add support for background images (#388, @Sanqui)
|
||||
- Add support for line outlines (#388, @Sanqui)
|
||||
|
||||
### Major Changes
|
||||
|
||||
- Rewrite the trigger algorithm to enhance determinism (#403)
|
||||
- Triggering still makes mistakes, especially when DC offset varies within a frame (eg. NES 75% pulse changing volumes). This may be addressed in the future.
|
||||
|
||||
### Changelog
|
||||
|
||||
- Fix passing absolute .wav paths into corrscope CLI (#398)
|
||||
|
|
|
@ -11,15 +11,17 @@ from corrscope.spectrum import SpectrumConfig, DummySpectrum, LogFreqSpectrum
|
|||
from corrscope.util import find, obj_name, iround
|
||||
from corrscope.utils.trigger_util import (
|
||||
get_period,
|
||||
UNKNOWN_PERIOD,
|
||||
normalize_buffer,
|
||||
lerp,
|
||||
MIN_AMPLITUDE,
|
||||
abs_max,
|
||||
)
|
||||
from corrscope.utils.windows import leftpad, midpad, rightpad, gaussian_or_zero
|
||||
from corrscope.utils.windows import midpad, gaussian_or_zero
|
||||
from corrscope.wave_common import f32
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import numpy.typing as npt
|
||||
from corrscope.wave import Wave
|
||||
from corrscope.renderer import RendererFrontend
|
||||
|
||||
|
@ -114,12 +116,12 @@ class _Trigger(ABC, Generic[result]):
|
|||
# Subsamples per second
|
||||
self.subsmp_per_s = self._wave.smp_s / self._stride
|
||||
|
||||
frame_dur = 1 / fps
|
||||
# Subsamples per frame
|
||||
self._tsamp_per_frame = self.seconds_to_tsamp(frame_dur)
|
||||
seconds_per_frame = 1 / fps
|
||||
# Full samples per frame
|
||||
self._smp_per_frame = self.seconds_to_samp(seconds_per_frame)
|
||||
|
||||
def seconds_to_tsamp(self, time: float) -> int:
|
||||
return round(time * self.subsmp_per_s)
|
||||
def seconds_to_samp(self, time: float) -> int:
|
||||
return round(time * self._wave.smp_s)
|
||||
|
||||
def custom_line(
|
||||
self, name: str, data: np.ndarray, offset: bool, invert: bool = True
|
||||
|
@ -253,15 +255,6 @@ class PerFrameCache:
|
|||
# CorrelationTrigger
|
||||
|
||||
|
||||
class LagPrevention(KeywordAttrs):
|
||||
max_frames: float = 1
|
||||
transition_frames: float = 0.25
|
||||
|
||||
def __attrs_post_init__(self):
|
||||
validate_param(self, "max_frames", 0, 1)
|
||||
validate_param(self, "transition_frames", 0, self.max_frames)
|
||||
|
||||
|
||||
class CorrelationTriggerConfig(
|
||||
MainTriggerConfig,
|
||||
always_dump="""
|
||||
|
@ -280,21 +273,19 @@ class CorrelationTriggerConfig(
|
|||
slope_strength: float = 0
|
||||
slope_width: float = with_units("period", default=0.07)
|
||||
|
||||
# Correlation detection (meow~ =^_^=)
|
||||
# Correlation detection
|
||||
buffer_strength: float = 1
|
||||
|
||||
# Both data and buffer uses Gaussian windows. std = wave_period * falloff.
|
||||
# get_trigger()
|
||||
data_falloff: float = 1.5
|
||||
|
||||
# _update_buffer()
|
||||
buffer_falloff: float = 0.5
|
||||
|
||||
# Maximum distance to move
|
||||
trigger_diameter: Optional[float] = 0.5
|
||||
# Maximum distance to move (in terms of trigger_ms/trigger_samp)
|
||||
trigger_diameter: float = 0.5
|
||||
|
||||
# Maximum distance to move (in terms of estimated wave period)
|
||||
trigger_radius_periods: Optional[float] = 1.5
|
||||
|
||||
recalc_semitones: float = 1.0
|
||||
lag_prevention: LagPrevention = attr.ib(factory=LagPrevention)
|
||||
|
||||
# _update_buffer
|
||||
responsiveness: float
|
||||
|
@ -308,6 +299,7 @@ class CorrelationTriggerConfig(
|
|||
# region Legacy Aliases
|
||||
trigger_strength = Alias("edge_strength")
|
||||
falloff_width = Alias("buffer_falloff")
|
||||
data_falloff = Alias("trigger_radius_periods")
|
||||
# endregion
|
||||
|
||||
def __attrs_post_init__(self) -> None:
|
||||
|
@ -329,37 +321,53 @@ def validate_param(self, key: str, begin: float, end: float) -> None:
|
|||
@register_trigger(CorrelationTriggerConfig)
|
||||
class CorrelationTrigger(MainTrigger):
|
||||
"""
|
||||
Assume that if get_trigger(x) == x, then data[[x-1, x]] == [<0, >0].
|
||||
- edge detectors [halfN = N//2] > 0.
|
||||
- So wave.get_around(x)[N//2] > 0.
|
||||
- So wave.get_around(x) = [x - N//2 : ...]
|
||||
Correlation-based trigger. it's complicated.
|
||||
|
||||
test_trigger() checks that get_around() works properly, for even/odd N.
|
||||
See Wave.get_around() docstring.
|
||||
The data read from the input _wave (length A+B+_trigger_diameter) is longer than
|
||||
the correlation buffer (length A+B), to ensure that when sliding the buffer
|
||||
across the wave, no edge effects are encountered. This is inspired by
|
||||
overlap-save convolution, but adapted to cross-correlation.
|
||||
"""
|
||||
|
||||
cfg: CorrelationTriggerConfig
|
||||
|
||||
A: int
|
||||
"""(const) Number of subsamples on buffer/step's left (negative) side."""
|
||||
B: int
|
||||
"""(const) Number of subsamples on buffer/step's right (positive) side. Equals A.
|
||||
buffer_nsamp = A+B
|
||||
"""
|
||||
|
||||
_trigger_diameter: int
|
||||
"""(const) Distance (in subsamples) between the smallest and largest positions we can
|
||||
output on a given frame. Approximately equal to 0.5 * (A+B). """
|
||||
|
||||
_corr_buffer: "npt.NDArray[f32]"
|
||||
"""(mutable) [A+B] Amplitude"""
|
||||
|
||||
_edge_finder: "npt.NDArray[f32]"
|
||||
"""(const) [A+B] Amplitude"""
|
||||
|
||||
_prev_period: Optional[int]
|
||||
_prev_slope_finder: "Optional[npt.NDArray[f32]]"
|
||||
"""(mutable) [A+B] Amplitude"""
|
||||
|
||||
_prev_trigger: int
|
||||
_frames_since_spectrum: int
|
||||
_spectrum_calc: DummySpectrum
|
||||
|
||||
@property
|
||||
def scfg(self) -> Optional[SpectrumConfig]:
|
||||
return self.cfg.pitch_tracking
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""
|
||||
Correlation-based trigger which looks at a window of `trigger_tsamp` samples.
|
||||
it's complicated
|
||||
"""
|
||||
super().__init__(*args, **kwargs)
|
||||
self._buffer_nsamp = self._tsamp
|
||||
|
||||
# (const) Multiplied by each frame of input audio.
|
||||
# Zeroes out all data older than 1 frame old.
|
||||
self._lag_prevention_window = self._calc_lag_prevention()
|
||||
assert self._lag_prevention_window.dtype == f32
|
||||
self.A = self.B = self._tsamp // 2
|
||||
self._trigger_diameter = int((self.A + self.B) * self.cfg.trigger_diameter)
|
||||
|
||||
# (mutable) Correlated with data (for triggering).
|
||||
# Updated with tightly windowed old data at various pitches.
|
||||
self._buffer = np.zeros(self._buffer_nsamp, dtype=f32) # type: np.ndarray[f32]
|
||||
self._corr_buffer = np.zeros(self.A + self.B, dtype=f32)
|
||||
|
||||
# (const) Added to self._buffer. Nonzero if edge triggering is nonzero.
|
||||
# Left half is -edge_strength, right half is +edge_strength.
|
||||
|
@ -368,61 +376,25 @@ class CorrelationTrigger(MainTrigger):
|
|||
assert self._edge_finder.dtype == f32
|
||||
|
||||
# Will be overwritten on the first frame.
|
||||
self._prev_period: Optional[int] = None
|
||||
self._prev_window: Optional[np.ndarray] = None
|
||||
self._prev_slope_finder: Optional[np.ndarray] = None
|
||||
self._prev_period = None
|
||||
self._prev_slope_finder = None
|
||||
|
||||
self._prev_trigger: int = 0
|
||||
self._prev_trigger = 0
|
||||
|
||||
# (mutable) Log-scaled spectrum
|
||||
self.frames_since_spectrum = 0
|
||||
self._frames_since_spectrum = 0
|
||||
|
||||
if self.scfg:
|
||||
self._spectrum_calc = LogFreqSpectrum(
|
||||
scfg=self.scfg, subsmp_per_s=self.subsmp_per_s, dummy_data=self._buffer
|
||||
scfg=self.scfg,
|
||||
subsmp_per_s=self.subsmp_per_s,
|
||||
dummy_data=self._corr_buffer,
|
||||
)
|
||||
else:
|
||||
self._spectrum_calc = DummySpectrum()
|
||||
|
||||
def _calc_lag_prevention(self) -> np.ndarray:
|
||||
"""Returns input-data window,
|
||||
which zeroes out all data older than 1-ish frame old.
|
||||
See https://github.com/corrscope/corrscope/wiki/Correlation-Trigger
|
||||
"""
|
||||
N = self._buffer_nsamp
|
||||
halfN = N // 2
|
||||
|
||||
# - Create a cosine taper of `width` <= 1 frame
|
||||
# - Right-pad(value=1, len=1 frame)
|
||||
# - Place in left half of N-sample buffer.
|
||||
|
||||
# To avoid cutting off data, use a narrow transition zone (invariant to stride).
|
||||
lag_prevention = self.cfg.lag_prevention
|
||||
tsamp_per_frame = self._tsamp_per_frame
|
||||
transition_nsamp = round(tsamp_per_frame * lag_prevention.transition_frames)
|
||||
|
||||
# Left half of a Hann cosine taper
|
||||
# Width (type=subsample) = min(frame * lag_prevention, 1 frame)
|
||||
assert transition_nsamp <= tsamp_per_frame
|
||||
width = transition_nsamp
|
||||
taper = windows.hann(width * 2)[:width]
|
||||
|
||||
# Right-pad=1 taper to lag_prevention.max_frames long [t-#*f, t]
|
||||
taper = rightpad(taper, iround(tsamp_per_frame * lag_prevention.max_frames))
|
||||
|
||||
# Left-pad=0 taper to left `halfN` of data_taper [t-halfN, t]
|
||||
taper = leftpad(taper, halfN)
|
||||
|
||||
# Generate left half-taper to prevent correlating with 1-frame-old data.
|
||||
# Right-pad=1 taper to [t-halfN, t-halfN+N]
|
||||
# TODO switch to rightpad()? Does it return f32 or not?
|
||||
data_taper = np.ones(N, dtype=f32)
|
||||
data_taper[:halfN] = np.minimum(data_taper[:halfN], taper)
|
||||
|
||||
return data_taper
|
||||
|
||||
def _calc_step(self) -> np.ndarray:
|
||||
"""Step function used for approximate edge triggering."""
|
||||
"""Step function used for approximate edge triggering. Has length A+B."""
|
||||
|
||||
# Increasing buffer_falloff (width of buffer)
|
||||
# causes buffer to affect triggering, more than the step function.
|
||||
|
@ -431,103 +403,104 @@ class CorrelationTrigger(MainTrigger):
|
|||
cfg = self.cfg
|
||||
edge_strength = cfg.edge_strength * cfg.buffer_falloff
|
||||
|
||||
N = self._buffer_nsamp
|
||||
halfN = N // 2
|
||||
|
||||
step = np.empty(N, dtype=f32) # type: np.ndarray[f32]
|
||||
step[:halfN] = -edge_strength / 2
|
||||
step[halfN:] = edge_strength / 2
|
||||
step *= windows.gaussian(N, std=halfN / 3)
|
||||
step = np.empty(self.A + self.B, dtype=f32) # type: np.ndarray[f32]
|
||||
step[: self.A] = -edge_strength / 2
|
||||
step[self.A :] = edge_strength / 2
|
||||
step *= windows.gaussian(self.A + self.B, std=self.A / 3)
|
||||
return step
|
||||
|
||||
def _calc_slope_finder(self, period: float) -> np.ndarray:
|
||||
"""Called whenever period changes substantially.
|
||||
Returns a kernel to be correlated with input data,
|
||||
to find positive slopes."""
|
||||
Returns a kernel to be correlated with input data to find positive slopes,
|
||||
with length A+B."""
|
||||
|
||||
N = self._buffer_nsamp
|
||||
halfN = N // 2
|
||||
slope_finder = np.zeros(N)
|
||||
slope_finder = np.zeros(self.A + self.B)
|
||||
|
||||
cfg = self.cfg
|
||||
slope_width = max(iround(cfg.slope_width * period), 1)
|
||||
slope_strength = cfg.slope_strength * cfg.buffer_falloff
|
||||
|
||||
slope_finder[halfN - slope_width : halfN] = -slope_strength
|
||||
slope_finder[halfN : halfN + slope_width] = slope_strength
|
||||
slope_finder[self.A - slope_width : self.A] = -slope_strength
|
||||
slope_finder[self.A : self.A + slope_width] = slope_strength
|
||||
return slope_finder
|
||||
|
||||
# end setup
|
||||
|
||||
# begin per-frame
|
||||
def get_trigger(self, index: int, cache: "PerFrameCache") -> TriggerResult:
|
||||
N = self._buffer_nsamp
|
||||
def get_trigger(self, pos: int, cache: "PerFrameCache") -> TriggerResult:
|
||||
cfg = self.cfg
|
||||
|
||||
# Get data (1D, downmixed to mono)
|
||||
stride = self._stride
|
||||
data = self._wave.get_around(index, N, stride)
|
||||
|
||||
# Convert sizes to full samples (not trigger subsamples) when indexing into
|
||||
# _wave.
|
||||
|
||||
# _trigger_diameter is defined as inclusive. The length of correlate_valid()'s
|
||||
# corr variable is (A + _trigger_diameter + B) - (A + B) + 1, or
|
||||
# _trigger_diameter + 1. This gives us a possible triggering range of
|
||||
# _trigger_diameter inclusive, which is what we want.
|
||||
data_nsubsmp = self.A + self._trigger_diameter + self.B
|
||||
|
||||
trigger_begin = max(
|
||||
pos - self._smp_per_frame, pos - self._trigger_diameter // 2
|
||||
)
|
||||
data_begin = trigger_begin - stride * self.A
|
||||
|
||||
# Get subsampled data (1D, downmixed to mono)
|
||||
data = self._wave.get_padded(
|
||||
data_begin, data_begin + stride * data_nsubsmp, stride
|
||||
)
|
||||
assert data.size == data_nsubsmp, (data.size, data_nsubsmp)
|
||||
|
||||
if cfg.sign_strength != 0:
|
||||
signs = sign_times_peak(data)
|
||||
data += cfg.sign_strength * signs
|
||||
|
||||
# Remove mean from data
|
||||
data -= np.add.reduce(data) / N
|
||||
data -= np.add.reduce(data) / data.size
|
||||
|
||||
# Window data
|
||||
# Use period to recompute slope finder (if enabled) and restrict trigger
|
||||
# diameter.
|
||||
period = get_period(data, self.subsmp_per_s, self.cfg.max_freq, self)
|
||||
cache.period = period * stride
|
||||
|
||||
semitones = self._is_window_invalid(period)
|
||||
# If pitch changed...
|
||||
if semitones:
|
||||
# Gaussian window
|
||||
period_symmetric_window = gaussian_or_zero(N, period * cfg.data_falloff)
|
||||
|
||||
# Left-sided falloff
|
||||
lag_prevention_window = self._lag_prevention_window
|
||||
|
||||
# Both combined.
|
||||
window = np.minimum(period_symmetric_window, lag_prevention_window)
|
||||
|
||||
# Slope finder
|
||||
slope_finder = self._calc_slope_finder(period)
|
||||
|
||||
data *= window
|
||||
|
||||
# If pitch tracking enabled, rescale buffer to match data's pitch.
|
||||
if self.scfg and (data != 0).any():
|
||||
# Mutates self._buffer.
|
||||
self.spectrum_rescale_buffer(data)
|
||||
|
||||
self._prev_period = period
|
||||
self._prev_window = window
|
||||
self._prev_slope_finder = slope_finder
|
||||
else:
|
||||
window = self._prev_window
|
||||
slope_finder = self._prev_slope_finder
|
||||
|
||||
data *= window
|
||||
# array[A+B] Amplitude
|
||||
corr_kernel: np.ndarray = self._corr_buffer * self.cfg.buffer_strength
|
||||
corr_kernel += self._edge_finder
|
||||
corr_kernel += slope_finder
|
||||
|
||||
prev_buffer: np.ndarray = self._buffer * self.cfg.buffer_strength
|
||||
prev_buffer += self._edge_finder + slope_finder
|
||||
|
||||
# Calculate correlation
|
||||
if self.cfg.trigger_diameter is not None:
|
||||
radius = round(N * self.cfg.trigger_diameter / 2)
|
||||
# Don't pick peaks more than `period * trigger_radius_periods` away from the
|
||||
# center.
|
||||
if cfg.trigger_radius_periods and period != UNKNOWN_PERIOD:
|
||||
trigger_radius = round(period * cfg.trigger_radius_periods)
|
||||
else:
|
||||
radius = None
|
||||
trigger_radius = None
|
||||
|
||||
trigger_score = correlate_data(data, prev_buffer, radius)
|
||||
peak_offset = trigger_score.peak
|
||||
trigger = index + (stride * peak_offset)
|
||||
# Find correlation peak.
|
||||
peak_offset = correlate_valid(data, corr_kernel, trigger_radius)
|
||||
trigger = trigger_begin + (stride * peak_offset)
|
||||
|
||||
del data
|
||||
|
||||
Ntrigger = self.A + self.B
|
||||
if self.post:
|
||||
new_data = self._wave.get_around(trigger, N, stride)
|
||||
cache.mean = np.add.reduce(new_data) / N
|
||||
new_data = self._wave.get_around(trigger, Ntrigger, stride)
|
||||
cache.mean = np.add.reduce(new_data) / Ntrigger
|
||||
|
||||
# Apply post trigger (before updating correlation buffer)
|
||||
trigger = self.post.get_trigger(trigger, cache)
|
||||
|
@ -536,12 +509,12 @@ class CorrelationTrigger(MainTrigger):
|
|||
self._prev_trigger = trigger = max(trigger, self._prev_trigger)
|
||||
|
||||
# Update correlation buffer (distinct from visible area)
|
||||
aligned = self._wave.get_around(trigger, N, stride)
|
||||
aligned = self._wave.get_around(trigger, Ntrigger, stride)
|
||||
if cache.mean is None:
|
||||
cache.mean = np.add.reduce(aligned) / N
|
||||
cache.mean = np.add.reduce(aligned) / Ntrigger
|
||||
self._update_buffer(aligned, cache)
|
||||
|
||||
self.frames_since_spectrum += 1
|
||||
self._frames_since_spectrum += 1
|
||||
|
||||
self.offset_viewport(peak_offset)
|
||||
|
||||
|
@ -560,10 +533,10 @@ class CorrelationTrigger(MainTrigger):
|
|||
|
||||
# Setup
|
||||
scfg = self.scfg
|
||||
N = self._buffer_nsamp
|
||||
if self.frames_since_spectrum < self.scfg.min_frames_between_recompute:
|
||||
Ntrigger = self._corr_buffer.size
|
||||
if self._frames_since_spectrum < self.scfg.min_frames_between_recompute:
|
||||
return
|
||||
self.frames_since_spectrum = 0
|
||||
self._frames_since_spectrum = 0
|
||||
|
||||
calc_spectrum = self._spectrum_calc.calc_spectrum
|
||||
|
||||
|
@ -573,7 +546,7 @@ class CorrelationTrigger(MainTrigger):
|
|||
assert not np.any(np.isnan(spectrum))
|
||||
|
||||
# Compute log-frequency spectrum of `self._buffer`.
|
||||
prev_spectrum = calc_spectrum(self._buffer)
|
||||
prev_spectrum = calc_spectrum(self._corr_buffer)
|
||||
# Don't normalize self._spectrum. It was already normalized when being assigned.
|
||||
|
||||
# Rescale `self._buffer` until its pitch matches `data`.
|
||||
|
@ -582,18 +555,20 @@ class CorrelationTrigger(MainTrigger):
|
|||
).peak
|
||||
if resample_notes != 0:
|
||||
# If we want to double pitch, we must divide data length by 2.
|
||||
new_len = iround(N / 2 ** (resample_notes / scfg.notes_per_octave))
|
||||
new_len = iround(Ntrigger / 2 ** (resample_notes / scfg.notes_per_octave))
|
||||
|
||||
def rescale_mut(in_buf):
|
||||
def rescale_mut(corr_kernel_mut):
|
||||
buf = np.interp(
|
||||
np.linspace(0, 1, new_len), np.linspace(0, 1, N), in_buf
|
||||
np.linspace(0, 1, new_len),
|
||||
np.linspace(0, 1, Ntrigger),
|
||||
corr_kernel_mut,
|
||||
)
|
||||
# assert len(buf) == new_len
|
||||
buf = midpad(buf, N)
|
||||
in_buf[:] = buf
|
||||
buf = midpad(buf, Ntrigger)
|
||||
corr_kernel_mut[:] = buf
|
||||
|
||||
# Copy+resample self._buffer.
|
||||
rescale_mut(self._buffer)
|
||||
rescale_mut(self._corr_buffer)
|
||||
|
||||
def _is_window_invalid(self, period: int) -> Union[bool, float]:
|
||||
"""
|
||||
|
@ -638,10 +613,10 @@ class CorrelationTrigger(MainTrigger):
|
|||
responsiveness = self.cfg.responsiveness
|
||||
|
||||
N = len(data)
|
||||
if N != self._buffer_nsamp:
|
||||
if N != self._corr_buffer.size:
|
||||
raise ValueError(
|
||||
f"invalid data length {len(data)} does not match "
|
||||
f"CorrelationTrigger {self._buffer_nsamp}"
|
||||
f"CorrelationTrigger {self._corr_buffer.size}"
|
||||
)
|
||||
|
||||
# New waveform
|
||||
|
@ -651,25 +626,46 @@ class CorrelationTrigger(MainTrigger):
|
|||
data *= window
|
||||
|
||||
# Old buffer
|
||||
normalize_buffer(self._buffer)
|
||||
self._buffer = lerp(self._buffer, data, responsiveness)
|
||||
normalize_buffer(self._corr_buffer)
|
||||
self._corr_buffer = lerp(self._corr_buffer, data, responsiveness)
|
||||
|
||||
|
||||
def correlate_valid(
|
||||
data: np.ndarray, corr_kernel: np.ndarray, radius: Optional[int]
|
||||
) -> int:
|
||||
"""Returns kernel offset (≥ 0) relative to data, which maximizes correlation.
|
||||
kernel is not allowed to move past the boundaries of data.
|
||||
|
||||
If radius is set, the returned offset is limited to ±radius from the center of
|
||||
correlation.
|
||||
"""
|
||||
corr = signal.correlate_valid(data, corr_kernel) # returns double, not single/f32
|
||||
begin_offset = 0
|
||||
|
||||
if radius is not None:
|
||||
Ncorr = len(corr)
|
||||
mid = Ncorr // 2
|
||||
|
||||
left = max(mid - radius, 0)
|
||||
right = min(mid + radius + 1, Ncorr)
|
||||
|
||||
corr = corr[left:right]
|
||||
begin_offset = left
|
||||
|
||||
# Find optimal offset
|
||||
peak_offset = np.argmax(corr) + begin_offset # type: int
|
||||
return peak_offset
|
||||
|
||||
|
||||
@attr.dataclass
|
||||
class CorrelationResult:
|
||||
class SpectrumResult:
|
||||
peak: int
|
||||
corr: np.ndarray
|
||||
|
||||
|
||||
@attr.dataclass
|
||||
class InterpolatedCorrelationResult:
|
||||
peak: float
|
||||
corr: np.ndarray
|
||||
|
||||
|
||||
def correlate_data(
|
||||
def correlate_spectrum(
|
||||
data: np.ndarray, prev_buffer: np.ndarray, radius: Optional[int]
|
||||
) -> CorrelationResult:
|
||||
) -> SpectrumResult:
|
||||
"""
|
||||
This is confusing.
|
||||
|
||||
|
@ -681,6 +677,10 @@ def correlate_data(
|
|||
or we must slide data to the left (by sliding index to the right):
|
||||
- correlate(data, prev_buffer)
|
||||
- trigger = index + peak_offset
|
||||
|
||||
In correlate_spectrum(), I used to use parabolic() on the return value,
|
||||
but unfortunately it was unreliable and caused Plok Beach bass to jitter,
|
||||
so I turned it off (resulting in the same code as correlate_data).
|
||||
"""
|
||||
N = len(data)
|
||||
corr = signal.correlate(data, prev_buffer) # returns double, not single/f32
|
||||
|
@ -700,18 +700,7 @@ def correlate_data(
|
|||
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
|
||||
# peak_offset == argmax(corr) - mid
|
||||
peak_offset = np.argmax(corr) - mid # type: int
|
||||
return CorrelationResult(peak_offset, corr)
|
||||
|
||||
|
||||
def correlate_spectrum(
|
||||
data: np.ndarray, prev_buffer: np.ndarray, radius: Optional[int]
|
||||
) -> CorrelationResult:
|
||||
"""
|
||||
I used to use parabolic() on the return value,
|
||||
but unfortunately it was unreliable and caused Plok Beach bass to jitter,
|
||||
so I turned it off (resulting in the same code as correlate_data).
|
||||
"""
|
||||
return correlate_data(data, prev_buffer, radius)
|
||||
return SpectrumResult(peak_offset, corr)
|
||||
|
||||
|
||||
def parabolic(xint: int, ys: np.ndarray) -> float:
|
||||
|
|
|
@ -2,6 +2,37 @@ from bisect import bisect_left
|
|||
|
||||
import numpy as np
|
||||
|
||||
def correlate_valid(buffer: np.ndarray, kernel: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Based on scipy.correlate. buffer must be longer (or equal) to kernel. Returns an
|
||||
array of length (buffer - kernel + 1) without edge effects, much like mode="valid".
|
||||
"""
|
||||
buffer = np.asarray(buffer)
|
||||
kernel = np.asarray(kernel)
|
||||
|
||||
assert buffer.ndim == kernel.ndim == 1
|
||||
kernel = _reverse_and_conj(kernel)
|
||||
|
||||
# Taken from scipy fftconvolve()
|
||||
|
||||
kernel_support = len(kernel) - 1
|
||||
out_nsamp = len(buffer) - kernel_support
|
||||
|
||||
# fft_nsamp = 1 << (out_nsamp - 1).bit_length()
|
||||
fft_nsamp = next_fast_len(len(buffer))
|
||||
assert fft_nsamp >= out_nsamp
|
||||
|
||||
# return convolve(in1, _reverse_and_conj(in2), mode, method)
|
||||
sp1 = np.fft.rfft(buffer, fft_nsamp)
|
||||
# Already reversed above.
|
||||
sp2 = np.fft.rfft(kernel, fft_nsamp)
|
||||
|
||||
corr = np.fft.irfft(sp1 * sp2, fft_nsamp)
|
||||
# Slice the returned data to the valid region, for complex math reasons.
|
||||
ret = corr[kernel_support : kernel_support + out_nsamp].copy()
|
||||
return ret
|
||||
|
||||
|
||||
|
||||
def correlate(in1: np.ndarray, in2: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
|
|
|
@ -218,7 +218,7 @@ class Wave:
|
|||
data = data.reshape(-1, 1)
|
||||
return data
|
||||
|
||||
def _get(self, begin: int, end: int, subsampling: int) -> np.ndarray:
|
||||
def get_padded(self, begin: int, end: int, subsampling: int) -> np.ndarray:
|
||||
"""Copies self.data[begin:end] with zero-padding."""
|
||||
if 0 <= begin and end <= self.nsamp:
|
||||
return self[begin:end:subsampling]
|
||||
|
@ -264,7 +264,7 @@ class Wave:
|
|||
|
||||
begin = sample - (return_nsamp // 2) * stride
|
||||
end = begin + return_nsamp * stride
|
||||
return self._get(begin, end, stride)
|
||||
return self.get_padded(begin, end, stride)
|
||||
|
||||
def get_s(self) -> float:
|
||||
"""
|
||||
|
|
|
@ -16,7 +16,6 @@ from corrscope.triggers import (
|
|||
PerFrameCache,
|
||||
ZeroCrossingTriggerConfig,
|
||||
SpectrumConfig,
|
||||
correlate_data,
|
||||
correlate_spectrum,
|
||||
)
|
||||
from corrscope.wave import Wave
|
||||
|
@ -35,7 +34,7 @@ def trigger_template(**kwargs) -> CorrelationTriggerConfig:
|
|||
|
||||
|
||||
@fixture
|
||||
@parametrize("trigger_diameter", [None, 0.5])
|
||||
@parametrize("trigger_diameter", [0.5, 1.0])
|
||||
@parametrize("pitch_tracking", [None, SpectrumConfig()])
|
||||
@parametrize("slope_strength", [0, 100])
|
||||
@parametrize("sign_strength", [0, 1])
|
||||
|
@ -94,7 +93,7 @@ def test_trigger(trigger_cfg, is_odd: bool, post_trigger):
|
|||
offset = trigger.get_trigger(x, PerFrameCache()).result
|
||||
assert offset == x0, offset
|
||||
if plot:
|
||||
ax.plot(trigger._buffer, label=str(i))
|
||||
ax.plot(trigger._corr_buffer, label=str(i))
|
||||
ax.grid()
|
||||
|
||||
if plot:
|
||||
|
@ -241,8 +240,7 @@ def test_post_trigger_radius():
|
|||
# Test pitch-tracking (spectrum)
|
||||
|
||||
|
||||
@parametrize("correlate", [correlate_data, correlate_spectrum])
|
||||
def test_correlate_offset(correlate):
|
||||
def test_correlate_offset():
|
||||
"""
|
||||
Catches bug where writing N instead of Ncorr
|
||||
prevented function from returning positive numbers.
|
||||
|
@ -256,7 +254,7 @@ def test_correlate_offset(correlate):
|
|||
# Ensure autocorrelation on random data returns peak at 0.
|
||||
N = 100
|
||||
spectrum = np.random.random(N)
|
||||
assert correlate(spectrum, spectrum, 12).peak == approx(0)
|
||||
assert correlate_spectrum(spectrum, spectrum, 12).peak == approx(0)
|
||||
|
||||
# Ensure cross-correlation of time-shifted impulses works.
|
||||
# Assume wave where y=[i==99].
|
||||
|
@ -267,10 +265,12 @@ def test_correlate_offset(correlate):
|
|||
|
||||
# We need to slide `left` to the right by 10 samples, and vice versa.
|
||||
for radius in [None, 12]:
|
||||
assert correlate(data=left, prev_buffer=right, radius=radius).peak == approx(10)
|
||||
assert correlate(data=right, prev_buffer=left, radius=radius).peak == approx(
|
||||
-10
|
||||
)
|
||||
assert correlate_spectrum(
|
||||
data=left, prev_buffer=right, radius=radius
|
||||
).peak == approx(10)
|
||||
assert correlate_spectrum(
|
||||
data=right, prev_buffer=left, radius=radius
|
||||
).peak == approx(-10)
|
||||
|
||||
|
||||
# Test the ability to load legacy TriggerConfig
|
||||
|
|
Ładowanie…
Reference in New Issue