kopia lustrzana https://github.com/erdewit/HiFiScan
Use array.array for recording instread of numpy array, issue #2
rodzic
4084773a75
commit
aa9a30973d
|
@ -1,3 +1,4 @@
|
||||||
|
import array
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
from typing import NamedTuple, Tuple
|
from typing import NamedTuple, Tuple
|
||||||
|
|
||||||
|
@ -66,19 +67,20 @@ class Analyzer:
|
||||||
for meth in ['X', 'Y', 'H', 'H2', 'h', 'h_inv', 'spectrum']:
|
for meth in ['X', 'Y', 'H', 'H2', 'h', 'h_inv', 'spectrum']:
|
||||||
setattr(self, meth, lru_cache(getattr(self, meth)))
|
setattr(self, meth, lru_cache(getattr(self, meth)))
|
||||||
|
|
||||||
def findMatch(self, recording: np.ndarray) -> bool:
|
def findMatch(self, recording: array.array) -> bool:
|
||||||
"""
|
"""
|
||||||
Use correlation to find a match of the chirp in the recording.
|
Use correlation to find a match of the chirp in the recording.
|
||||||
If found, return True and store the system response as ``y``.
|
If found, return True and store the system response as ``y``.
|
||||||
"""
|
"""
|
||||||
self.time = recording.size / self.rate
|
sz = len(recording)
|
||||||
if recording.size >= self.x.size:
|
self.time = sz / self.rate
|
||||||
|
if sz >= self.x.size:
|
||||||
Y = np.fft.fft(recording)
|
Y = np.fft.fft(recording)
|
||||||
X = np.fft.fft(np.flip(self.x), n=recording.size)
|
X = np.fft.fft(np.flip(self.x), n=sz)
|
||||||
corr = np.fft.ifft(X * Y).real
|
corr = np.fft.ifft(X * Y).real
|
||||||
idx = int(corr.argmax()) - self.x.size + 1
|
idx = int(corr.argmax()) - self.x.size + 1
|
||||||
if idx >= 0:
|
if idx >= 0:
|
||||||
self.y = recording[idx:idx + self.x.size].copy()
|
self.y = np.array(recording[idx:idx + self.x.size], 'f')
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
|
@ -66,15 +66,14 @@ class Audio:
|
||||||
"""Is there sound playing from the play queue?"""
|
"""Is there sound playing from the play queue?"""
|
||||||
return bool(self.playQ)
|
return bool(self.playQ)
|
||||||
|
|
||||||
def record(self) -> AsyncIterator[np.ndarray]:
|
def record(self) -> AsyncIterator[array.array]:
|
||||||
"""
|
"""
|
||||||
Start a recording, yielding the entire recording every time a
|
Start a recording, yielding the entire recording every time a
|
||||||
new chunk is added. Note: The yielded array holds a memory reference
|
new chunk is added. The recording is a 32-bit float array.
|
||||||
that is only valid until the next chunk is added.
|
|
||||||
"""
|
"""
|
||||||
arr = array.array('f')
|
arr = array.array('f')
|
||||||
return self.recorded.map(arr.extend).map(
|
return self.recorded.map(arr.extend).map(
|
||||||
lambda _: np.frombuffer(arr, 'f')).aiter(skip_to_last=True)
|
lambda _: arr).aiter(skip_to_last=True)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|
Ładowanie…
Reference in New Issue