From ef0f1cea7e9114be65df683bbf901bab46227ee3 Mon Sep 17 00:00:00 2001 From: Ewald de Wit Date: Fri, 23 Sep 2022 14:41:29 +0200 Subject: [PATCH] Add minimum-phase filter, issue #9 --- hifiscan/__init__.py | 4 +-- hifiscan/analyzer.py | 65 ++++++++++++++++++++++++++++++++++---------- hifiscan/app.py | 16 +++++++++-- 3 files changed, 66 insertions(+), 19 deletions(-) diff --git a/hifiscan/__init__.py b/hifiscan/__init__.py index 333363f..f77f35d 100644 --- a/hifiscan/__init__.py +++ b/hifiscan/__init__.py @@ -1,6 +1,6 @@ """'Optimize the frequency response spectrum of an audio system""" from hifiscan.analyzer import ( - Analyzer, XY, geom_chirp, linear_chirp, resample, smooth, taper, - tone, window) + Analyzer, XY, geom_chirp, linear_chirp, minimum_phase, resample, + smooth, taper, tone, window) from hifiscan.audio import Audio, read_correction, write_wav diff --git a/hifiscan/analyzer.py b/hifiscan/analyzer.py index 1aadc46..299a7d1 100644 --- a/hifiscan/analyzer.py +++ b/hifiscan/analyzer.py @@ -3,9 +3,9 @@ import types from functools import lru_cache from typing import List, NamedTuple, Optional, Tuple -from numba import njit - import numpy as np +from numba import njit +from numpy.fft import fft, ifft, irfft, rfft class XY(NamedTuple): @@ -103,9 +103,9 @@ class Analyzer: sz = len(recording) self.time = sz / self.rate if sz >= self.x.size: - Y = np.fft.fft(recording) - X = np.fft.fft(np.flip(self.x), n=sz) - corr = np.fft.ifft(X * Y).real + Y = fft(recording) + X = fft(np.flip(self.x), n=sz) + corr = ifft(X * Y).real idx = int(corr.argmax()) - self.x.size + 1 if idx >= 0: self.y = np.array(recording[idx:idx + self.x.size]) @@ -159,10 +159,10 @@ class Analyzer: return interp def X(self) -> np.ndarray: - return np.fft.rfft(self.x) + return rfft(self.x) def Y(self) -> np.ndarray: - return np.fft.rfft(self.y) + return rfft(self.y) def calcH(self) -> np.ndarray: """ @@ -171,7 +171,7 @@ class Analyzer: X = self.X() Y = self.Y() # H = Y / X - H = Y * np.conj(X) / (np.abs(X) ** 2 + 1e-3) + H = Y * np.conj(X) / (np.abs(X) ** 2 + 1e-6) if self._calibration: H *= 10 ** (-self.calibration() / 20) H = np.abs(H) @@ -199,7 +199,7 @@ class Analyzer: def h(self) -> XY: """Calculate impulse response ``h`` in the time domain.""" _, H = self.H() - h = np.fft.irfft(H) + h = irfft(H) h = np.hstack([h[h.size // 2:], h[0:h.size // 2]]) t = np.linspace(0, h.size / self.rate, h.size) return XY(t, h) @@ -223,7 +223,8 @@ class Analyzer: secs: float = 0.05, dbRange: float = 24, kaiserBeta: float = 5, - smoothing: float = 0) -> XY: + smoothing: float = 0, + minPhase: bool = False) -> XY: """ Calculate the inverse impulse response. @@ -232,6 +233,7 @@ class Analyzer: dbRange: Maximum attenuation in dB (power). kaiserBeta: Shape parameter of the Kaiser tapering window. smoothing: Strength of frequency-dependent smoothing. + minPhase: Use minimal-phase if True or linear-phase if False """ freq, H2 = self.H2(smoothing) # Apply target curve. @@ -239,6 +241,9 @@ class Analyzer: H2 = H2 * 10 ** (-self.target() / 10) # Re-sample to halve the number of samples needed. n = int(secs * self.rate / 2) + if minPhase: + # Later minimum phase filter will halve the size. + n *= 2 H = resample(H2, n) ** 0.5 # Accommodate the given dbRange from the top. H /= H.max() @@ -251,14 +256,16 @@ class Analyzer: Z = Z * phase # Calculate the inverse impulse response z. - z = np.fft.irfft(Z) + z = irfft(Z) z = z[:-1] z *= window(z.size, kaiserBeta) + if minPhase: + z = minimum_phase(z) + # Normalize using a fractal dimension for scaling. - dim = 1.5 + dim = 1.25 if minPhase else 1.5 norm = (np.abs(z) ** dim).sum() ** (1 / dim) z /= norm - # assert np.allclose(z[-(z.size // 2):][::-1], z[:z.size // 2]) t = np.linspace(0, z.size / self.rate, z.size) return XY(t, z) @@ -268,7 +275,7 @@ class Analyzer: Calculate correction factor for each frequency, given the inverse impulse response. """ - Z = np.abs(np.fft.rfft(invResp)) + Z = np.abs(rfft(invResp)) Z /= Z.max() freq = np.linspace(0, self.rate / 2, Z.size) return XY(freq, Z) @@ -388,3 +395,33 @@ def taper(y0: float, y1: float, size: int) -> np.ndarray: """Create a smooth transition from y0 to y1 of given size.""" tp = (y0 + y1 - (y1 - y0) * np.cos(np.linspace(0, np.pi, size))) / 2 return tp + + +def minimum_phase(x: np.ndarray) -> np.ndarray: + """ + Homomorphic filter to create a minimum-phase impulse from the given + symmetric odd-sized linear-phase impulse. + + https://www.rle.mit.edu/dspg/documents/AVOHomoorphic75.pdf + https://www.katjaas.nl/minimumphase/minimumphase.html + """ + mid = x.size // 2 + if not (x.size % 2 and np.allclose(x[:mid], x[-1:mid:-1])): + raise ValueError('Symmetric odd-sized array required') + # Go to frequency domain, oversampling 4x to avoid aliasing. + X = np.abs(fft(x, 4 * x.size)) + # Non-linear mapping. + XX = np.log(np.fmax(X, 1e-9)) + # Linear filter selects minimum phase part in the complex cepstrum. + xx = ifft(XX).real + yy = np.zeros_like(xx) + yy[0] = xx[0] + yy[1:mid + 1] = 2 * xx[1:mid + 1] + YY = fft(yy) + # Non-linear mapping back. + Y = np.exp(YY) + # Go back to time domain. + y = ifft(Y).real + # Take the valid part. + y_min = y[:mid + 1] + return y_min diff --git a/hifiscan/app.py b/hifiscan/app.py index 31e6785..5ce8d65 100644 --- a/hifiscan/app.py +++ b/hifiscan/app.py @@ -100,8 +100,9 @@ class App(qt.QMainWindow): dbRange = self.dbRange.value() beta = self.kaiserBeta.value() smoothing = self.irSmoothing.value() + minPhase = self.typeBox.currentIndex() == 1 - t, ir = analyzer.h_inv(secs, dbRange, beta, smoothing) + t, ir = analyzer.h_inv(secs, dbRange, beta, smoothing, minPhase) self.irPlot.setData(1000 * t, ir) logIr = np.log10(1e-8 + np.abs(ir)) @@ -139,9 +140,11 @@ class App(qt.QMainWindow): db = int(self.dbRange.value()) beta = int(self.kaiserBeta.value()) smoothing = int(self.irSmoothing.value()) - _, irInv = analyzer.h_inv(ms / 1000, db, beta, smoothing) + minPhase = self.typeBox.currentIndex() == 1 + _, irInv = analyzer.h_inv(ms / 1000, db, beta, smoothing, minPhase) - name = f'IR_{ms}ms_{db}dB_{beta}t_{smoothing}s.wav' + name = (f'IR_{ms}ms_{db}dB_{beta}t_{smoothing}s' + f'{"_minphase" if minPhase else ""}.wav') filename, _ = qt.QFileDialog.getSaveFileName( self, 'Save inverse impulse response', str(self.saveDir / name), 'WAV (*.wav)') @@ -276,6 +279,10 @@ class App(qt.QMainWindow): self.useBox.addItems(['Stored measurements', 'Last measurement']) self.useBox.currentIndexChanged.connect(self.plot) + self.typeBox = qt.QComboBox() + self.typeBox.addItems(['Zero phase', 'Zero latency']) + self.typeBox.currentIndexChanged.connect(self.plot) + exportButton = qt.QPushButton('Export as WAV') exportButton.setShortcut('E') exportButton.setToolTip('') @@ -295,6 +302,9 @@ class App(qt.QMainWindow): hbox.addWidget(qt.QLabel('Smoothing: ')) hbox.addWidget(self.irSmoothing) hbox.addSpacing(32) + hbox.addWidget(qt.QLabel('Type: ')) + hbox.addWidget(self.typeBox) + hbox.addSpacing(32) hbox.addWidget(qt.QLabel('Use: ')) hbox.addWidget(self.useBox) hbox.addStretch(1)