2022-09-12 11:47:29 +00:00
|
|
|
import array
|
2022-09-11 11:28:25 +00:00
|
|
|
from functools import lru_cache
|
2022-09-17 09:43:04 +00:00
|
|
|
from typing import List, NamedTuple, Optional, Tuple
|
2022-09-11 11:28:25 +00:00
|
|
|
|
2022-09-13 09:49:21 +00:00
|
|
|
from numba import njit
|
2022-09-11 11:28:25 +00:00
|
|
|
|
2022-09-14 18:43:20 +00:00
|
|
|
import numpy as np
|
|
|
|
|
2022-09-11 11:28:25 +00:00
|
|
|
|
|
|
|
class XY(NamedTuple):
|
|
|
|
"""XY coordinate data of arrays of the same length."""
|
|
|
|
|
|
|
|
x: np.ndarray
|
|
|
|
y: np.ndarray
|
|
|
|
|
|
|
|
|
2022-09-17 09:43:04 +00:00
|
|
|
Correction = List[Tuple[float, float]]
|
|
|
|
|
|
|
|
|
2022-09-11 11:28:25 +00:00
|
|
|
class Analyzer:
|
|
|
|
"""
|
|
|
|
Analyze the system response to a chirp stimulus.
|
|
|
|
|
|
|
|
Symbols that are used:
|
|
|
|
|
|
|
|
x: stimulus
|
|
|
|
y: response = x * h
|
|
|
|
X = FT(x)
|
|
|
|
Y = FT(y) = X . H
|
|
|
|
H: system transfer function = X / Y
|
|
|
|
h: system impulse response = IFT(H)
|
|
|
|
h_inv: inverse system impulse response (which undoes h) = IFT(1 / H)
|
|
|
|
|
|
|
|
with:
|
|
|
|
*: convolution operator
|
|
|
|
FT: Fourier transform
|
|
|
|
IFT: Inverse Fourier transform
|
|
|
|
"""
|
|
|
|
|
|
|
|
MAX_DELAY_SECS = 0.1
|
|
|
|
TIMEOUT_SECS = 1.0
|
|
|
|
|
|
|
|
chirp: np.ndarray
|
|
|
|
x: np.ndarray
|
|
|
|
y: np.ndarray
|
|
|
|
rate: int
|
|
|
|
secs: float
|
|
|
|
fmin: float
|
|
|
|
fmax: float
|
|
|
|
time: float
|
|
|
|
|
|
|
|
def __init__(
|
2022-09-17 09:43:04 +00:00
|
|
|
self, f0: int, f1: int, secs: float, rate: int, ampl: float,
|
|
|
|
calibration: Optional[Correction] = None,
|
|
|
|
target: Optional[Correction] = None):
|
2022-09-11 11:28:25 +00:00
|
|
|
self.chirp = ampl * geom_chirp(f0, f1, secs, rate)
|
|
|
|
self.x = np.concatenate([
|
|
|
|
self.chirp,
|
|
|
|
np.zeros(int(self.MAX_DELAY_SECS * rate))
|
|
|
|
])
|
|
|
|
self.secs = self.x.size / rate
|
|
|
|
self.rate = rate
|
|
|
|
self.fmin = min(f0, f1)
|
|
|
|
self.fmax = max(f0, f1)
|
2022-09-12 11:21:47 +00:00
|
|
|
self.time = 0
|
2022-09-17 09:43:04 +00:00
|
|
|
self._calibration = calibration
|
|
|
|
self._target = target
|
2022-09-11 11:28:25 +00:00
|
|
|
|
|
|
|
# Cache the methods in a way that allows garbage collection of self.
|
2022-09-17 09:43:04 +00:00
|
|
|
for meth in ['X', 'Y', 'H', 'H2', 'h', 'h_inv', 'spectrum',
|
|
|
|
'frequency', 'calibration', 'target']:
|
2022-09-11 11:28:25 +00:00
|
|
|
setattr(self, meth, lru_cache(getattr(self, meth)))
|
|
|
|
|
2022-09-12 11:47:29 +00:00
|
|
|
def findMatch(self, recording: array.array) -> bool:
|
2022-09-11 11:28:25 +00:00
|
|
|
"""
|
|
|
|
Use correlation to find a match of the chirp in the recording.
|
2022-09-12 11:21:09 +00:00
|
|
|
If found, return True and store the system response as ``y``.
|
2022-09-11 11:28:25 +00:00
|
|
|
"""
|
2022-09-12 11:47:29 +00:00
|
|
|
sz = len(recording)
|
|
|
|
self.time = sz / self.rate
|
|
|
|
if sz >= self.x.size:
|
2022-09-11 11:28:25 +00:00
|
|
|
Y = np.fft.fft(recording)
|
2022-09-12 11:47:29 +00:00
|
|
|
X = np.fft.fft(np.flip(self.x), n=sz)
|
2022-09-11 11:28:25 +00:00
|
|
|
corr = np.fft.ifft(X * Y).real
|
|
|
|
idx = int(corr.argmax()) - self.x.size + 1
|
|
|
|
if idx >= 0:
|
2022-09-12 11:47:29 +00:00
|
|
|
self.y = np.array(recording[idx:idx + self.x.size], 'f')
|
2022-09-11 11:28:25 +00:00
|
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
|
|
def timedOut(self) -> bool:
|
|
|
|
"""See if time to find a match has exceeded the timeout limit."""
|
|
|
|
return self.time > self.secs + self.TIMEOUT_SECS
|
|
|
|
|
2022-09-17 09:43:04 +00:00
|
|
|
def frequency(self) -> np.ndarray:
|
|
|
|
return np.linspace(0, self.rate // 2, self.X().size)
|
|
|
|
|
|
|
|
def freqRange(self, size: int = 0) -> slice:
|
2022-09-11 11:28:25 +00:00
|
|
|
"""
|
|
|
|
Return range slice of the valid frequency range for an array
|
|
|
|
of given size.
|
|
|
|
"""
|
2022-09-17 09:43:04 +00:00
|
|
|
size = size or self.X().size
|
2022-09-11 11:28:25 +00:00
|
|
|
nyq = self.rate / 2
|
|
|
|
i0 = int(0.5 + size * self.fmin / nyq)
|
|
|
|
i1 = int(0.5 + size * self.fmax / nyq)
|
|
|
|
return slice(i0, i1 + 1)
|
|
|
|
|
2022-09-17 09:43:04 +00:00
|
|
|
def calibration(self) -> Optional[np.ndarray]:
|
|
|
|
return self.interpolateCorrection(self._calibration)
|
|
|
|
|
|
|
|
def target(self) -> Optional[np.ndarray]:
|
|
|
|
return self.interpolateCorrection(self._target)
|
|
|
|
|
|
|
|
def interpolateCorrection(self, corr: Correction) -> Optional[np.ndarray]:
|
|
|
|
"""
|
|
|
|
Logarithmically interpolate the correction to a full-sized array.
|
|
|
|
"""
|
|
|
|
if not corr:
|
|
|
|
return None
|
|
|
|
corr = sorted(c for c in corr if c[0] > 0)
|
|
|
|
a = np.array(corr, 'd').T
|
|
|
|
logF = np.log(a[0])
|
|
|
|
db = a[1]
|
|
|
|
freq = self.frequency()
|
|
|
|
interp = np.empty_like(freq)
|
|
|
|
interp[1:] = np.interp(np.log(freq[1:]), logF, db)
|
|
|
|
interp[0] = 0
|
|
|
|
return interp
|
|
|
|
|
2022-09-11 11:28:25 +00:00
|
|
|
def X(self) -> np.ndarray:
|
|
|
|
return np.fft.rfft(self.x)
|
|
|
|
|
|
|
|
def Y(self) -> np.ndarray:
|
|
|
|
return np.fft.rfft(self.y)
|
|
|
|
|
|
|
|
def H(self) -> XY:
|
|
|
|
"""
|
|
|
|
Calculate complex-valued transfer function H in the
|
|
|
|
frequency domain.
|
|
|
|
"""
|
|
|
|
X = self.X()
|
|
|
|
Y = self.Y()
|
|
|
|
# H = Y / X
|
|
|
|
H = Y * np.conj(X) / (np.abs(X) ** 2 + 1e-3)
|
2022-09-17 09:43:04 +00:00
|
|
|
if self._calibration:
|
|
|
|
H *= 10 ** (-self.calibration() / 20)
|
|
|
|
freq = self.frequency()
|
2022-09-11 11:28:25 +00:00
|
|
|
return XY(freq, H)
|
|
|
|
|
|
|
|
def H2(self, smoothing: float):
|
|
|
|
"""Calculate smoothed squared transfer function |H|^2."""
|
|
|
|
freq, H = self.H()
|
|
|
|
H = np.abs(H)
|
2022-09-17 09:43:04 +00:00
|
|
|
r = self.freqRange()
|
2022-09-11 11:28:25 +00:00
|
|
|
H2 = np.empty_like(H)
|
|
|
|
# Perform smoothing on the squared amplitude.
|
|
|
|
H2[r] = smooth(freq[r], H[r] ** 2, smoothing)
|
|
|
|
H2[:r.start] = H2[r.start]
|
|
|
|
H2[r.stop:] = H2[r.stop - 1]
|
|
|
|
return XY(freq, H2)
|
|
|
|
|
|
|
|
def h(self) -> XY:
|
|
|
|
"""Calculate impulse response ``h`` in the time domain."""
|
|
|
|
_, H = self.H()
|
|
|
|
h = np.fft.irfft(H)
|
|
|
|
h = np.hstack([h[h.size // 2:], h[0:h.size // 2]])
|
|
|
|
t = np.linspace(0, h.size / self.rate, h.size)
|
|
|
|
return XY(t, h)
|
|
|
|
|
|
|
|
def spectrum(self, smoothing: float = 0) -> XY:
|
|
|
|
"""
|
|
|
|
Calculate the frequency response in the valid frequency range,
|
|
|
|
with optional smoothing.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
smoothing: Determines the overall strength of the smoothing.
|
|
|
|
Useful values are from 0 to around 30.
|
|
|
|
If 0 then no smoothing is done.
|
|
|
|
"""
|
|
|
|
freq, H2 = self.H2(smoothing)
|
2022-09-17 09:43:04 +00:00
|
|
|
r = self.freqRange()
|
2022-09-11 11:28:25 +00:00
|
|
|
return XY(freq[r], 10 * np.log10(H2[r]))
|
|
|
|
|
|
|
|
def h_inv(
|
|
|
|
self,
|
|
|
|
secs: float = 0.05,
|
|
|
|
dbRange: float = 24,
|
|
|
|
kaiserBeta: float = 5,
|
|
|
|
smoothing: float = 0) -> XY:
|
|
|
|
"""
|
|
|
|
Calculate the inverse impulse response.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
secs: Desired length of the response in seconds.
|
|
|
|
dbRange: Maximum attenuation in dB (power).
|
|
|
|
kaiserBeta: Shape parameter of the Kaiser tapering window.
|
|
|
|
smoothing: Strength of frequency-dependent smoothing.
|
|
|
|
"""
|
|
|
|
freq, H2 = self.H2(smoothing)
|
2022-09-17 09:43:04 +00:00
|
|
|
# Apply target curve.
|
|
|
|
if self._target:
|
|
|
|
H2 = H2 * 10 ** (-self.target() / 10)
|
2022-09-11 11:28:25 +00:00
|
|
|
# Re-sample to halve the number of samples needed.
|
|
|
|
n = int(secs * self.rate / 2)
|
|
|
|
H = resample(H2, n) ** 0.5
|
|
|
|
# Accommodate the given dbRange from the top.
|
|
|
|
H /= H.max()
|
|
|
|
H = np.fmax(H, 10 ** (-dbRange / 20))
|
|
|
|
|
|
|
|
# Calculate Z, the reciprocal transfer function with added
|
|
|
|
# linear phase. This phase will shift and center z.
|
|
|
|
Z = 1 / H
|
|
|
|
phase = np.exp(Z.size * 1j * np.linspace(0, np.pi, Z.size))
|
|
|
|
Z = Z * phase
|
|
|
|
|
|
|
|
# Calculate the inverse impulse response z.
|
|
|
|
z = np.fft.irfft(Z)
|
|
|
|
z = z[:-1]
|
|
|
|
z *= window(z.size, kaiserBeta)
|
|
|
|
# Normalize using a fractal dimension for scaling.
|
2022-09-18 07:14:10 +00:00
|
|
|
dim = 1.5
|
2022-09-11 11:28:25 +00:00
|
|
|
norm = (np.abs(z) ** dim).sum() ** (1 / dim)
|
|
|
|
z /= norm
|
|
|
|
# assert np.allclose(z[-(z.size // 2):][::-1], z[:z.size // 2])
|
|
|
|
|
|
|
|
t = np.linspace(0, z.size / self.rate, z.size)
|
|
|
|
return XY(t, z)
|
|
|
|
|
|
|
|
def correctionFactor(self, invResp: np.ndarray) -> XY:
|
|
|
|
"""
|
|
|
|
Calculate correction factor for each frequency, given the
|
|
|
|
inverse impulse response.
|
|
|
|
"""
|
|
|
|
Z = np.abs(np.fft.rfft(invResp))
|
|
|
|
Z /= Z.max()
|
|
|
|
freq = np.linspace(0, self.rate / 2, Z.size)
|
|
|
|
return XY(freq, Z)
|
|
|
|
|
|
|
|
def correctedSpectrum(self, corrFactor: XY) -> Tuple[XY, XY]:
|
|
|
|
"""
|
|
|
|
Simulate the frequency response of the system when it has
|
|
|
|
been corrected with the given transfer function.
|
|
|
|
"""
|
|
|
|
freq, H2 = self.H2(0)
|
|
|
|
H = H2 ** 0.5
|
2022-09-17 09:43:04 +00:00
|
|
|
r = self.freqRange()
|
2022-09-11 11:28:25 +00:00
|
|
|
|
|
|
|
tf = resample(corrFactor.y, H.size)
|
|
|
|
resp = 20 * np.log10(tf[r] * H[r])
|
|
|
|
spectrum = XY(freq[r], resp)
|
|
|
|
|
|
|
|
H = resample(H2, corrFactor.y.size) ** 0.5
|
|
|
|
rr = self.freqRange(corrFactor.y.size)
|
|
|
|
resp = 20 * np.log10(corrFactor.y[rr] * H[rr])
|
|
|
|
spectrum_resamp = XY(corrFactor.x[rr], resp)
|
|
|
|
|
|
|
|
return spectrum, spectrum_resamp
|
|
|
|
|
2022-09-17 09:43:04 +00:00
|
|
|
def targetSpectrum(self, spectrum: XY) -> Optional[XY]:
|
|
|
|
if self._target:
|
|
|
|
freq, resp = spectrum
|
|
|
|
r = self.freqRange()
|
|
|
|
target = self.target()[r]
|
|
|
|
target += np.average(resp - target, weights=1 / freq)
|
|
|
|
targetSpectrum = XY(freq, target)
|
|
|
|
else:
|
|
|
|
targetSpectrum = None
|
|
|
|
return targetSpectrum
|
2022-09-11 11:28:25 +00:00
|
|
|
|
2022-09-17 09:45:43 +00:00
|
|
|
|
2022-09-11 11:28:25 +00:00
|
|
|
@lru_cache
|
|
|
|
def tone(f: float, secs: float, rate: int):
|
|
|
|
"""Generate a sine wave."""
|
|
|
|
n = int(secs * f)
|
|
|
|
secs = n / f
|
|
|
|
t = np.arange(0, secs * rate) / rate
|
|
|
|
sine = np.sin(2 * np.pi * f * t)
|
|
|
|
return sine
|
|
|
|
|
|
|
|
|
|
|
|
@lru_cache
|
2022-09-13 09:47:35 +00:00
|
|
|
def geom_chirp(f0: float, f1: float, secs: float, rate: int):
|
2022-09-11 11:28:25 +00:00
|
|
|
"""
|
|
|
|
Generate a geometric chirp (with an exponentially changing frequency).
|
|
|
|
|
|
|
|
To avoid a clicking sound at the end, the last sample should be near
|
|
|
|
zero. This is done by slightly modifying the time interval to fit an
|
|
|
|
integer number of cycli.
|
|
|
|
"""
|
|
|
|
n = int(secs * (f1 - f0) / np.log(f1 / f0))
|
|
|
|
k = np.exp((f1 - f0) / n) # =~ exp[log(f1/f0) / secs]
|
|
|
|
secs = np.log(f1 / f0) / np.log(k)
|
|
|
|
|
|
|
|
t = np.arange(0, secs * rate) / rate
|
|
|
|
chirp = np.sin(2 * np.pi * f0 * (k ** t - 1) / np.log(k))
|
|
|
|
return chirp
|
|
|
|
|
|
|
|
|
|
|
|
@lru_cache
|
|
|
|
def linear_chirp(f0: float, f1: float, secs: float, rate: int):
|
|
|
|
"""Generate a linear chirp (with a linearly changing frequency)."""
|
|
|
|
n = int(secs * (f1 + f0) / 2)
|
|
|
|
secs = 2 * n / (f1 + f0)
|
|
|
|
c = (f1 - f0) / secs
|
|
|
|
t = np.arange(0, secs * rate) / rate
|
|
|
|
chirp = np.sin(2 * np.pi * (0.5 * c * t ** 2 + f0 * t))
|
|
|
|
return chirp
|
|
|
|
|
|
|
|
|
|
|
|
def resample(a: np.ndarray, size: int) -> np.ndarray:
|
|
|
|
"""
|
2022-09-13 09:46:18 +00:00
|
|
|
Re-sample the array ``a`` to the given new ``size`` in a way that
|
|
|
|
preserves the overall density.
|
2022-09-11 11:28:25 +00:00
|
|
|
"""
|
|
|
|
xp = np.linspace(0, 1, a.size)
|
2022-09-13 09:46:18 +00:00
|
|
|
yp = np.cumsum(a)
|
2022-09-11 11:28:25 +00:00
|
|
|
x = np.linspace(0, 1, size)
|
2022-09-13 09:46:18 +00:00
|
|
|
y = np.interp(x, xp, yp)
|
|
|
|
r = size / a.size * np.diff(y, prepend=0)
|
|
|
|
return r
|
2022-09-11 11:28:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
@njit
|
|
|
|
def smooth(freq: np.ndarray, data: np.ndarray, smoothing: float) -> np.ndarray:
|
|
|
|
"""
|
|
|
|
Smooth the data with a smoothing strength proportional to
|
|
|
|
the given frequency array and overall smoothing factor.
|
|
|
|
The smoothing uses a double-pass exponential moving average (going
|
|
|
|
backward and forward).
|
|
|
|
"""
|
|
|
|
if not smoothing:
|
|
|
|
return data
|
|
|
|
weight = 1 / (1 + freq * 2 ** (smoothing / 2 - 15))
|
2022-09-13 09:52:22 +00:00
|
|
|
smoothed = np.empty_like(data)
|
2022-09-11 11:28:25 +00:00
|
|
|
prev = data[-1]
|
|
|
|
for i, w in enumerate(np.flip(weight), 1):
|
2022-09-13 09:52:22 +00:00
|
|
|
smoothed[-i] = prev = (1 - w) * prev + w * data[-i]
|
2022-09-11 11:28:25 +00:00
|
|
|
for i, w in enumerate(weight):
|
2022-09-13 09:52:22 +00:00
|
|
|
smoothed[i] = prev = (1 - w) * prev + w * smoothed[i]
|
|
|
|
return smoothed
|
2022-09-11 11:28:25 +00:00
|
|
|
|
|
|
|
|
|
|
|
@lru_cache
|
|
|
|
def window(size: int, beta: float) -> np.ndarray:
|
|
|
|
"""Kaiser tapering window."""
|
|
|
|
return np.kaiser(size, beta)
|
|
|
|
|
|
|
|
|
|
|
|
@lru_cache
|
|
|
|
def taper(y0: float, y1: float, size: int) -> np.ndarray:
|
|
|
|
"""Create a smooth transition from y0 to y1 of given size."""
|
|
|
|
tp = (y0 + y1 - (y1 - y0) * np.cos(np.linspace(0, np.pi, size))) / 2
|
|
|
|
return tp
|