kopia lustrzana https://github.com/corrscope/corrscope
initial code
rodzic
5addd4f9d1
commit
e0bcd53be4
|
@ -0,0 +1 @@
|
|||
* text=auto
|
|
@ -0,0 +1,164 @@
|
|||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
.static_storage/
|
||||
.media/
|
||||
local_settings.py
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# celery beat schedule file
|
||||
celerybeat-schedule
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
|
||||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
||||
|
||||
# User-specific stuff:
|
||||
.idea/**/workspace.xml
|
||||
.idea/**/tasks.xml
|
||||
.idea/dictionaries
|
||||
|
||||
# Sensitive or high-churn files:
|
||||
.idea/**/dataSources/
|
||||
.idea/**/dataSources.ids
|
||||
.idea/**/dataSources.local.xml
|
||||
.idea/**/sqlDataSources.xml
|
||||
.idea/**/dynamic.xml
|
||||
.idea/**/uiDesigner.xml
|
||||
|
||||
# Gradle:
|
||||
.idea/**/gradle.xml
|
||||
.idea/**/libraries
|
||||
|
||||
# CMake
|
||||
cmake-build-debug/
|
||||
cmake-build-release/
|
||||
|
||||
# Mongo Explorer plugin:
|
||||
.idea/**/mongoSettings.xml
|
||||
|
||||
## File-based project format:
|
||||
*.iws
|
||||
|
||||
## Plugin-specific files:
|
||||
|
||||
# IntelliJ
|
||||
out/
|
||||
|
||||
# mpeltonen/sbt-idea plugin
|
||||
.idea_modules/
|
||||
|
||||
# JIRA plugin
|
||||
atlassian-ide-plugin.xml
|
||||
|
||||
# Cursive Clojure plugin
|
||||
.idea/replstate.xml
|
||||
|
||||
# Crashlytics plugin (for Android Studio and IntelliJ)
|
||||
com_crashlytics_export_strings.xml
|
||||
crashlytics.properties
|
||||
crashlytics-build.properties
|
||||
fabric.properties
|
||||
|
||||
|
||||
|
||||
*.ipynb
|
|
@ -0,0 +1,37 @@
|
|||
|
||||
|
||||
|
||||
# class EnumBodyDict(dict):
|
||||
# """ https://news.ycombinator.com/item?id=5691483 """
|
||||
# def __init__(self, *a, **kw):
|
||||
# self._keys_accessed = []
|
||||
# dict.__init__(self, *a, **kw)
|
||||
#
|
||||
# def __getitem__(self, key):
|
||||
# self._keys_accessed.append(key)
|
||||
# return dict.__getitem__(self, key)
|
||||
#
|
||||
#
|
||||
# class EnumMeta(type):
|
||||
# @classmethod
|
||||
# def __prepare__(metacls, name, bases):
|
||||
# return EnumBodyDict()
|
||||
#
|
||||
# def __new__(cls, name, bases, classdict):
|
||||
# next_enum_value = max(classdict.values()) + 1
|
||||
#
|
||||
# for name in classdict._keys_accessed:
|
||||
# if name not in classdict:
|
||||
# classdict[name] = next_enum_value
|
||||
# next_enum_value += 1
|
||||
#
|
||||
# return type.__new__(cls, name, bases, classdict)
|
||||
#
|
||||
#
|
||||
# class Enum(object, metaclass=EnumMeta):
|
||||
#
|
||||
#
|
||||
# # proposed enum implementation here
|
||||
#
|
||||
# # class Config(metaclass=Struct):
|
||||
#
|
|
@ -0,0 +1,2 @@
|
|||
if __name__ == '__main__':
|
||||
pass # todo
|
|
@ -0,0 +1,163 @@
|
|||
import weakref
|
||||
from pathlib import Path
|
||||
from typing import NamedTuple, Optional, List, Tuple
|
||||
|
||||
import click
|
||||
import numpy as np
|
||||
from scipy.io import wavfile
|
||||
|
||||
|
||||
class ScreenSize(NamedTuple):
|
||||
width: int
|
||||
height: int
|
||||
|
||||
|
||||
class Config(NamedTuple):
|
||||
wave_dir: str # TODO remove, a function will expand wildcards and create List[WaveConfig]
|
||||
master_wave: Optional[str]
|
||||
|
||||
fps: int
|
||||
# TODO algorithm and twiddle knobs
|
||||
|
||||
screen: ScreenSize
|
||||
|
||||
|
||||
Folder = click.Path(exists=True, file_okay=False)
|
||||
File = click.Path(exists=True, dir_okay=False)
|
||||
|
||||
FPS = 60 # fps
|
||||
|
||||
@click.command()
|
||||
@click.argument('wave_dir', type=Folder)
|
||||
@click.option('master_wave', type=File, default=None)
|
||||
@click.option('fps', default=FPS)
|
||||
def main(wave_dir: str, master_wave: Optional[str], fps: int):
|
||||
cfg = Config(
|
||||
wave_dir=wave_dir,
|
||||
master_wave=master_wave,
|
||||
fps=fps,
|
||||
screen=ScreenSize(640, 360) # todo
|
||||
)
|
||||
|
||||
ovgen = Ovgen(cfg)
|
||||
ovgen.write()
|
||||
|
||||
|
||||
COLOR_CHANNELS = 3
|
||||
|
||||
class Ovgen:
|
||||
def __init__(self, cfg: Config):
|
||||
self.cfg = cfg
|
||||
self.waves: List[Wave] = []
|
||||
|
||||
def write(self):
|
||||
self.load_waves() # self.waves =
|
||||
self.render()
|
||||
|
||||
def load_waves(self):
|
||||
wave_dir = Path(self.cfg.wave_dir)
|
||||
|
||||
for idx, path in enumerate(wave_dir.glob('*.wav')):
|
||||
wcfg = WaveConfig(
|
||||
wave_path=str(path),
|
||||
coords=self.get_coords(idx)
|
||||
)
|
||||
wave = Wave(wcfg, str(path))
|
||||
self.waves.append(wave)
|
||||
|
||||
def get_coords(self, idx: int):
|
||||
# TODO multi column
|
||||
screen = self.cfg.screen
|
||||
|
||||
width = screen.width
|
||||
height = screen.height // len(self.waves) # todo +1 if we draw the overall waveform
|
||||
x = 0
|
||||
y = height * idx
|
||||
|
||||
return Coords(x=x, y=y, width=width, height=height)
|
||||
|
||||
def render(self):
|
||||
# Calculate number of frames (TODO master file?)
|
||||
fps = self.cfg.fps
|
||||
nframes = fps * self.waves[0].get_s()
|
||||
nframes = int(nframes) + 1
|
||||
|
||||
screen = self.cfg.screen
|
||||
sc = np.ndarray((screen.height, screen.width, COLOR_CHANNELS), np.int8) # TODO https://matplotlib.org/gallery/subplots_axes_and_figures/ganged_plots.html
|
||||
|
||||
# For each frame, render each wave
|
||||
for frame in range(nframes):
|
||||
second = frame / fps
|
||||
|
||||
for wave in self.waves:
|
||||
sample = round(wave.smp_s * second)
|
||||
trigger_sample = wave.trigger.get_trigger(sample)
|
||||
|
||||
# render todo
|
||||
image = wave.render(trigger_sample)
|
||||
# blit TODO
|
||||
|
||||
|
||||
class Coords(NamedTuple):
|
||||
""" x is right, y is down """
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
|
||||
|
||||
class WaveConfig(NamedTuple):
|
||||
wave_path: str
|
||||
coords: Coords
|
||||
# TODO color
|
||||
|
||||
|
||||
class Wave:
|
||||
def __init__(self, wcfg: WaveConfig, wave_path: str):
|
||||
self.wcfg = wcfg
|
||||
self.smp_s, self.data = wavfile.read(wave_path)
|
||||
|
||||
# FIXME cfg
|
||||
frames = 1
|
||||
self.trigger = Trigger(self, self.smp_s // FPS * frames, 0.1)
|
||||
|
||||
def get_smp(self) -> int:
|
||||
return len(self.data)
|
||||
|
||||
def get_s(self) -> float:
|
||||
"""
|
||||
:return: time (seconds)
|
||||
"""
|
||||
return self.get_smp() / self.smp_s
|
||||
|
||||
def render(self, trigger_sample: int) -> np.ndarray:
|
||||
"""
|
||||
:param trigger_sample: Sample index
|
||||
:return: image or something
|
||||
"""
|
||||
pass # TODO
|
||||
|
||||
|
||||
class Trigger:
|
||||
def __init__(self, wave: Wave, scan_nsamp: int, align_amount: float):
|
||||
"""
|
||||
Correlation-based trigger which looks at a window of `scan_length` samples.
|
||||
|
||||
it's complicated
|
||||
|
||||
:param wave: Wave file
|
||||
:param scan_nsamp: Number of samples used to align adjacent frames
|
||||
:param align_amount: Amount of centering to apply to each frame, within [0, 1]
|
||||
"""
|
||||
|
||||
# probably unnecessary
|
||||
self.wave = weakref.proxy(wave)
|
||||
self.scan_nsamp = scan_nsamp
|
||||
self.align_amount = align_amount
|
||||
|
||||
def get_trigger(self, offset: int) -> int:
|
||||
"""
|
||||
:param offset: sample index
|
||||
:return: new sample index, corresponding to rising edge
|
||||
"""
|
||||
return offset # todo
|
|
@ -0,0 +1,106 @@
|
|||
from typing import List, Union
|
||||
|
||||
import numpy as np
|
||||
from wavetable.gauss import nyquist_exclusive, nyquist_inclusive
|
||||
|
||||
|
||||
# I don't know why, but "inclusive" makes my test cases work.
|
||||
NYQUIST = nyquist_inclusive
|
||||
|
||||
|
||||
def _zoh_transfer(nsamp):
|
||||
nyquist = NYQUIST(nsamp)
|
||||
return np.sinc(np.arange(nyquist) / nsamp)
|
||||
|
||||
|
||||
def _realify(a: np.ndarray):
|
||||
return np.copysign(np.abs(a), a.real)
|
||||
|
||||
|
||||
InputArray = Union[np.ndarray, List[complex]]
|
||||
|
||||
|
||||
def rfft_norm(signal: InputArray, *args, **kwargs) -> np.ndarray:
|
||||
""" Computes "normalized" FFT of signal. """
|
||||
return np.fft.rfft(signal, *args, **kwargs) / len(signal)
|
||||
|
||||
|
||||
def irfft_norm(spectrum: InputArray, nsamp=None, *args, **kwargs) -> np.ndarray:
|
||||
""" Computes "normalized" signal of spectrum. """
|
||||
signal = np.fft.irfft(spectrum, nsamp, *args, **kwargs)
|
||||
return signal * len(signal)
|
||||
|
||||
|
||||
def rfft_zoh(signal: InputArray):
|
||||
""" Computes "normalized" FFT of signal, with simulated ZOH frequency response. """
|
||||
nsamp = len(signal)
|
||||
spectrum = rfft_norm(signal)
|
||||
|
||||
# Muffle everything ~~but Nyquist~~, like real hardware.
|
||||
# Nyquist is already real.
|
||||
nyquist = NYQUIST(nsamp)
|
||||
spectrum[:nyquist] *= _zoh_transfer(nsamp)
|
||||
|
||||
return spectrum
|
||||
|
||||
|
||||
def _zero_pad(spectrum: InputArray, harmonic):
|
||||
""" Do not use, concatenating a waveform multiple times works just as well. """
|
||||
|
||||
# https://stackoverflow.com/a/5347492/2683842
|
||||
nyquist = len(spectrum) - 1
|
||||
padded = np.zeros(nyquist * harmonic + 1, dtype=complex)
|
||||
padded[::harmonic] = spectrum
|
||||
return padded
|
||||
|
||||
|
||||
def irfft_zoh(spectrum: InputArray, nsamp=None):
|
||||
""" Computes "normalized" signal of spectrum, counteracting ZOH frequency response. """
|
||||
compute_nsamp = (nsamp is None)
|
||||
|
||||
if nsamp is None:
|
||||
nsamp = 2 * (len(spectrum) - 1)
|
||||
nin = nsamp // 2 + 1
|
||||
|
||||
if compute_nsamp:
|
||||
assert nin == len(spectrum)
|
||||
|
||||
spectrum = np.copy(spectrum[:nin])
|
||||
|
||||
# Treble-boost everything ~~but Nyquist~~.
|
||||
# Make Nyquist purely real.
|
||||
nyquist = NYQUIST(nsamp)
|
||||
real = nyquist_exclusive(nsamp)
|
||||
|
||||
spectrum[:nyquist] /= _zoh_transfer(nsamp)
|
||||
spectrum[real:] = _realify(spectrum[real:])
|
||||
|
||||
return irfft_norm(spectrum, nsamp)
|
||||
|
||||
|
||||
def irfft_old(spectrum: InputArray, nsamp=None):
|
||||
"""
|
||||
Calculate the inverse Fourier transform of $spectrum, optionally
|
||||
truncating to $nsamp entries.
|
||||
|
||||
if nsamp is None, calculate nsamp = 2*(len-1).
|
||||
nin = (nsamp//2) + 1.
|
||||
|
||||
if nsamp is even, realify the last coeff to preserve Nyquist energy.
|
||||
"""
|
||||
compute_nsamp = (nsamp is None)
|
||||
|
||||
if nsamp is None:
|
||||
nsamp = 2 * (len(spectrum) - 1)
|
||||
nin = nsamp // 2 + 1
|
||||
|
||||
if compute_nsamp:
|
||||
assert nin == len(spectrum)
|
||||
|
||||
spectrum = spectrum[:nin].copy()
|
||||
|
||||
if nsamp % 2 == 0:
|
||||
last = spectrum[-1]
|
||||
spectrum[-1] = np.copysign(abs(last), last.real)
|
||||
|
||||
return irfft_norm(spectrum, nsamp)
|
|
@ -0,0 +1,13 @@
|
|||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='ovgenpy',
|
||||
version='0',
|
||||
packages=[''],
|
||||
url='',
|
||||
license='BSD-2-Clause',
|
||||
author='nyanpasu64',
|
||||
author_email='',
|
||||
description='',
|
||||
install_requires=['numpy', 'scipy', 'imageio', 'click'] # 'pyqt5'
|
||||
)
|
Ładowanie…
Reference in New Issue