kopia lustrzana https://github.com/corrscope/corrscope
Add sign triggering
rodzic
457e50c88a
commit
283952d588
|
@ -854,6 +854,7 @@ class ChannelModel(qc.QAbstractTableModel):
|
|||
Column("trigger__responsiveness", float, None, "Buffer\nResponsiveness"),
|
||||
Column("trigger__edge_direction", plus_minus_one, None),
|
||||
Column("trigger__edge_strength", float, None),
|
||||
Column("trigger__sign_strength", float, None),
|
||||
Column("trigger__slope_strength", float, None),
|
||||
Column("trigger__slope_width", float, None),
|
||||
]
|
||||
|
|
|
@ -272,6 +272,22 @@ class MainWindow(QWidget):
|
|||
tr = self.tr
|
||||
|
||||
with self.add_tab_stretch(s, tr("&Trigger"), layout=QVBoxLayout) as tab:
|
||||
with append_widget(
|
||||
s,
|
||||
QGroupBox,
|
||||
title=tr("Input Data Preprocessing"),
|
||||
layout=QFormLayout,
|
||||
):
|
||||
with add_row(
|
||||
s,
|
||||
tr("Sign Triggering\n(for triangle waves)"),
|
||||
BoundDoubleSpinBox,
|
||||
name="trigger__sign_strength",
|
||||
minimum=0,
|
||||
singleStep=0.25,
|
||||
):
|
||||
pass
|
||||
|
||||
with append_widget(
|
||||
s, QGroupBox, title=tr("Wave Alignment"), layout=QFormLayout
|
||||
):
|
||||
|
@ -291,14 +307,6 @@ class MainWindow(QWidget):
|
|||
singleStep=0.1,
|
||||
):
|
||||
pass
|
||||
with add_row(
|
||||
s,
|
||||
tr("Mean Responsiveness"),
|
||||
BoundDoubleSpinBox,
|
||||
name="trigger__mean_responsiveness",
|
||||
) as w: # type: BoundDoubleSpinBox
|
||||
w.setMaximum(1.0)
|
||||
w.setSingleStep(0.1)
|
||||
with add_row(s, BoundCheckBox, Both) as (self.trigger__pitch_tracking):
|
||||
assert isinstance(self.trigger__pitch_tracking, QWidget)
|
||||
|
||||
|
|
|
@ -9,12 +9,22 @@ import corrscope.utils.scipy.windows as windows
|
|||
from corrscope.config import KeywordAttrs, CorrError, Alias, with_units
|
||||
from corrscope.spectrum import SpectrumConfig, DummySpectrum, LogFreqSpectrum
|
||||
from corrscope.util import find, obj_name, iround
|
||||
from corrscope.utils.trigger_util import get_period, normalize_buffer, lerp
|
||||
from corrscope.utils.trigger_util import (
|
||||
get_period,
|
||||
normalize_buffer,
|
||||
lerp,
|
||||
MIN_AMPLITUDE,
|
||||
)
|
||||
from corrscope.utils.windows import leftpad, midpad, rightpad
|
||||
from corrscope.wave import FLOAT
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from corrscope.wave import Wave
|
||||
from corrscope.renderer import RendererFrontend
|
||||
|
||||
|
||||
def abs_max(data, offset=0):
|
||||
return np.amax(np.abs(data)) + offset
|
||||
|
||||
|
||||
# Abstract classes
|
||||
|
@ -194,7 +204,6 @@ class PerFrameCache:
|
|||
# NOTE: period is a *non-subsampled* period.
|
||||
# The period of subsampled data must be multiplied by stride.
|
||||
period: Optional[int] = None
|
||||
sum: Optional[float] = None # Only used in branch 'rewrite-area-trigger'
|
||||
mean: Optional[float] = None
|
||||
|
||||
|
||||
|
@ -238,9 +247,8 @@ class CorrelationTriggerConfig(
|
|||
" buffer_falloff ",
|
||||
):
|
||||
# get_trigger()
|
||||
mean_responsiveness: float = 0.05
|
||||
|
||||
# Edge/area finding
|
||||
sign_strength: float = 0
|
||||
edge_strength: float
|
||||
|
||||
# Slope detection
|
||||
|
@ -280,7 +288,6 @@ class CorrelationTriggerConfig(
|
|||
validate_param(self, "slope_width", 0, 0.5)
|
||||
|
||||
validate_param(self, "responsiveness", 0, 1)
|
||||
validate_param(self, "mean_responsiveness", 0, 1)
|
||||
# TODO trigger_falloff >= 0
|
||||
validate_param(self, "buffer_falloff", 0, np.inf)
|
||||
|
||||
|
@ -339,7 +346,6 @@ class CorrelationTrigger(MainTrigger):
|
|||
self._prev_window: Optional[np.ndarray] = None
|
||||
self._prev_slope_finder: Optional[np.ndarray] = None
|
||||
|
||||
self._prev_mean: float = 0.0
|
||||
self._prev_trigger: int = 0
|
||||
|
||||
# (mutable) Log-scaled spectrum
|
||||
|
@ -443,14 +449,12 @@ class CorrelationTrigger(MainTrigger):
|
|||
# Get data (1D, downmixed to mono)
|
||||
stride = self._stride
|
||||
data = self._wave.get_around(index, N, stride)
|
||||
cache.sum = np.add.reduce(data)
|
||||
|
||||
# Update data-mean estimate
|
||||
raw_mean = cache.sum / N
|
||||
self._prev_mean = cache.mean = lerp(
|
||||
self._prev_mean, raw_mean, cfg.mean_responsiveness
|
||||
)
|
||||
data -= cache.mean
|
||||
if cfg.sign_strength != 0:
|
||||
signs = sign_times_peak(data)
|
||||
data += cfg.sign_strength * signs
|
||||
data -= np.add.reduce(data) / N
|
||||
|
||||
# Window data
|
||||
period = get_period(data)
|
||||
|
@ -498,19 +502,28 @@ class CorrelationTrigger(MainTrigger):
|
|||
else:
|
||||
radius = None
|
||||
|
||||
peak_offset = self.correlate_offset(data, prev_buffer, radius)
|
||||
score = correlate_offset(data, prev_buffer, radius)
|
||||
peak_offset = score.peak
|
||||
trigger = index + (stride * peak_offset)
|
||||
|
||||
# Apply post trigger (before updating correlation buffer)
|
||||
del data
|
||||
|
||||
if self.post:
|
||||
new_data = self._wave.get_around(trigger, N, stride)
|
||||
cache.mean = np.add.reduce(new_data) / N
|
||||
|
||||
# Apply post trigger (before updating correlation buffer)
|
||||
trigger = self.post.get_trigger(trigger, cache)
|
||||
|
||||
# Avoid time traveling backwards.
|
||||
self._prev_trigger = trigger = max(trigger, self._prev_trigger)
|
||||
|
||||
# Update correlation buffer (distinct from visible area)
|
||||
aligned = self._wave.get_around(trigger, self._buffer_nsamp, stride)
|
||||
aligned = self._wave.get_around(trigger, N, stride)
|
||||
if cache.mean is None:
|
||||
cache.mean = np.add.reduce(aligned) / N
|
||||
self._update_buffer(aligned, cache)
|
||||
|
||||
self.frames_since_spectrum += 1
|
||||
|
||||
return trigger
|
||||
|
@ -548,13 +561,13 @@ class CorrelationTrigger(MainTrigger):
|
|||
boost_y = 1.0
|
||||
|
||||
# If we want to double pitch...
|
||||
resample_notes = self.correlate_offset(
|
||||
resample_notes = correlate_offset(
|
||||
spectrum,
|
||||
prev_spectrum,
|
||||
scfg.max_notes_to_resample,
|
||||
boost_x=boost_x,
|
||||
boost_y=boost_y,
|
||||
)
|
||||
).peak
|
||||
if resample_notes != 0:
|
||||
# we must divide sampling rate by 2.
|
||||
new_len = iround(N / 2 ** (resample_notes / scfg.notes_per_octave))
|
||||
|
@ -566,49 +579,6 @@ class CorrelationTrigger(MainTrigger):
|
|||
# assert len(self._buffer) == new_len
|
||||
self._buffer = midpad(self._buffer, N)
|
||||
|
||||
@staticmethod
|
||||
def correlate_offset(
|
||||
data: np.ndarray,
|
||||
prev_buffer: np.ndarray,
|
||||
radius: Optional[int],
|
||||
boost_x: int = 0,
|
||||
boost_y: float = 1.0,
|
||||
) -> int:
|
||||
"""
|
||||
This is confusing.
|
||||
|
||||
If data index < optimal, data will be too far to the right,
|
||||
and we need to `index += positive`.
|
||||
- The peak will appear near the right of `data`.
|
||||
|
||||
Either we must slide prev_buffer to the right,
|
||||
or we must slide data to the left (by sliding index to the right):
|
||||
- correlate(data, prev_buffer)
|
||||
- trigger = index + peak_offset
|
||||
"""
|
||||
N = len(data)
|
||||
corr = signal.correlate(data, prev_buffer) # returns double, not single/FLOAT
|
||||
Ncorr = 2 * N - 1
|
||||
assert len(corr) == Ncorr
|
||||
|
||||
# Find optimal offset
|
||||
mid = N - 1
|
||||
|
||||
if radius is not None:
|
||||
left = max(mid - radius, 0)
|
||||
right = min(mid + radius + 1, Ncorr)
|
||||
|
||||
corr = corr[left:right]
|
||||
mid = mid - left
|
||||
|
||||
# Prioritize part of it.
|
||||
corr[mid + boost_x : mid + boost_x + 1] *= boost_y
|
||||
|
||||
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
|
||||
# peak_offset == argmax(corr) - mid
|
||||
peak_offset = np.argmax(corr) - mid # type: int
|
||||
return peak_offset
|
||||
|
||||
def _is_window_invalid(self, period: int) -> Union[bool, float]:
|
||||
""" Returns number of semitones,
|
||||
if pitch has changed more than `recalc_semitones`. """
|
||||
|
@ -657,6 +627,75 @@ class CorrelationTrigger(MainTrigger):
|
|||
self._buffer = lerp(self._buffer, data, responsiveness)
|
||||
|
||||
|
||||
@attr.dataclass
|
||||
class CorrelationResult:
|
||||
peak: int
|
||||
corr: np.ndarray
|
||||
|
||||
|
||||
def correlate_offset(
|
||||
data: np.ndarray,
|
||||
prev_buffer: np.ndarray,
|
||||
radius: Optional[int],
|
||||
boost_x: int = 0,
|
||||
boost_y: float = 1.0,
|
||||
) -> CorrelationResult:
|
||||
"""
|
||||
This is confusing.
|
||||
|
||||
If data index < optimal, data will be too far to the right,
|
||||
and we need to `index += positive`.
|
||||
- The peak will appear near the right of `data`.
|
||||
|
||||
Either we must slide prev_buffer to the right,
|
||||
or we must slide data to the left (by sliding index to the right):
|
||||
- correlate(data, prev_buffer)
|
||||
- trigger = index + peak_offset
|
||||
"""
|
||||
N = len(data)
|
||||
corr = signal.correlate(data, prev_buffer) # returns double, not single/FLOAT
|
||||
Ncorr = 2 * N - 1
|
||||
assert len(corr) == Ncorr
|
||||
|
||||
# Find optimal offset
|
||||
mid = N - 1
|
||||
|
||||
if radius is not None:
|
||||
left = max(mid - radius, 0)
|
||||
right = min(mid + radius + 1, Ncorr)
|
||||
|
||||
corr = corr[left:right]
|
||||
mid = mid - left
|
||||
|
||||
# Prioritize part of it.
|
||||
corr[mid + boost_x : mid + boost_x + 1] *= boost_y
|
||||
|
||||
# argmax(corr) == mid + peak_offset == (data >> peak_offset)
|
||||
# peak_offset == argmax(corr) - mid
|
||||
peak_offset = np.argmax(corr) - mid # type: int
|
||||
return CorrelationResult(peak_offset, corr)
|
||||
|
||||
|
||||
SIGN_AMPLIFICATION = 1000
|
||||
|
||||
|
||||
def sign_times_peak(data: np.ndarray) -> np.ndarray:
|
||||
"""
|
||||
Computes peak = max(abs(data)).
|
||||
Returns `peak` for positive parts of data, and `-peak` for negative parts,
|
||||
and heavily amplifies parts of the wave near zero.
|
||||
"""
|
||||
data = data.copy()
|
||||
|
||||
peak = abs_max(data)
|
||||
data *= SIGN_AMPLIFICATION / (peak + MIN_AMPLITUDE)
|
||||
|
||||
sign_data = np.tanh(data)
|
||||
sign_data *= peak
|
||||
|
||||
return sign_data
|
||||
|
||||
|
||||
#### Post-processing triggers
|
||||
# ZeroCrossingTrigger
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ from corrscope.triggers import (
|
|||
PerFrameCache,
|
||||
ZeroCrossingTriggerConfig,
|
||||
SpectrumConfig,
|
||||
correlate_offset,
|
||||
)
|
||||
from corrscope.wave import Wave
|
||||
|
||||
|
@ -220,12 +221,11 @@ def test_correlate_offset():
|
|||
"""
|
||||
|
||||
np.random.seed(31337)
|
||||
correlate_offset = CorrelationTrigger.correlate_offset
|
||||
|
||||
# Ensure autocorrelation on random data returns peak at 0.
|
||||
N = 100
|
||||
spectrum = np.random.random(N)
|
||||
assert correlate_offset(spectrum, spectrum, 12) == 0
|
||||
assert correlate_offset(spectrum, spectrum, 12).peak == 0
|
||||
|
||||
# Ensure cross-correlation of time-shifted impulses works.
|
||||
# Assume wave where y=[i==99].
|
||||
|
@ -236,15 +236,17 @@ def test_correlate_offset():
|
|||
|
||||
# We need to slide `left` to the right by 10 samples, and vice versa.
|
||||
for radius in [None, 12]:
|
||||
assert correlate_offset(data=left, prev_buffer=right, radius=radius) == 10
|
||||
assert correlate_offset(data=right, prev_buffer=left, radius=radius) == -10
|
||||
assert correlate_offset(data=left, prev_buffer=right, radius=radius).peak == 10
|
||||
assert correlate_offset(data=right, prev_buffer=left, radius=radius).peak == -10
|
||||
|
||||
# The correlation peak at zero-offset is small enough for boost_x to be returned.
|
||||
boost_y = 1.5
|
||||
ones = np.ones(N)
|
||||
for boost_x in [6, -6]:
|
||||
assert (
|
||||
correlate_offset(ones, ones, radius=9, boost_x=boost_x, boost_y=boost_y)
|
||||
correlate_offset(
|
||||
ones, ones, radius=9, boost_x=boost_x, boost_y=boost_y
|
||||
).peak
|
||||
== boost_x
|
||||
)
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue