diff --git a/corrscope/corrscope.py b/corrscope/corrscope.py index 6ec55d1..f25b23a 100644 --- a/corrscope/corrscope.py +++ b/corrscope/corrscope.py @@ -20,6 +20,7 @@ from corrscope.triggers import ( CorrelationTriggerConfig, PerFrameCache, CorrelationTrigger, + SpectrumConfig, ) from corrscope.util import pushd, coalesce from corrscope.wave import Wave, Flatten @@ -118,6 +119,7 @@ def default_config(**kwargs) -> Config: responsiveness=0.5, buffer_falloff=0.5, use_edge_trigger=False, + pitch_invariance=SpectrumConfig() # Removed due to speed hit. # post=LocalPostTriggerConfig(strength=0.1), ), diff --git a/corrscope/triggers.py b/corrscope/triggers.py index ae5da83..75eea0c 100644 --- a/corrscope/triggers.py +++ b/corrscope/triggers.py @@ -1,6 +1,18 @@ import warnings from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Type, Tuple, Optional, ClassVar, Callable, Union +from typing import ( + TYPE_CHECKING, + Type, + Tuple, + Optional, + ClassVar, + Callable, + Union, + NewType, + Sequence, + List, + Any, +) import attr import numpy as np @@ -105,10 +117,184 @@ class PerFrameCache: # CorrelationTrigger -class CorrelationTriggerConfig(ITriggerConfig): +class SpectrumConfig(KeywordAttrs): + """ + # Rationale: + If no basal frequency note-bands are to be truncated, + the spectrum must have freq resolution + `min_hz * (2 ** 1/notes_per_octave - 1)`. + + At 20hz, 10 octaves, 12 notes/octave, this is 1.19Hz fft freqs. + Our highest band must be + `min_hz * 2**octaves`, + leading to nearly 20K freqs, which produces an somewhat slow FFT. + + So increase min_hz and decrease octaves and notes_per_octave. + -------- + Using a Constant-Q transform may eliminate performance concerns? + """ + + # Spectrum X density + min_hz: float = 20 + octaves: int = 8 + notes_per_octave: int = 6 + + # Spectrum Y power + exponent: float = 1 + divide_by_freq: bool = True + + # Spectral alignment and resampling + pitch_estimate_boost: float = 1.2 + add_current_to_history: float = 0.1 # FIXME why does this exist? + max_octaves_to_resample: float = 1.0 + + @property + def max_notes_to_resample(self) -> int: + return round(self.notes_per_octave * self.max_octaves_to_resample) + + # Time-domain history parameters + min_frames_between_recompute: int = 6 + frames_to_lookbehind: int = 2 + + +class DummySpectrum: + # noinspection PyMethodMayBeStatic,PyUnusedLocal + def calc_spectrum(self, data: np.ndarray) -> np.ndarray: + return np.array([]) + + +# Indices are linearly spaced in FFT. Notes are exponentially spaced. +# FFT is grouped into notes. +FFTIndex = NewType("FFTIndex", int) +# Very hacky and weird. Maybe it's not worth getting mypy to pass. +if TYPE_CHECKING: + FFTIndexArray = Any # mypy +else: + FFTIndexArray = "np.ndarray[FFTIndex]" # pycharm + + +class LogFreqSpectrum(DummySpectrum): + """ + Invariants: + - len(note_fenceposts) == n_fencepost + + - rfft()[ : note_fenceposts[0]] is NOT used. + - rfft()[note_fenceposts[-1] : ] is NOT used. + - rfft()[note_fenceposts[0] : note_fenceposts[1]] becomes a note. + """ + + n_fftindex: FFTIndex # Determines frequency resolution, not range. + note_fenceposts: FFTIndexArray + n_fencepost: int + + def __init__(self, scfg: SpectrumConfig, subsmp_s: float, dummy_data: np.ndarray): + self.scfg = scfg + + n_fftindex: FFTIndex = signal.next_fast_len(len(dummy_data)) + + # Increase n_fftindex until every note has nonzero width. + while True: + # Compute parameters + self.min_hz = scfg.min_hz + self.max_hz = self.min_hz * 2 ** scfg.octaves + n_fencepost = scfg.notes_per_octave * scfg.octaves + 1 + + note_fenceposts_hz = np.geomspace( + self.min_hz, self.max_hz, n_fencepost, dtype=FLOAT + ) + + # Convert fenceposts to FFTIndex + fft_from_hertz = n_fftindex / subsmp_s + note_fenceposts: FFTIndexArray = ( + fft_from_hertz * note_fenceposts_hz + ).astype(np.int32) + note_widths = np.diff(note_fenceposts) + + if np.any(note_widths == 0): + n_fftindex = signal.next_fast_len(n_fftindex + n_fftindex // 5 + 1) + continue + else: + break + + self.n_fftindex = n_fftindex # Passed to rfft() to automatically zero-pad data. + self.note_fenceposts = note_fenceposts + self.n_fencepost = len(note_fenceposts) + + def calc_spectrum(self, data: np.ndarray) -> np.ndarray: + """ Unfortunately converting to FLOAT (single) adds too much overhead. + + Input: Time-domain signal to be analyzed. + Output: Frequency-domain spectrum with exponentially-spaced notes. + - ret[note] = nonnegative float. + """ + scfg = self.scfg + + # Compute FFT spectrum[freq] + spectrum = np.fft.rfft(data, self.n_fftindex) + spectrum = abs(spectrum) + if scfg.exponent != 1: + spectrum **= scfg.exponent + + # Compute energy of each note + # spectrum_per_note[note] = np.ndarray[float] + spectrum_per_note: List[np.ndarray] = split(spectrum, self.note_fenceposts) + + # energy_per_note[note] = float + energy_per_note: np.ndarray + + # np.add.reduce is much faster than np.sum/mean. + if scfg.divide_by_freq: + energy_per_note = np.array( + [np.add.reduce(region) / len(region) for region in spectrum_per_note] + ) + else: + energy_per_note = np.array( + [np.add.reduce(region) for region in spectrum_per_note] + ) + + assert len(energy_per_note) == self.n_fencepost - 1 + return energy_per_note + + +def split(data: np.ndarray, fenceposts: Sequence[FFTIndex]) -> List[np.ndarray]: + """ Based off np.split(), but faster. + Unlike np.split, does not include data before fenceposts[0] or after fenceposts[-1]. + """ + sub_arys = [] + ndata = len(data) + for i in range(len(fenceposts) - 1): + st = fenceposts[i] + end = fenceposts[i + 1] + if not st < ndata: + break + region = data[st:end] + sub_arys.append(region) + + return sub_arys + + +class CircularArray: + def __init__(self, size: int, *dims: int): + self.size = size + self.buf = np.zeros((size, *dims)) + self.index = 0 + + def push(self, arr: np.ndarray) -> None: + if self.size == 0: + return + self.buf[self.index] = arr + self.index = (self.index + 1) % self.size + + def peek(self) -> np.ndarray: + """Return is borrowed from self.buf. + Do NOT push to self while borrow is alive.""" + return self.buf[self.index] + + +class CorrelationTriggerConfig(ITriggerConfig, always_dump="pitch_invariance"): # get_trigger edge_strength: float - trigger_diameter: float = 0.5 + trigger_diameter: Optional[float] = None trigger_falloff: Tuple[float, float] = (4.0, 1.0) recalc_semitones: float = 1.0 @@ -118,6 +304,9 @@ class CorrelationTriggerConfig(ITriggerConfig): responsiveness: float buffer_falloff: float # Gaussian std = wave_period * buffer_falloff + # Pitch invariance = compute spectrum. + pitch_invariance: Optional["SpectrumConfig"] = None + # region Legacy Aliases trigger_strength = Alias("edge_strength") falloff_width = Alias("buffer_falloff") @@ -152,6 +341,10 @@ class CorrelationTriggerConfig(ITriggerConfig): class CorrelationTrigger(Trigger): cfg: CorrelationTriggerConfig + @property + def scfg(self) -> SpectrumConfig: + return self.cfg.pitch_invariance + def __init__(self, *args, **kwargs): """ Correlation-based trigger which looks at a window of `trigger_tsamp` samples. @@ -181,6 +374,24 @@ class CorrelationTrigger(Trigger): self._prev_period: Optional[int] = None self._prev_window: Optional[np.ndarray] = None + # (mutable) Log-scaled spectrum + self.frames_since_spectrum = 0 + + if self.scfg: + self._spectrum_calc = LogFreqSpectrum( + scfg=self.scfg, + subsmp_s=self._wave.smp_s / self._stride, + dummy_data=self._buffer, + ) + self._spectrum = self._spectrum_calc.calc_spectrum(self._buffer) + self.history = CircularArray( + self.scfg.frames_to_lookbehind, self._buffer_nsamp + ) + else: + self._spectrum_calc = DummySpectrum() + self._spectrum = np.array([0]) + self.history = CircularArray(0, self._buffer_nsamp) + def _calc_data_taper(self) -> np.ndarray: """ Input data window. Zeroes out all data older than 1 frame old. See https://github.com/nyanpasu64/corrscope/wiki/Correlation-Trigger @@ -242,6 +453,7 @@ class CorrelationTrigger(Trigger): # begin per-frame def get_trigger(self, index: int, cache: "PerFrameCache") -> int: N = self._buffer_nsamp + cfg = self.cfg # Get data stride = self._stride @@ -253,50 +465,39 @@ class CorrelationTrigger(Trigger): period = get_period(data) cache.period = period * stride - if self._is_window_invalid(period): - diameter, falloff = [round(period * x) for x in self.cfg.trigger_falloff] + semitones = self._is_window_invalid(period) + # If pitch changed... + if semitones: + diameter, falloff = [round(period * x) for x in cfg.trigger_falloff] falloff_window = cosine_flat(N, diameter, falloff) window = np.minimum(falloff_window, self._data_taper) + # If pitch invariance enabled, rescale buffer to match data's pitch. + if self.scfg and (data != 0).any(): + if isinstance(semitones, float): + peak_semitones = semitones + else: + peak_semitones = None + self.spectrum_rescale_buffer(data, peak_semitones) + self._prev_period = period self._prev_window = window else: window = self._prev_window + self.history.push(data) data *= window - # prev_buffer - prev_buffer = self._windowed_step + self._buffer + prev_buffer: np.ndarray = self._buffer.copy() + prev_buffer += self._windowed_step # Calculate correlation - """ - If offset < optimal, we need to `offset += positive`. - - The peak will appear near the right of `data`. + if self.cfg.trigger_diameter is not None: + radius = round(N * self.cfg.trigger_diameter / 2) + else: + radius = None - Either we must slide prev_buffer to the right: - - correlate(data, prev_buffer) - - trigger = offset + peak_offset - - Or we must slide data to the left (by sliding offset to the right): - - correlate(prev_buffer, data) - - trigger = offset - peak_offset - """ - corr = signal.correlate(data, prev_buffer) # returns double, not single/FLOAT - assert len(corr) == 2 * N - 1 - - # Find optimal offset (within trigger_diameter, default=±N/4) - mid = N - 1 - radius = round(N * self.cfg.trigger_diameter / 2) - - left = mid - radius - right = mid + radius + 1 - - corr = corr[left:right] - mid = mid - left - - # argmax(corr) == mid + peak_offset == (data >> peak_offset) - # peak_offset == argmax(corr) - mid - peak_offset = np.argmax(corr) - mid # type: int + peak_offset = self.correlate_offset(data, prev_buffer, radius) trigger = index + (stride * peak_offset) # Apply post trigger (before updating correlation buffer) @@ -306,11 +507,108 @@ class CorrelationTrigger(Trigger): # Update correlation buffer (distinct from visible area) aligned = self._wave.get_around(trigger, self._buffer_nsamp, stride) self._update_buffer(aligned, cache) + self.frames_since_spectrum += 1 return trigger - def _is_window_invalid(self, period: int) -> bool: - """ Returns True if pitch has changed more than `recalc_semitones`. """ + def spectrum_rescale_buffer( + self, data: np.ndarray, peak_semitones: Optional[float] + ) -> None: + """Rewrites self._spectrum, and possibly rescales self._buffer.""" + + scfg = self.scfg + N = self._buffer_nsamp + + if self.frames_since_spectrum < self.scfg.min_frames_between_recompute: + return + self.frames_since_spectrum = 0 + + spectrum = self._spectrum_calc.calc_spectrum(data) + normalize_buffer(spectrum) + + # Don't normalize self._spectrum. It was already normalized when being assigned. + prev_spectrum = self._spectrum_calc.calc_spectrum(self.history.peek()) + prev_spectrum += scfg.add_current_to_history * spectrum + + # rewrite spectrum + self._spectrum = spectrum + + assert not np.any(np.isnan(spectrum)) + + # Find spectral correlation peak, + # but prioritize "changing pitch by ???". + if peak_semitones is not None: + boost_x = int(round(peak_semitones / 12 * scfg.notes_per_octave)) + boost_y: float = scfg.pitch_estimate_boost + else: + boost_x = 0 + boost_y = 1.0 + + # If we want to double pitch... + resample_notes = self.correlate_offset( + spectrum, + prev_spectrum, + scfg.max_notes_to_resample, + boost_x=boost_x, + boost_y=boost_y, + ) + if resample_notes != 0: + # we must divide sampling rate by 2. + new_len = int(round(N / 2 ** (resample_notes / scfg.notes_per_octave))) + + # Copy+resample self._buffer. + self._buffer = np.interp( + np.linspace(0, 1, new_len), np.linspace(0, 1, N), self._buffer + ) + # assert len(self._buffer) == new_len + self._buffer = midpad(self._buffer, N) + + @staticmethod + def correlate_offset( + data: np.ndarray, + prev_buffer: np.ndarray, + radius: Optional[int], + boost_x: int = 0, + boost_y: float = 1.0, + ) -> int: + """ + This is confusing. + + If data index < optimal, data will be too far to the right, + and we need to `index += positive`. + - The peak will appear near the right of `data`. + + Either we must slide prev_buffer to the right, + or we must slide data to the left (by sliding index to the right): + - correlate(data, prev_buffer) + - trigger = index + peak_offset + """ + N = len(data) + corr = signal.correlate(data, prev_buffer) # returns double, not single/FLOAT + Ncorr = 2 * N - 1 + assert len(corr) == Ncorr + + # Find optimal offset + mid = N - 1 + + if radius is not None: + left = max(mid - radius, 0) + right = min(mid + radius + 1, Ncorr) + + corr = corr[left:right] + mid = mid - left + + # Prioritize part of it. + corr[mid + boost_x : mid + boost_x + 1] *= boost_y + + # argmax(corr) == mid + peak_offset == (data >> peak_offset) + # peak_offset == argmax(corr) - mid + peak_offset = np.argmax(corr) - mid # type: int + return peak_offset + + def _is_window_invalid(self, period: int) -> Union[bool, float]: + """ Returns number of semitones, + if pitch has changed more than `recalc_semitones`. """ prev = self._prev_period @@ -319,12 +617,12 @@ class CorrelationTrigger(Trigger): elif prev * period == 0: return prev != period else: - semitones = abs(np.log(period / prev) / np.log(2) * 12) - + # If period doubles, semitones are -12. + semitones = np.log(period / prev) / np.log(2) * -12 # If semitones == recalc_semitones == 0, do NOT recalc. - if semitones <= self.cfg.recalc_semitones: + if abs(semitones) <= self.cfg.recalc_semitones: return False - return True + return semitones def _update_buffer(self, data: np.ndarray, cache: PerFrameCache) -> None: """ diff --git a/tests/test_trigger.py b/tests/test_trigger.py index 166dbe2..7595959 100644 --- a/tests/test_trigger.py +++ b/tests/test_trigger.py @@ -1,8 +1,10 @@ import attr import matplotlib.pyplot as plt +import numpy as np import pytest from matplotlib.axes import Axes from matplotlib.figure import Figure +from pytest_cases import pytest_fixture_plus from corrscope import triggers from corrscope.triggers import ( @@ -11,6 +13,7 @@ from corrscope.triggers import ( PerFrameCache, ZeroCrossingTriggerConfig, LocalPostTriggerConfig, + SpectrumConfig, ) from corrscope.wave import Wave @@ -25,10 +28,16 @@ def cfg_template(**kwargs) -> CorrelationTriggerConfig: return attr.evolve(cfg, **kwargs) -@pytest.fixture(scope="session", params=[False, True]) -def cfg(request): - use_edge_trigger = request.param - return cfg_template(use_edge_trigger=use_edge_trigger) +@pytest_fixture_plus +@pytest.mark.parametrize("use_edge_trigger", [False, True]) +@pytest.mark.parametrize("trigger_diameter", [None, 0.5]) +@pytest.mark.parametrize("pitch_invariance", [None, SpectrumConfig()]) +def cfg(use_edge_trigger, trigger_diameter, pitch_invariance): + return cfg_template( + use_edge_trigger=use_edge_trigger, + trigger_diameter=trigger_diameter, + pitch_invariance=pitch_invariance, + ) @pytest.fixture( @@ -177,6 +186,43 @@ def test_trigger_should_recalc_window(): assert trigger._is_window_invalid(x), x +# Test pitch-invariant triggering using spectrum +def test_correlate_offset(): + """ + Catches bug where writing N instead of Ncorr + prevented function from returning positive numbers. + """ + + np.random.seed(31337) + correlate_offset = CorrelationTrigger.correlate_offset + + # Ensure autocorrelation on random data returns peak at 0. + N = 100 + spectrum = np.random.random(N) + assert correlate_offset(spectrum, spectrum, 12) == 0 + + # Ensure cross-correlation of time-shifted impulses works. + # Assume wave where y=[i==99]. + wave = np.eye(N)[::-1] + # Taking a slice beginning at index i will produce an impulse at 99-i. + left = wave[30] + right = wave[40] + + # We need to slide `left` to the right by 10 samples, and vice versa. + for radius in [None, 12]: + assert correlate_offset(data=left, prev_buffer=right, radius=radius) == 10 + assert correlate_offset(data=right, prev_buffer=left, radius=radius) == -10 + + # The correlation peak at zero-offset is small enough for boost_x to be returned. + boost_y = 1.5 + ones = np.ones(N) + for boost_x in [6, -6]: + assert ( + correlate_offset(ones, ones, radius=9, boost_x=boost_x, boost_y=boost_y) + == boost_x + ) + + # Test the ability to load legacy TriggerConfig