# HiFiScan/hifiscan/analyzer.py

import array
import types
from functools import lru_cache
from typing import NamedTuple, Optional, Tuple
import numpy as np
from numpy.fft import fft, ifft, irfft, rfft
try:
from numba import njit
except ImportError:
def njit(f):
return f
from hifiscan.io_ import Correction
class XY(NamedTuple):
"""XY coordinate data of arrays of the same length."""
x: np.ndarray
y: np.ndarray
class Analyzer:
"""
Analyze the system response to a chirp stimulus.
Args:
f0: Start frequency of chirp [Hz].
f1: End frequency of chirp [Hz].
secs: Duration of chirp.
rate: Audio sample rate [Hz].
ampl: Amplitude of chirp.
calibration: Microphone calibration.
target: Target curve.
Symbols that are used:
x: stimulus
y: response = x * h
X = FT(x)
Y = FT(y) = X . H
H: system transfer function = Y / X
h: system impulse response = IFT(H)
h_inv: inverse system impulse response (which undoes h) = IFT(1 / H)
with:
*: convolution operator
FT: Fourier transform
IFT: Inverse Fourier transform
"""
MAX_DELAY_SECS = 0.1
TIMEOUT_SECS = 1.0
CACHED_METHODS = [
'X', 'Y', 'calcH', 'H', 'H2', 'h', 'h_inv', 'spectrum',
'frequency', 'calibration', 'target']
chirp: np.ndarray
x: np.ndarray
y: np.ndarray
sumH: np.ndarray
numMeasurements: int
rate: int
fmin: float
fmax: float
time: float
def __init__(
self, f0: int, f1: int, secs: float, rate: int, ampl: float,
calibration: Optional[Correction] = None,
target: Optional[Correction] = None):
self.chirp = ampl * geom_chirp(f0, f1, secs, rate)
self.x = np.concatenate([
self.chirp,
np.zeros(int(self.MAX_DELAY_SECS * rate))
])
self.y = np.zeros(self.x.size)
self.rate = rate
self.fmin = min(f0, f1)
self.fmax = max(f0, f1)
self.time = 0
self.sumH = np.zeros(self.X().size)
self.numMeasurements = 0
self._calibration = calibration
self._target = target
def __getstate__(self):
return {k: v for k, v in self.__dict__.items()
if k not in self.CACHED_METHODS}
def setCaching(self):
"""
Cache the main methods in a way that allows garbage collection of self.
Calling this method again will in effect clear the previous caching.
"""
for name in self.CACHED_METHODS:
unbound = getattr(Analyzer, name)
bound = types.MethodType(unbound, self)
setattr(self, name, lru_cache(bound))
def addMeasurements(self, analyzer):
"""Add measurements from other analyzer to this one."""
if not self.isCompatible(analyzer):
raise ValueError('Incompatible analyzers')
self.sumH = self.sumH + analyzer.sumH
self.numMeasurements += analyzer.numMeasurements
self.setCaching()
def isCompatible(self, analyzer):
"""
See if the other analyzer is compatible for adding its measurements to this one.
"""
return isinstance(analyzer, Analyzer) and np.array_equal(
analyzer.x, self.x)
def findMatch(self, recording: array.array) -> bool:
"""
Use correlation to find a match of the chirp in the recording.
If found, return True and store the system response as ``y``.
"""
sz = len(recording)
self.time = sz / self.rate
if sz >= self.x.size:
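# Cross-correlate via the frequency domain: multiplying FFT(recording)
# by the FFT of the time-reversed stimulus and transforming back gives
# the (circular) cross-correlation; its peak locates the end of the
# stimulus in the recording, and ``idx`` then points at its start.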
Y = fft(recording)
X = fft(np.flip(self.x), n=sz)
corr = ifft(X * Y).real
idx = int(corr.argmax()) - self.x.size + 1
if idx >= 0:
self.y = np.array(recording[idx:idx + self.x.size])
self.numMeasurements += 1
self.sumH += self.calcH()
self.setCaching()
return True
return False
def timedOut(self) -> bool:
"""See if time to find a match has exceeded the timeout limit."""
return self.time > self.x.size / self.rate + self.TIMEOUT_SECS
def frequency(self) -> np.ndarray:
"""Frequency array, from 0 to the Nyquist frequency."""
return np.linspace(0, self.rate // 2, self.X().size)
def freqRange(self, size: int = 0) -> slice:
"""
Return a slice selecting the valid frequency range for an array
of the given size.
"""
size = size or self.X().size
nyq = self.rate / 2
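# Convert the frequency bounds to (approximate) indices into the
# frequency array, which spans 0..nyquist over ``size`` points:
# index ~ size * f / nyq, rounded and clipped to the valid range.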
i0 = min(size - 1, int(0.5 + size * self.fmin / nyq))
i1 = min(size - 1, int(0.5 + size * self.fmax / nyq))
return slice(i0, i1 + 1)
def calibration(self) -> Optional[np.ndarray]:
"""Interpolated calibration curve."""
return self.interpolateCorrection(self._calibration)
def target(self) -> Optional[np.ndarray]:
"""Interpolated target curve."""
return self.interpolateCorrection(self._target)
def interpolateCorrection(
self, corr: Optional[Correction]) -> Optional[np.ndarray]:
"""
Logarithmically interpolate the correction to a full-sized array.
"""
if not corr:
return None
corr = sorted(c for c in corr if c[0] > 0)
a = np.array(corr, 'd').T
logF = np.log(a[0])
db = a[1]
freq = self.frequency()
interp = np.empty_like(freq)
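# Bin 0 is DC, where the log-frequency is undefined; pin it to 0 dB and
# interpolate the remaining bins on a log-frequency axis (np.interp
# holds the end values outside the measured range).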
interp[0] = 0
interp[1:] = np.interp(np.log(freq[1:]), logF, db)
return interp
def X(self) -> np.ndarray:
return rfft(self.x)
def Y(self) -> np.ndarray:
return rfft(self.y)
def calcH(self) -> np.ndarray:
"""
Calculate transfer function H of the last measurement.
"""
X = self.X()
Y = self.Y()
# H = Y / X
H = Y * np.conj(X) / (np.abs(X) ** 2 + 1e-6)
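# Multiplying by conj(X) and dividing by |X|**2 + eps is a regularized
# form of Y / X: it avoids blowing up at bins where the stimulus has
# (numerically) no energy.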
if self._calibration:
H *= 10 ** (-self.calibration() / 20)
H = np.abs(H)
return H
def H(self) -> XY:
"""
Transfer function H averaged over all measurements.
"""
freq = self.frequency()
H = self.sumH / (self.numMeasurements or 1)
return XY(freq, H)
def H2(self, smoothing: float) -> XY:
"""Calculate smoothed squared transfer function |H|^2."""
freq, H = self.H()
r = self.freqRange()
H2 = np.empty_like(H)
# Perform smoothing on the squared amplitude.
H2[r] = smooth(freq[r], H[r] ** 2, smoothing)
H2[:r.start] = H2[r.start]
H2[r.stop:] = H2[r.stop - 1]
return XY(freq, H2)
def h(self) -> XY:
"""Calculate impulse response ``h`` in the time domain."""
_, H = self.H()
h = irfft(H)
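# H carries magnitude only (zero phase), so irfft yields a symmetric
# impulse response wrapped around index 0; rotating it by half its
# length centers the peak in the time window.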
h = np.hstack([h[h.size // 2:], h[0:h.size // 2]])
t = np.linspace(0, h.size / self.rate, h.size)
return XY(t, h)
def spectrum(self, smoothing: float = 0) -> XY:
"""
Calculate the frequency response in the valid frequency range,
with optional smoothing.
Args:
smoothing: Determines the overall strength of the smoothing.
Useful values are from 0 to around 30.
If 0 then no smoothing is done.
"""
freq, H2 = self.H2(smoothing)
r = self.freqRange()
return XY(freq[r], 10 * np.log10(H2[r]))
def h_inv(
self,
secs: float = 0.05,
dbRange: float = 24,
kaiserBeta: float = 5,
smoothing: float = 0,
causality: float = 0) -> XY:
"""
Calculate the inverse impulse response.
Args:
secs: Desired length of the response in seconds.
dbRange: Maximum attenuation in dB (power).
kaiserBeta: Shape parameter of the Kaiser tapering window.
smoothing: Strength of frequency-dependent smoothing.
causality: 0 = linear-phase a-causal, 1 = minimum-phase causal.
"""
freq, H2 = self.H2(smoothing)
# Apply target curve.
if self._target:
H2 = H2 * 10 ** (-self.target() / 10)
# Re-sample to halve the number of samples needed.
n = int(secs * self.rate / 2)
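# An rfft spectrum of n points corresponds to roughly 2 * (n - 1)
# time-domain samples, so n = secs * rate / 2 yields an impulse
# response of about ``secs`` seconds.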
H = resample(H2, n) ** 0.5
# Limit H to the given dbRange below its peak, so that the boost
# applied by the inverse filter stays bounded.
H /= H.max()
H = np.fmax(H, 10 ** (-dbRange / 20))
# Calculate Z, the reciprocal transfer function with added
# linear phase. This phase will shift and center z.
Z = 1 / H
phase = np.exp(Z.size * 1j * np.linspace(0, np.pi, Z.size))
Z = Z * phase
# Calculate the inverse impulse response z.
z = irfft(Z)
z = z[:-1]
z *= window(z.size, kaiserBeta)
if causality:
z = transform_causality(z, causality)
# Normalize using a fractal dimension for scaling.
dim = 1.5 - 0.25 * causality
norm = (np.abs(z) ** dim).sum() ** (1 / dim)
z /= norm
t = np.linspace(0, z.size / self.rate, z.size)
return XY(t, z)
def correctionFactor(self, h_inv: np.ndarray) -> XY:
"""
Calculate correction factor for each frequency, given the
inverse impulse response.
"""
Z = np.abs(rfft(h_inv))
Z /= Z.max()
freq = np.linspace(0, self.rate / 2, Z.size)
return XY(freq, Z)
def correctedSpectrum(self, corrFactor: XY) -> Tuple[XY, XY]:
"""
Simulate the frequency response of the system after it has
been corrected with the given correction factor.
"""
freq, H2 = self.H2(0)
H = H2 ** 0.5
r = self.freqRange()
tf = resample(corrFactor.y, H.size)
resp = 20 * np.log10(tf[r] * H[r])
spectrum = XY(freq[r], resp)
H = resample(H2, corrFactor.y.size) ** 0.5
rr = self.freqRange(corrFactor.y.size)
resp = 20 * np.log10(corrFactor.y[rr] * H[rr])
spectrum_resamp = XY(corrFactor.x[rr], resp)
return spectrum, spectrum_resamp
def targetSpectrum(self, spectrum: XY) -> Optional[XY]:
if self._target:
freq, resp = spectrum
r = self.freqRange()
target = self.target()[r]
target += np.average(resp - target, weights=1 / freq)
targetSpectrum = XY(freq, target)
else:
targetSpectrum = None
return targetSpectrum
@lru_cache
def tone(f: float, secs: float, rate: int):
"""Generate a sine wave."""
n = int(secs * f)
secs = n / f
t = np.arange(0, secs * rate) / rate
sine = np.sin(2 * np.pi * f * t)
return sine
@lru_cache
def geom_chirp(f0: float, f1: float, secs: float, rate: int):
"""
Generate a geometric chirp (with an exponentially changing frequency).
To avoid a clicking sound at the end, the last sample should be near
zero. This is done by slightly adjusting the duration to fit an
integer number of cycles.
"""
n = int(secs * (f1 - f0) / np.log(f1 / f0))
k = np.exp((f1 - f0) / n) # =~ exp[log(f1/f0) / secs]
secs = np.log(f1 / f0) / np.log(k)
t = np.arange(0, secs * rate) / rate
chirp = np.sin(2 * np.pi * f0 * (k ** t - 1) / np.log(k))
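# The phase is 2*pi*f0*(k**t - 1)/log(k), so the instantaneous
# frequency f0 * k**t sweeps exponentially from f0 up to (approximately)
# f1 over the slightly adjusted duration.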
return chirp
@lru_cache
def linear_chirp(f0: float, f1: float, secs: float, rate: int):
"""Generate a linear chirp (with a linearly changing frequency)."""
n = int(secs * (f1 + f0) / 2)
secs = 2 * n / (f1 + f0)
c = (f1 - f0) / secs
t = np.arange(0, secs * rate) / rate
chirp = np.sin(2 * np.pi * (0.5 * c * t ** 2 + f0 * t))
return chirp
def resample(a: np.ndarray, size: int) -> np.ndarray:
"""
Re-sample the array ``a`` to the given new ``size`` in a way that
preserves the overall density.
"""
xp = np.linspace(0, 1, a.size)
yp = np.cumsum(a)
x = np.linspace(0, 1, size)
y = np.interp(x, xp, yp)
r = size / a.size * np.diff(y, prepend=0)
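# Differentiating the interpolated cumulative sum and rescaling by
# size / a.size preserves the mean level of the data exactly
# (up to floating-point rounding).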
return r
@njit
def smooth(freq: np.ndarray, data: np.ndarray, smoothing: float) -> np.ndarray:
"""
Smooth the data with a strength that increases with frequency
and with the overall smoothing factor.
The smoothing uses a double-pass exponential moving average (going
backward and forward).
"""
if not smoothing:
return data
weight = 1 / (1 + 2 ** (smoothing / 2 - 15) * freq)
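# The EMA weight approaches 1 (no smoothing) for small smoothing * freq
# and falls towards 0 (heavy averaging) as it grows; each increase of
# ``smoothing`` by 2 doubles the 2 ** (smoothing / 2 - 15) factor and
# hence roughly doubles the smoothing strength.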
smoothed = np.empty_like(data)
prev = data[-1]
for i, w in enumerate(np.flip(weight), 1):
smoothed[-i] = prev = (1 - w) * prev + w * data[-i]
for i, w in enumerate(weight):
smoothed[i] = prev = (1 - w) * prev + w * smoothed[i]
return smoothed
@lru_cache
def window(size: int, beta: float) -> np.ndarray:
"""Kaiser tapering window."""
return np.kaiser(size, beta)
@lru_cache
def taper(y0: float, y1: float, size: int) -> np.ndarray:
"""Create a smooth transition from y0 to y1 of given size."""
tp = (y0 + y1 - (y1 - y0) * np.cos(np.linspace(0, np.pi, size))) / 2
return tp
def transform_causality(x: np.ndarray, causality: float = 1) -> np.ndarray:
"""
Homomorphic filter to create a new impulse response of the desired
causality from the given impulse response.
Args:
causality: 0 = linear-phase, 1 = minimum-phase,
in-between values smoothly transition between the two.
https://www.rle.mit.edu/dspg/documents/AVOHomoorphic75.pdf
https://www.katjaas.nl/minimumphase/minimumphase.html
"""
# Go to frequency domain, oversampling 4x to avoid aliasing.
X = np.abs(fft(x, 4 * x.size))
# Non-linear mapping.
XX = np.log(np.fmax(X, 1e-9))
# Linear filter to apply the desired amount of causal (right)
# and anti-causal (left) parts to the complex cepstrum.
xx = ifft(XX).real
mid = x.size // 2
left = slice(-1, -mid - 1, -1)
right = slice(1, mid + 1)
yy = np.zeros_like(xx)
yy[0] = xx[0]
yy[left] = (1 - causality) * xx[right]
yy[right] = (1 + causality) * xx[right]
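# With causality = 0 the cepstrum stays even, giving a zero-phase
# (linear-phase after shifting) result; with causality = 1 all energy
# is folded onto the causal side, the classic minimum-phase
# construction.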
YY = fft(yy)
# Non-linear mapping back.
Y = np.exp(YY)
# Go back to time domain.
y = ifft(Y).real
# Infer the original linear-phase filter size.
if np.allclose(x[0:mid + 1], x[-1:mid - 1:-1]):
# x is symmetric, so it's linear-phase.
orig_sz = x.size
else:
# Estimate based on location of peak.
p = x.argmax()
orig_sz = 2 * (x.size - p) - 1
# src_causality = (3 - (x.size + p) / (x.size - p)) / 2
# Roll and resize.
y = np.roll(y, int(orig_sz * (1 - causality) / 2))
sz = int(0.5 + orig_sz * (1 - causality / 2))
y = y[:sz]
return y
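
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative addition, not part of the library API).
# It simulates an ideal loopback where the "recording" is just silence, the
# stimulus itself and more silence, so the measured response comes out flat.
# The chirp range, sample rate, amplitude and smoothing values are arbitrary
# example choices; in real use the recording comes from a microphone.
if __name__ == '__main__':
    analyzer = Analyzer(f0=20, f1=20000, secs=1.0, rate=48000, ampl=0.5)
    recording = array.array('d', np.concatenate(
        [np.zeros(1000), analyzer.x, np.zeros(1000)]))
    if analyzer.findMatch(recording):
        freq, db = analyzer.spectrum(smoothing=15)
        t, z = analyzer.h_inv(secs=0.05, dbRange=24)
        print(f'{analyzer.numMeasurements} measurement(s), '
              f'{db.size} spectrum points, '
              f'inverse impulse response of {z.size} samples')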