# audio.py # Released under the MIT License (MIT). See LICENSE. # Copyright (c) 2021 Peter Hinch import hardware_setup # Create a display instance from gui.core.ugui import Screen, ssd from machine import I2S from machine import Pin import pyb # *************** # Do allocations early BUFSIZE = 1024*20 # 5.8ms/KiB 8KiB occasional dropouts WAVSIZE = 1024*2 root = "/sd/music" # Location of directories containing albums pyb.Pin("EN_3V3").on() # provide 3.3V on 3V3 output pin # ======= I2S CONFIGURATION ======= I2S_ID = 1 # allocate sample array once wav_samples = bytearray(WAVSIZE) # The proper way is to parse the WAV file as per # https://github.com/miketeachman/micropython-i2s-examples/blob/master/examples/wavplayer.py # Here for simplicity we assume stereo files ripped from CD's. config = { 'sck' : Pin('W29'), 'ws' : Pin('W16'), 'sd' : Pin('Y4'), 'mode' : I2S.TX, 'bits' : 16, # Sample size in bits/channel 'format' : I2S.STEREO, 'rate' : 44100, # Sample rate in Hz 'ibuf' : BUFSIZE, # Buffer size } audio_out = I2S(I2S_ID, **config) # ======= GUI ======= from gui.widgets import Button, CloseButton, HorizSlider, Listbox, Label from gui.core.writer import CWriter # Font for CWriter import gui.fonts.arial10 as arial10 import gui.fonts.icons as icons from gui.core.colors import * import os import gc import uasyncio as asyncio import sys # Initial check on filesystem try: subdirs = [x[0] for x in os.ilistdir(root) if x[1] == 0x4000] if len(subdirs): subdirs.sort() else: print("No albums found in ", root) sys.exit(1) except OSError: print(f"Expected {root} directory not found.") sys.exit(1) class SelectScreen(Screen): songs = [] album = "" def __init__(self, wri): super().__init__() Listbox(wri, 2, 2, elements = subdirs, dlines = 8, width=100, callback = self.lbcb) def lbcb(self, lb): # sort directory = ''.join((root, '/', lb.textvalue())) songs = [x[0] for x in os.ilistdir(directory) if x[1] != 0x4000] songs.sort() SelectScreen.songs = [''.join((directory, '/', x)) for x in songs] SelectScreen.album = lb.textvalue() Screen.back() class BaseScreen(Screen): def __init__(self): self.swriter = asyncio.StreamWriter(audio_out) args = { 'bdcolor' : RED, 'slotcolor' : BLUE, 'legends' : ('-48dB', '-24dB', '0dB'), 'value' : 0.5, 'height' : 15, } buttons = { 'shape' : CIRCLE, 'fgcolor' : GREEN, } super().__init__() # Audio status self.playing = False # Track is playing self.stop_play = False # Command self.paused = False self.songs = [] # Paths to songs in album self.song_idx = 0 # Current index into .songs self.offset = 0 # Offset into file self.volume = -3 wri = CWriter(ssd, arial10, GREEN, BLACK, False) wri_icons = CWriter(ssd, icons, WHITE, BLACK, False) Button(wri_icons, 2, 2, text='E', callback=self.new, args=(wri,), **buttons) # New Button(wri_icons, row := 30, col := 2, text='D', callback=self.replay, **buttons) # Replay Button(wri_icons, row, col := col + 25, text='F', callback=self.play_cb, **buttons) # Play Button(wri_icons, row, col := col + 25, text='B', callback=self.pause, **buttons) # Pause Button(wri_icons, row, col := col + 25, text='A', callback=self.stop, **buttons) # Stop Button(wri_icons, row, col + 25, text='C', callback=self.skip, **buttons) # Skip row = 60 col = 2 self.lbl = Label(wri, row, col, 120) self.lblsong = Label(wri, self.lbl.mrow + 2, col, 120) row = 110 col = 14 HorizSlider(wri, row, col, callback=self.slider_cb, **args) CloseButton(wri) # Quit the application # self.reg_task(asyncio.create_task(self.report())) async def report(self): while True: gc.collect() print(gc.mem_free()) await asyncio.sleep(20) def slider_cb(self, s): self.volume = round(8 * (s.value() - 1)) def play_cb(self, _): self.play_album() def pause(self, _): self.stop_play = True self.paused = True self.show_song() def stop(self, _): # Abandon album self.stop_play = True self.paused = False self.song_idx = 0 self.show_song() def replay(self, _): if self.stop_play: self.song_idx = max(0, self.song_idx - 1) else: self.stop_play = True # Replay from start self.paused = False self.show_song() #self.play_album() def skip(self, _): self.stop_play = True self.paused = False self.song_idx = min(self.song_idx + 1, len(self.songs) -1) self.show_song() #self.play_album() def new(self, _, wri): self.stop_play = True self.paused = False Screen.change(SelectScreen, args=[wri,]) def play_album(self): if not self.playing: self.reg_task(asyncio.create_task(self.album_task())) def after_open(self): self.songs = SelectScreen.songs self.lbl.value(SelectScreen.album) if self.songs: self.song_idx = 0 # Start on track 0 self.show_song() #self.play_album() def show_song(self): # 13ms song = self.songs[self.song_idx] ns = song.find(SelectScreen.album) ne = song[ns:].find('/') + 1 end = song[ns + ne:].find(".wav") self.lblsong.value(song[ns + ne: ns + ne + end]) async def refresh_and_stop(self): Screen.rfsh_start.set() # Allow refresh Screen.rfsh_done.clear() await Screen.rfsh_done.wait() # Wait for a refresh to end Screen.rfsh_start.clear() # Prevent another. async def refresh_ctrl(self): # Enter with refresh paused await asyncio.sleep_ms(100) # Time for initial buffer fill try: while True: await self.refresh_and_stop() # Allow one screen refresh await asyncio.sleep_ms(20) # Time for buffer top-up finally: # Allow refresh to free-run Screen.rfsh_start.set() async def album_task(self): self.playing = True # Prevent other instances self.stop_play = False # Leave paused status unchanged songs = self.songs[self.song_idx :] # Start from current index for song in songs: self.show_song() await self.refresh_and_stop() # Pause until refresh is stopped. # Delay refresh to ensure buffer is filled rc = asyncio.create_task(self.refresh_ctrl()) await asyncio.sleep_ms(0) await self.play_song(song) rc.cancel() # Restore normal display refresh if self.stop_play: break # A callback has stopped playback self.song_idx += 1 else: self.song_idx = 0 # Played to completion. self.show_song() self.playing = False rc.cancel() # Restore normal display refresh # Open and play a binary wav file async def play_song(self, song): wav_samples_mv = memoryview(wav_samples) size = len(wav_samples) if not self.paused: # advance to first byte of Data section in WAV file. This is not # correct for all WAV files. See link above. self.offset = 44 swriter = self.swriter with open(song, "rb") as wav: _ = wav.seek(self.offset) while (num_read := wav.readinto(wav_samples_mv)) and not self.stop_play: I2S.shift(buf=wav_samples_mv[:num_read], bits=16, shift=self.volume) # HACK awaiting https://github.com/micropython/micropython/pull/7868 swriter.out_buf = wav_samples_mv[:num_read] await swriter.drain() # wav_samples is now empty self.offset += size def test(): print('Audio demo.') try: Screen.change(BaseScreen) # A class is passed here, not an instance. finally: audio_out.deinit() print("========== CLOSE AUDIO ==========") test()