Improve period calculation, add maximum frequency cap (#294)

pull/357/head
nyanpasu64 2019-06-16 22:15:46 -07:00 zatwierdzone przez GitHub
rodzic 66133d581d
commit e99c1d3c99
6 zmienionych plików z 180 dodań i 40 usunięć

Wyświetl plik

@ -770,6 +770,12 @@ class MatplotlibAggRenderer(AbstractMatplotlibRenderer):
return np.array([round(c * 255) for c in to_rgb(c)], dtype=int)
# TODO: PlotConfig
# - align: left vs mid
# - shift/offset: bool
# - invert if trigger is negative: bool
class RendererFrontend(_RendererBackend, ABC):
"""Wrapper around _RendererBackend implementations, providing a better interface."""

Wyświetl plik

@ -34,7 +34,6 @@ class SpectrumConfig(KeywordAttrs):
divide_by_freq: bool = True
# Spectral alignment and resampling
pitch_estimate_boost: float = 1.2
max_octaves_to_resample: float = 1.0
@property

Wyświetl plik

@ -14,8 +14,9 @@ from corrscope.utils.trigger_util import (
normalize_buffer,
lerp,
MIN_AMPLITUDE,
abs_max,
)
from corrscope.utils.windows import leftpad, midpad, rightpad
from corrscope.utils.windows import leftpad, midpad, rightpad, gaussian_or_zero
from corrscope.wave import FLOAT
if TYPE_CHECKING:
@ -98,24 +99,49 @@ class _Trigger(ABC):
self._renderer = renderer
self._wave_idx = wave_idx
# Subsamples per second
self.subsmp_s = self._wave.smp_s / self._stride
frame_dur = 1 / fps
# Subsamples per frame
self._tsamp_frame = self.time2tsamp(frame_dur)
def time2tsamp(self, time: float) -> int:
return round(time * self._wave.smp_s / self._stride)
return round(time * self.subsmp_s)
def custom_line(self, name: str, data: np.ndarray, **kwargs):
def custom_line(
self, name: str, data: np.ndarray, offset: bool, invert: bool = True
):
"""
:param offset:
- True, for untriggered wave data:
- line will be shifted and triggered (by offset_viewport()).
- False, for triggered data and buffers:
- line is immune to offset_viewport().
:param invert:
- True, for wave (data and buffers):
- If wave data is inverted (edge_direction = -1),
data will be plotted inverted.
- False, for buffers and autocorrelated wave data:
- Data is plotted as-is.
"""
if self._renderer is None:
return
data = data / abs_max(data, 0.01) / 2
if invert:
data *= np.copysign(1, self._wave.amplification)
self._renderer.update_custom_line(
name, self._wave_idx, self._stride, data, **kwargs
name, self._wave_idx, self._stride, data, offset=offset
)
def custom_vline(self, name: str, x: int):
def custom_vline(self, name: str, x: int, offset: bool):
"""See above for `offset`."""
if self._renderer is None:
return
self._renderer.update_vline(name, self._wave_idx, self._stride, x)
self._renderer.update_vline(
name, self._wave_idx, self._stride, x, offset=offset
)
def offset_viewport(self, offset: int):
if self._renderer is None:
@ -252,6 +278,9 @@ class CorrelationTriggerConfig(
# _update_buffer
responsiveness: float
# Period/frequency estimation (not in GUI)
max_freq: float = with_units("Hz", default=4000)
# Pitch tracking = compute spectrum.
pitch_tracking: Optional["SpectrumConfig"] = None
@ -331,9 +360,7 @@ class CorrelationTrigger(MainTrigger):
if self.scfg:
self._spectrum_calc = LogFreqSpectrum(
scfg=self.scfg,
subsmp_s=self._wave.smp_s / self._stride,
dummy_data=self._buffer,
scfg=self.scfg, subsmp_s=self.subsmp_s, dummy_data=self._buffer
)
else:
self._spectrum_calc = DummySpectrum()
@ -430,14 +457,14 @@ class CorrelationTrigger(MainTrigger):
data -= np.add.reduce(data) / N
# Window data
period = get_period(data)
period = get_period(data, self.subsmp_s, self.cfg.max_freq, self)
cache.period = period * stride
semitones = self._is_window_invalid(period)
# If pitch changed...
if semitones:
# Gaussian window
period_symmetric_window = windows.gaussian(N, period * cfg.data_falloff)
period_symmetric_window = gaussian_or_zero(N, period * cfg.data_falloff)
# Left-sided falloff
lag_prevention_window = self._lag_prevention_window
@ -473,8 +500,8 @@ class CorrelationTrigger(MainTrigger):
else:
radius = None
score = correlate_data(data, prev_buffer, radius)
peak_offset = score.peak
trigger_score = correlate_data(data, prev_buffer, radius)
peak_offset = trigger_score.peak
trigger = index + (stride * peak_offset)
del data
@ -544,17 +571,29 @@ class CorrelationTrigger(MainTrigger):
rescale_mut(self._buffer)
def _is_window_invalid(self, period: int) -> Union[bool, float]:
""" Returns number of semitones,
if pitch has changed more than `recalc_semitones`. """
"""
Returns number of semitones,
if pitch has changed more than `recalc_semitones`.
Preconditions:
- self._prev_period is assigned whenever this function returns True.
- If period cannot be estimated, period == 0.
Postconditions:
- On frame 0, MUST return True (to initialize self._prev_window).
- This is the only way self._prev_period == 0.
- Elif period is 0 (cannot be estimated), return False.
"""
prev = self._prev_period
if prev is None:
return True
elif prev * period == 0:
return prev != period
elif period == 0:
return False
elif prev == 0:
return True
else:
# If period doubles, semitones are -12.
# When period doubles, semitones are -12.
semitones = np.log(period / prev) / np.log(2) * -12
# If semitones == recalc_semitones == 0, do NOT recalc.
if abs(semitones) <= self.cfg.recalc_semitones:
@ -583,7 +622,7 @@ class CorrelationTrigger(MainTrigger):
# New waveform
data -= cache.mean
normalize_buffer(data)
window = windows.gaussian(N, std=(cache.period / self._stride) * buffer_falloff)
window = gaussian_or_zero(N, std=(cache.period / self._stride) * buffer_falloff)
data *= window
# Old buffer
@ -689,10 +728,6 @@ def sign_times_peak(data: np.ndarray) -> np.ndarray:
return sign_data
def abs_max(data, offset=0):
return np.amax(np.abs(data)) + offset
#### Post-processing triggers
# ZeroCrossingTrigger

Wyświetl plik

@ -1,28 +1,112 @@
from typing import Union
from typing import Union, Optional, TYPE_CHECKING
import numpy as np
from corrscope.utils.scipy import signal as signal
import corrscope.utils.scipy.signal as signal
from corrscope.util import iround
from corrscope.wave import FLOAT
if TYPE_CHECKING:
import corrscope.triggers as t
# 0 = no amplification (estimated period too short).
# 1 = full area compensation (estimated period too long).
EDGE_COMPENSATION = 0.9
MAX_AMPLIFICATION = 2
# get_trigger()
def get_period(data: np.ndarray) -> int:
def get_period(
data: np.ndarray,
subsmp_s: float,
max_freq: float,
self: "Optional[t.CorrelationTrigger]" = None,
) -> int:
"""
Use autocorrelation to estimate the period of a signal.
Use tweaked autocorrelation to estimate the period (AKA pitch) of a signal.
Loosely inspired by https://github.com/endolith/waveform_analysis
Design principles:
- It is better to overestimate the period than underestimate.
- Underestimation leads to bad triggering.
- Overestimation only leads to slightly increased x-distance.
- When the wave is exiting the field of view,
do NOT estimate an egregiously large period.
- Do not report a tiny period when faced with YM2612 FM feedback/noise.
- See get_min_period() docstring.
Return value:
- Returns 0 if period cannot be estimated.
This is a good placeholder value
since it causes buffers/etc. to be basically not updated.
"""
corr = signal.correlate(data, data)
corr = corr[len(corr) // 2 :]
UNKNOWN_PERIOD = 0
# Remove the zero-correlation peak
zero_crossings = np.where(corr < 0)[0]
N = len(data)
if len(zero_crossings) == 0:
# This can happen given an array of all zeros. Anything else?
return len(data)
# If no input, return period of 1.
if np.add.reduce(np.abs(data)) < MIN_AMPLITUDE * N:
return UNKNOWN_PERIOD
# Begin.
corr_symmetric = signal.correlate(data, data)
mid = len(corr_symmetric) // 2
corr = corr_symmetric[mid:]
assert len(corr) == len(data)
def get_min_period() -> int:
"""
Avoid picking periods shorter than `max_freq`.
- Yamaha FM feedback produces nearly inaudible high frequencies,
which tend to produce erroneously short period estimates,
causing correlation to fail.
- Most music does not go this high.
- Overestimating period of high notes is mostly harmless.
"""
max_cyc_s = max_freq
min_s_cyc = 1 / max_cyc_s
min_subsmp_cyc = subsmp_s * min_s_cyc
return iround(min_subsmp_cyc)
def get_zero_crossing() -> int:
"""Remove the central peak."""
zero_crossings = np.where(corr < 0)[0]
if len(zero_crossings) == 0:
# This can happen given an array of all zeros. Anything else?
return UNKNOWN_PERIOD
return zero_crossings[0]
min_period = get_min_period()
zero_crossing = get_zero_crossing()
if zero_crossing == UNKNOWN_PERIOD:
return UNKNOWN_PERIOD
# [minX..) = [min_period..) & [zero_crossing..)
minX = max(min_period, zero_crossing)
# Remove the zero-correlation peak.
def calc_peak():
return minX + np.argmax(corr[minX:])
temp_peakX = calc_peak()
# In the case of uncorrelated noise,
# corr[temp_peakX] can be tiny (smaller than N * MIN_AMPLITUDE^2).
# But don't return 0 since it's not silence.
is_long_period = temp_peakX > 0.1 * N
if is_long_period:
# If a long-period wave has strong harmonics,
# the true peak will be attenuated below the harmonic peaks.
# Compensate for that.
divisor = np.linspace(1, 1 - EDGE_COMPENSATION, N, endpoint=False, dtype=FLOAT)
divisor = np.maximum(divisor, 1 / MAX_AMPLIFICATION)
corr /= divisor
peakX = calc_peak()
else:
peakX = temp_peakX
crossX = zero_crossings[0]
peakX = crossX + np.argmax(corr[crossX:])
return int(peakX)
@ -43,3 +127,7 @@ Arithmetic = Union[np.ndarray, float]
def lerp(x: Arithmetic, y: Arithmetic, a: float) -> Arithmetic:
return x * (1 - a) + y * a
def abs_max(data, offset=0):
return np.amax(np.abs(data)) + offset

Wyświetl plik

@ -43,3 +43,15 @@ def rightpad(data: np.ndarray, n: int, constant_values=1) -> np.ndarray:
data = np.pad(data, (0, n - len(data)), "constant", constant_values=constant_values)
return data
def gaussian_or_zero(M: int, std: float, sym: bool = True) -> np.ndarray:
"""
Sometimes `std` is computed based on period.
If period is zero (cannot be estimated), return all zeros.
"""
if std == 0:
return np.zeros(M, dtype=FLOAT)
else:
return windows.gaussian(M, std, sym)

Wyświetl plik

@ -197,9 +197,9 @@ def test_when_does_trigger_recalc_window():
trigger._prev_period = 100
for x in [99, 101]:
for x in [0, 99, 101]:
assert not trigger._is_window_invalid(x), x
for x in [0, 80, 120]:
for x in [80, 120]:
assert trigger._is_window_invalid(x), x
trigger._prev_period = 0