Rewrite pitch tracking based on buffer, not history

pull/357/head
nyanpasu64 2019-04-13 06:09:43 -07:00
rodzic 0a8c0e8451
commit ee4ad5e3ae
4 zmienionych plików z 105 dodań i 95 usunięć

Wyświetl plik

@ -43,7 +43,6 @@ class SpectrumConfig(KeywordAttrs):
# Time-domain history parameters
min_frames_between_recompute: int = 1
frames_to_lookbehind: int = 2
class DummySpectrum:

Wyświetl plik

@ -206,24 +206,6 @@ class PerFrameCache:
# CorrelationTrigger
class CircularArray:
def __init__(self, size: int, *dims: int):
self.size = size
self.buf = np.zeros((size, *dims))
self.index = 0
def push(self, arr: np.ndarray) -> None:
if self.size == 0:
return
self.buf[self.index] = arr
self.index = (self.index + 1) % self.size
def peek(self) -> np.ndarray:
"""Return is borrowed from self.buf.
Do NOT push to self while borrow is alive."""
return self.buf[self.index]
class LagPrevention(KeywordAttrs):
max_frames: float = 1
transition_frames: float = 0.25
@ -353,14 +335,8 @@ class CorrelationTrigger(MainTrigger):
subsmp_s=self._wave.smp_s / self._stride,
dummy_data=self._buffer,
)
self._spectrum = self._spectrum_calc.calc_spectrum(self._buffer)
self.history = CircularArray(
self.scfg.frames_to_lookbehind, self._buffer_nsamp
)
else:
self._spectrum_calc = DummySpectrum()
self._spectrum = np.array([0])
self.history = CircularArray(0, self._buffer_nsamp)
def _calc_lag_prevention(self) -> np.ndarray:
""" Returns input-data window,
@ -402,7 +378,7 @@ class CorrelationTrigger(MainTrigger):
def _calc_step(self) -> np.ndarray:
""" Step function used for approximate edge triggering. """
# Increasing buffer_falloff (width of history buffer)
# Increasing buffer_falloff (width of buffer)
# causes buffer to affect triggering, more than the step function.
# So we multiply edge_strength (step function height) by buffer_falloff.
@ -471,13 +447,12 @@ class CorrelationTrigger(MainTrigger):
# Slope finder
slope_finder = self._calc_slope_finder(period)
data *= window
# If pitch tracking enabled, rescale buffer to match data's pitch.
if self.scfg and (data != 0).any():
if isinstance(semitones, float):
peak_semitones = semitones
else:
peak_semitones = None
self.spectrum_rescale_buffer(data, peak_semitones)
# Mutates self._buffer.
self.spectrum_rescale_buffer(data)
self._prev_period = period
self._prev_window = window
@ -486,8 +461,7 @@ class CorrelationTrigger(MainTrigger):
window = self._prev_window
slope_finder = self._prev_slope_finder
self.history.push(data)
data *= window
data *= window
prev_buffer: np.ndarray = self._buffer * self.cfg.buffer_strength
prev_buffer += self._edge_finder + slope_finder
@ -498,7 +472,7 @@ class CorrelationTrigger(MainTrigger):
else:
radius = None
score = correlate_offset(data, prev_buffer, radius)
score = correlate_data(data, prev_buffer, radius)
peak_offset = score.peak
trigger = index + (stride * peak_offset)
@ -524,56 +498,48 @@ class CorrelationTrigger(MainTrigger):
return trigger
def spectrum_rescale_buffer(
self, data: np.ndarray, peak_semitones: Optional[float]
) -> None:
"""Rewrites self._spectrum, and possibly rescales self._buffer."""
def spectrum_rescale_buffer(self, data: np.ndarray) -> None:
"""
- Cross-correlate the log-frequency spectrum of `data` with `buffer`.
- Rescale `buffer` until its pitch matches `data`.
"""
# Setup
scfg = self.scfg
N = self._buffer_nsamp
if self.frames_since_spectrum < self.scfg.min_frames_between_recompute:
return
self.frames_since_spectrum = 0
spectrum = self._spectrum_calc.calc_spectrum(data)
calc_spectrum = self._spectrum_calc.calc_spectrum
# Compute log-frequency spectrum of `data`.
spectrum = calc_spectrum(data)
normalize_buffer(spectrum)
# Don't normalize self._spectrum. It was already normalized when being assigned.
prev_spectrum = self._spectrum_calc.calc_spectrum(self.history.peek())
# rewrite spectrum
self._spectrum = spectrum
assert not np.any(np.isnan(spectrum))
# Find spectral correlation peak,
# but prioritize "changing pitch by ???".
if peak_semitones is not None:
boost_x = iround(peak_semitones / 12 * scfg.notes_per_octave)
boost_y: float = scfg.pitch_estimate_boost
else:
boost_x = 0
boost_y = 1.0
# Compute log-frequency spectrum of `self._buffer`.
prev_spectrum = calc_spectrum(self._buffer)
# Don't normalize self._spectrum. It was already normalized when being assigned.
# If we want to double pitch...
resample_notes = correlate_offset(
spectrum,
prev_spectrum,
scfg.max_notes_to_resample,
boost_x=boost_x,
boost_y=boost_y,
# Rescale `self._buffer` until its pitch matches `data`.
resample_notes = correlate_spectrum(
spectrum, prev_spectrum, scfg.max_notes_to_resample
).peak
if resample_notes != 0:
# we must divide sampling rate by 2.
# If we want to double pitch, we must divide data length by 2.
new_len = iround(N / 2 ** (resample_notes / scfg.notes_per_octave))
def rescale_mut(in_buf):
buf = np.interp(
np.linspace(0, 1, new_len), np.linspace(0, 1, N), in_buf
)
# assert len(buf) == new_len
buf = midpad(buf, N)
in_buf[:] = buf
# Copy+resample self._buffer.
self._buffer = np.interp(
np.linspace(0, 1, new_len), np.linspace(0, 1, N), self._buffer
)
# assert len(self._buffer) == new_len
self._buffer = midpad(self._buffer, N)
rescale_mut(self._buffer)
def _is_window_invalid(self, period: int) -> Union[bool, float]:
""" Returns number of semitones,
@ -629,12 +595,15 @@ class CorrelationResult:
corr: np.ndarray
def correlate_offset(
data: np.ndarray,
prev_buffer: np.ndarray,
radius: Optional[int],
boost_x: int = 0,
boost_y: float = 1.0,
@attr.dataclass
class InterpolatedCorrelationResult:
peak: float
corr: np.ndarray
# TODO use parabolic() for added precision when trigger subsampling enabled
def correlate_data(
data: np.ndarray, prev_buffer: np.ndarray, radius: Optional[int]
) -> CorrelationResult:
"""
This is confusing.
@ -663,15 +632,55 @@ def correlate_offset(
corr = corr[left:right]
mid = mid - left
# Prioritize part of it.
corr[mid + boost_x : mid + boost_x + 1] *= boost_y
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
# peak_offset == argmax(corr) - mid
peak_offset = np.argmax(corr) - mid # type: int
return CorrelationResult(peak_offset, corr)
def correlate_spectrum(
data: np.ndarray, prev_buffer: np.ndarray, radius: Optional[int]
) -> InterpolatedCorrelationResult:
N = len(data)
corr = signal.correlate(data, prev_buffer) # returns double, not single/FLOAT
Ncorr = 2 * N - 1
assert len(corr) == Ncorr
# Find optimal offset
mid = N - 1
if radius is not None:
left = max(mid - radius, 0)
right = min(mid + radius + 1, Ncorr)
corr = corr[left:right]
mid = mid - left
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
# peak_offset == argmax(corr) - mid
peak_offset = parabolic(corr, np.argmax(corr)) - mid # type: float
return InterpolatedCorrelationResult(peak_offset, corr)
def parabolic(ys: np.ndarray, xint: int) -> float:
"""
Quadratic interpolation for estimating the true position of an inter-sample maximum
when nearby samples are known.
"""
if xint - 1 < 0 or xint + 1 >= len(ys):
return float(xint)
left = ys[xint - 1]
mid = ys[xint]
right = ys[xint + 1]
# https://ccrma.stanford.edu/~jos/sasp/Quadratic_Interpolation_Spectral_Peaks.html
dx = 0.5 * (+left - right) / (+left - 2 * mid + right)
assert -1 < dx < 1
return xint + dx
SIGN_AMPLIFICATION = 1000

Wyświetl plik

@ -55,16 +55,21 @@ To remove DC offset from the wave, corrscope calculates the `mean` of input `dat
Corrscope then estimates the fundamental `period` of the waveform, using autocorrelation.
Corrscope multiplies `data` by `data window` to taper off the edges towards zero, and avoid using data over 1 frame old.
### (optional) Pitch Tracking
If `Pitch Tracking` is enabled:
If `period` changes significantly, corrscope computes the spectrums of `data` and `data` from 2 frames ago, and cross-correlates them to estimate the pitch change over the last 2 frames. It then resamples (horizontally scales) `buffer` to match this pitch change.
If `period` changes significantly:
- Cross-correlate the log-frequency spectrum of `data` with `buffer`.
- Rescale `buffer` until its pitch matches `data`.
Pitch Tracking may get confused when `data` moves from 1 note to another over the course of multiple frames. If the right half of `buffer` changes to a new note while the left half is still latched onto the old note, the next frame will latch onto the mistriggered right half of the buffer. To prevent issues, you should consider reducing `Buffer Responsiveness` (so `buffer` will not "learn" the wrong pitch, and instead be rescaled to align with the new note).
### Correlation Triggering (uses `buffer`)
Corrscope multiplies `data` by `data window` to taper off the edges towards zero, and avoid using data over 1 frame old.
Precomputed: `edge_finder`, which is computed once and reused for every frame.
Corrscope cross-correlates `data` with `buffer + edge_finder` to produce a "buffer similarity + edge" score for each possible `data` triggering location. Corrscope then picks the location in `data` with the highest score, then sets `position` to be used for rendering.

Wyświetl plik

@ -13,7 +13,8 @@ from corrscope.triggers import (
PerFrameCache,
ZeroCrossingTriggerConfig,
SpectrumConfig,
correlate_offset,
correlate_data,
correlate_spectrum,
)
from corrscope.wave import Wave
@ -214,18 +215,23 @@ def test_post_trigger_radius():
# Test pitch-tracking (spectrum)
def test_correlate_offset():
@parametrize("correlate", [correlate_data, correlate_spectrum])
def test_correlate_offset(correlate):
"""
Catches bug where writing N instead of Ncorr
prevented function from returning positive numbers.
"""
if correlate == correlate_spectrum:
approx = lambda x: pytest.approx(x, rel=0.5)
else:
approx = lambda x: x
np.random.seed(31337)
# Ensure autocorrelation on random data returns peak at 0.
N = 100
spectrum = np.random.random(N)
assert correlate_offset(spectrum, spectrum, 12).peak == 0
assert correlate(spectrum, spectrum, 12).peak == approx(0)
# Ensure cross-correlation of time-shifted impulses works.
# Assume wave where y=[i==99].
@ -236,18 +242,9 @@ def test_correlate_offset():
# We need to slide `left` to the right by 10 samples, and vice versa.
for radius in [None, 12]:
assert correlate_offset(data=left, prev_buffer=right, radius=radius).peak == 10
assert correlate_offset(data=right, prev_buffer=left, radius=radius).peak == -10
# The correlation peak at zero-offset is small enough for boost_x to be returned.
boost_y = 1.5
ones = np.ones(N)
for boost_x in [6, -6]:
assert (
correlate_offset(
ones, ones, radius=9, boost_x=boost_x, boost_y=boost_y
).peak
== boost_x
assert correlate(data=left, prev_buffer=right, radius=radius).peak == approx(10)
assert correlate(data=right, prev_buffer=left, radius=radius).peak == approx(
-10
)