implemented test-module
updated user-interaction not working: record_sound()
This commit is contained in:
parent
1fc554ae25
commit
77eb772776
2 changed files with 39 additions and 15 deletions
|
@ -151,8 +151,12 @@ def turn_off(hal: PizzaHAL):
|
|||
hal.send_cmd(SerialCommands.FRONTLIGHT, 0)
|
||||
|
||||
|
||||
def wait_for_input(hal: PizzaHAL, go_callback: Any,
|
||||
back_callback: Any, to_callback: Any,
|
||||
def wait_for_input(hal: PizzaHAL,
|
||||
blue_callback: Any = None,
|
||||
red_callback: Any = None,
|
||||
yellow_callback: Any = None,
|
||||
green_callback: Any = None,
|
||||
timeout_callback: Any = None,
|
||||
timeout=120, **kwargs):
|
||||
"""
|
||||
Blink leds on buttons. Wait until the user presses a button, then execute
|
||||
|
@ -164,13 +168,28 @@ def wait_for_input(hal: PizzaHAL, go_callback: Any,
|
|||
:param to_callback: called on timeout
|
||||
:param timeout: inactivity timeout in seconds (default 120)
|
||||
"""
|
||||
resp = hal.send_cmd(SerialCommands.USER_INTERACTION, timeout).strip()
|
||||
if resp == b'B':
|
||||
go_callback(**kwargs)
|
||||
elif resp == b'R':
|
||||
back_callback(**kwargs)
|
||||
else:
|
||||
to_callback(**kwargs)
|
||||
timeout *= 1000
|
||||
|
||||
bitmask = (1 if blue_callback else 0) | \
|
||||
(2 if red_callback else 0) | \
|
||||
(4 if yellow_callback else 0) | \
|
||||
(8 if green_callback else 0)
|
||||
|
||||
resp = hal.send_cmd(SerialCommands.USER_INTERACT, bitmask.to_bytes(1, 'little', signed=False), timeout.to_bytes(4, 'little', signed=False))
|
||||
if (not resp.startswith(SerialCommands.RECEIVED.value)) or (len(resp) != 3):
|
||||
raise SerialCommunicationError(f'USER_INTERACTION received {resp} (expected 3 bytes, starting with 0x03)')
|
||||
|
||||
resp = resp[1]
|
||||
if resp == 1:
|
||||
blue_callback(**kwargs)
|
||||
elif resp == 2:
|
||||
red_callback(**kwargs)
|
||||
elif resp == 4:
|
||||
yellow_callback(**kwargs)
|
||||
elif resp == 8:
|
||||
green_callback(**kwargs)
|
||||
elif timeout_callback is not None:
|
||||
timeout_callback(**kwargs)
|
||||
|
||||
|
||||
def light_layer(hal: PizzaHAL, r: float, g: float, b: float, w: float, fade: float = 0.0, **kwargs):
|
||||
|
@ -213,7 +232,7 @@ def backlight(hal: PizzaHAL, r: float, g: float, b: float, w: float, fade: float
|
|||
int(fade * 1000).to_bytes(4, 'little'))
|
||||
|
||||
|
||||
def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
|
||||
def play_sound(hal: PizzaHAL, sound: Any, interruptable: bool = False, **kwargs):
|
||||
"""
|
||||
Play a sound.
|
||||
|
||||
|
@ -222,6 +241,7 @@ def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
|
|||
"""
|
||||
# Extract data and sampling rate from file
|
||||
try:
|
||||
# TODO implement interruption
|
||||
data, fs = hal.soundcache.get(str(sound), sf.read(str(sound), dtype='float32'))
|
||||
sd.play(data, fs)
|
||||
sd.wait() # Wait until file is done playing
|
||||
|
@ -243,11 +263,11 @@ def record_sound(hal: PizzaHAL, filename: Any, duration: float,
|
|||
myrecording = sd.rec(int(duration * AUDIO_REC_SR),
|
||||
samplerate=AUDIO_REC_SR,
|
||||
channels=2)
|
||||
resp = hal.send_cmd(SerialCommands.RECORD, int(duration)).strip()
|
||||
if resp == b'I':
|
||||
sd.stop()
|
||||
else:
|
||||
sd.wait() # Wait until recording is finished
|
||||
|
||||
hal.send_cmd(SerialCommands.RECORD, int(duration).to_bytes(4, 'little', signed=False))
|
||||
|
||||
sd.stop()
|
||||
|
||||
writewav(str(filename), AUDIO_REC_SR, myrecording)
|
||||
if cache:
|
||||
hal.soundcache[str(filename)] = (myrecording, AUDIO_REC_SR)
|
||||
|
|
4
pizzactrl/hal_test.py
Normal file
4
pizzactrl/hal_test.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
from . import hal_serial
|
||||
|
||||
hal = hal_serial.PizzaHAL()
|
||||
|
Loading…
Reference in a new issue