from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Type

import numpy as np
from scipy import signal

from ovgenpy.config import register_config, OvgenError
from ovgenpy.util import find
from ovgenpy.wave import FLOAT

if TYPE_CHECKING:
    from ovgenpy.wave import Wave


class ITriggerConfig:
    cls: Type['Trigger']

    def __call__(self, wave: 'Wave', nsamp: int, subsampling: int):
        return self.cls(wave, cfg=self, nsamp=nsamp, subsampling=subsampling)


def register_trigger(config_t: Type[ITriggerConfig]):
    """ @register_trigger(FooTriggerConfig)
    def FooTrigger(): ...
    """
    def inner(trigger_t: Type[Trigger]):
        config_t.cls = trigger_t
        return trigger_t

    return inner
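
# Usage sketch (FooTriggerConfig/FooTrigger are placeholder names, mirroring the
# docstring above):
#
#     @register_config
#     class FooTriggerConfig(ITriggerConfig):
#         ...
#
#     @register_trigger(FooTriggerConfig)
#     class FooTrigger(Trigger):
#         ...
#
# A config instance then acts as a factory: `FooTriggerConfig(...)(wave, nsamp,
# subsampling)` constructs the registered FooTrigger via ITriggerConfig.__call__.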


class Trigger(ABC):
    def __init__(self, wave: 'Wave', cfg: ITriggerConfig, nsamp: int, subsampling: int):
        self.cfg = cfg
        self._wave = wave

        self._nsamp = nsamp
        self._subsampling = subsampling

    @abstractmethod
    def get_trigger(self, index: int) -> int:
        """
        :param index: sample index
        :return: new sample index, corresponding to rising edge
        """
        ...


def lerp(x: np.ndarray, y: np.ndarray, a: float):
    return x * (1 - a) + y * a


@register_config
class CorrelationTriggerConfig(ITriggerConfig):
    # get_trigger
    trigger_strength: float
    use_edge_trigger: bool

    # _update_buffer
    responsiveness: float
    falloff_width: float
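
    # Illustrative construction (field values are arbitrary; assumes
    # @register_config generates a keyword constructor from the annotations):
    #   cfg = CorrelationTriggerConfig(
    #       trigger_strength=10,
    #       use_edge_trigger=True,
    #       responsiveness=0.1,
    #       falloff_width=2,
    #   )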


@register_trigger(CorrelationTriggerConfig)
class CorrelationTrigger(Trigger):
    MIN_AMPLITUDE = 0.01
    ZERO_CROSSING_SCAN = 256
    cfg: CorrelationTriggerConfig

    def __init__(self, *args, **kwargs):
        """
        Correlation-based trigger which looks at a window of `trigger_nsamp` samples.
        Each call cross-correlates the incoming window against a running buffer of
        previously aligned windows (biased by a rising-edge step function), shifts
        the trigger point to the best-correlating offset, and optionally snaps the
        result to a nearby zero crossing (use_edge_trigger).
        """
        Trigger.__init__(self, *args, **kwargs)
        self._buffer_nsamp = self._nsamp

        # Create correlation buffer (containing a series of old data)
        self._buffer = np.zeros(self._buffer_nsamp, dtype=FLOAT)  # type: np.ndarray[FLOAT]

        # Create zero crossing trigger, for postprocessing results
        self._zero_trigger = ZeroCrossingTrigger(
            self._wave,
            ITriggerConfig(),
            nsamp=self.ZERO_CROSSING_SCAN,
            subsampling=1,
        )

    def get_trigger(self, index: int) -> int:
        """
        :param index: sample index
        :return: new sample index, corresponding to rising edge
        """
        trigger_strength = self.cfg.trigger_strength
        use_edge_trigger = self.cfg.use_edge_trigger

        N = self._buffer_nsamp
        halfN = N // 2

        # data = windowed
        data = self._wave.get_around(index, N, self._subsampling)
        data *= signal.gaussian(N, std=halfN / np.sqrt(self._subsampling))

        # prev_buffer = windowed step function + self._buffer
        step = np.empty(N, dtype=FLOAT)  # type: np.ndarray[FLOAT]
        step[:halfN] = -trigger_strength / 2
        step[halfN:] = trigger_strength / 2

        step *= signal.gaussian(N, std=halfN / 3)

        prev_buffer = self._buffer + step
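
        # The step above is negative in the left half of the window and positive
        # in the right half (scaled by trigger_strength), so correlating against
        # (buffer + step) favors alignments where the waveform rises through the
        # window center.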

        # Calculate correlation
        """
        If offset < optimal, we need to `offset += positive`.
        - The peak will appear near the right of `data`.

        Either we must slide prev_buffer to the right:
        - correlate(data, prev_buffer)
        - trigger = offset + peak_offset

        Or we must slide data to the left (by sliding offset to the right):
        - correlate(prev_buffer, data)
        - trigger = offset - peak_offset
        """
        corr = signal.correlate(data, prev_buffer)
        assert len(corr) == 2 * N - 1
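
        # Worked example (toy numbers): with prev_buffer = [0, 1, 0, 0] and
        # data = [0, 0, 1, 0] (the same pattern one sample later), the full
        # correlation peaks at index (N - 1) + 1, so peak_offset below is +1
        # and the trigger moves one (subsampled) sample forward.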

        # Find optimal offset (within ±N//4)
        mid = N - 1
        radius = N // 4

        left = mid - radius
        right = mid + radius + 1

        corr = corr[left:right]
        mid = mid - left

        # argmax(corr) == mid + peak_offset == (data >> peak_offset)
        # peak_offset == argmax(corr) - mid
        peak_offset = np.argmax(corr) - mid  # type: int
        trigger = index + (self._subsampling * peak_offset)

        # Update correlation buffer (distinct from visible area)
        aligned = self._wave.get_around(trigger, self._buffer_nsamp, self._subsampling)
        self._update_buffer(aligned)

        if use_edge_trigger:
            return self._zero_trigger.get_trigger(trigger)
        else:
            return trigger

    def _update_buffer(self, data: np.ndarray) -> None:
        """
        Update self._buffer by blending in `data`, which is normalized and
        reshaped to taper away from the center.

        :param data: Wave data. WILL BE MODIFIED.
        """
        falloff_width = self.cfg.falloff_width
        responsiveness = self.cfg.responsiveness

        N = len(data)
        if N != self._buffer_nsamp:
            raise ValueError(f'invalid data length {len(data)} does not match '
                             f'CorrelationTrigger buffer length {self._buffer_nsamp}')

        # New waveform
        self._normalize_buffer(data)

        wave_period = get_period(data)
        window = signal.gaussian(N, std=wave_period * falloff_width)
        data *= window

        # Old buffer
        self._normalize_buffer(self._buffer)
        self._buffer = lerp(self._buffer, data, responsiveness)
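
        # lerp() with weight `responsiveness` makes the buffer an exponential
        # moving average of recent aligned windows: each frame keeps
        # (1 - responsiveness) of the old buffer, so e.g. 0.1 averages over
        # roughly the last 10 frames.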

    # const method
    def _normalize_buffer(self, data: np.ndarray) -> None:
        """
        Rescales `data` in-place.
        """
        peak = np.amax(abs(data))
        data /= max(peak, self.MIN_AMPLITUDE)


def get_period(data: np.ndarray) -> int:
    """
    Use autocorrelation to estimate the period of a signal.
    Loosely inspired by https://github.com/endolith/waveform_analysis
    """
    corr = signal.correlate(data, data, mode='full', method='fft')
    corr = corr[len(corr) // 2:]

    # Skip past the lag-0 peak by finding the first negative-correlation sample
    zero_crossings = np.where(corr < 0)[0]

    if len(zero_crossings) == 0:
        # This can happen given an array of all zeros. Anything else?
        return len(data)

    crossX = zero_crossings[0]
    peakX = crossX + np.argmax(corr[crossX:])
    return peakX
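
# Rough sanity check: for a clean sine wave with period P samples, the
# autocorrelation first goes negative shortly after lag P/4 and peaks again near
# lag P, so get_period() returns approximately P (less exact for short or noisy
# windows).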


class ZeroCrossingTrigger(Trigger):
    # TODO support subsampling
    def get_trigger(self, index: int):
        if self._subsampling != 1:
            raise OvgenError(
                f'ZeroCrossingTrigger with subsampling != 1 is not implemented '
                f'(supplied {self._subsampling})')
        nsamp = self._nsamp

        if not 0 <= index < self._wave.nsamp:
            return index

        if self._wave[index] < 0:
            direction = 1
            test = lambda a: a >= 0

        elif self._wave[index] > 0:
            direction = -1
            test = lambda a: a <= 0

        else:  # self._wave[index] == 0
            return index + 1

        data = self._wave[index : index + (direction * nsamp) : direction]
        intercepts = find(data, test)
        try:
            (delta,), value = next(intercepts)
            return index + (delta * direction) + int(value <= 0)

        except StopIteration:  # No zero-intercepts
            return index

        # noinspection PyUnreachableCode
        """
        `value <= 0` produces poor results on sine waves, since it erroneously
        increments the exact idx of the zero-crossing sample.

        `value < 0` produces poor results on impulse24000, since idx = 23999 which
        doesn't match CorrelationTrigger. (scans left looking for a zero-crossing)

        CorrelationTrigger tries to maximize @trigger - @(trigger-1). I think always
        incrementing zeros (impulse24000 = 24000) is acceptable.

        - To be consistent, we should increment zeros whenever we *start* there.
        """