2021-10-24 16:17:36 +00:00
|
|
|
import logging
|
|
|
|
import functools
|
|
|
|
from time import sleep
|
|
|
|
from enum import Enum
|
|
|
|
|
|
|
|
from typing import Any, List
|
2021-11-04 14:04:12 +00:00
|
|
|
from scipy.io.wavfile import write as writewav
|
2021-10-24 16:17:36 +00:00
|
|
|
|
|
|
|
import sounddevice as sd
|
|
|
|
import soundfile as sf
|
|
|
|
|
|
|
|
from . import gpio_pins
|
|
|
|
|
|
|
|
from picamera import PiCamera
|
2021-11-01 16:00:40 +00:00
|
|
|
from gpiozero import Button
|
2021-10-24 16:17:36 +00:00
|
|
|
|
|
|
|
import serial
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
# Constants
|
|
|
|
VIDEO_RES = (1920, 1080) # Video Resolution
|
|
|
|
PHOTO_RES = (2592, 1944) # Photo Resolution
|
|
|
|
AUDIO_REC_SR = 44100 # Audio Recording Samplerate
|
|
|
|
SERIAL_DEV = '/dev/serial0'
|
|
|
|
SERIAL_BAUDRATE = 9600
|
|
|
|
|
|
|
|
|
|
|
|
class SerialCommands(Enum):
|
|
|
|
MOTOR_VERT = 'V'
|
|
|
|
MOTOR_HOR = 'H'
|
|
|
|
BACKLIGHT = 'B'
|
|
|
|
FRONTLIGHT = 'F'
|
|
|
|
USER_INTERACTION = 'U'
|
2021-11-04 13:21:33 +00:00
|
|
|
RECORD = 'C'
|
2021-10-24 16:17:36 +00:00
|
|
|
REWIND = 'R'
|
|
|
|
|
|
|
|
|
|
|
|
class PizzaHAL:
|
|
|
|
"""
|
|
|
|
This class holds a represenation of the pizza box hardware and provides
|
|
|
|
methods to interact with it.
|
|
|
|
|
|
|
|
- lights upper/lower on/off
|
|
|
|
- motor up-down/left-right speed distance
|
|
|
|
- scroll up-down/left-right positions
|
|
|
|
- lid open/closed detectors
|
|
|
|
- user interface buttons
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, serialdev: str = SERIAL_DEV, baudrate: int = SERIAL_BAUDRATE):
|
|
|
|
self.serialcon = serial.Serial(serialdev, baudrate=baudrate, timeout=None)
|
|
|
|
|
2021-11-01 16:00:40 +00:00
|
|
|
self.btn_start = Button(gpio_pins.BTN_START)
|
|
|
|
|
2021-10-24 16:17:36 +00:00
|
|
|
self.camera = None
|
|
|
|
self.soundcache = {}
|
|
|
|
|
|
|
|
self.blocked = False
|
|
|
|
|
|
|
|
def send_cmd(self, command: SerialCommands, *options):
|
|
|
|
self.blocked = True
|
|
|
|
opt_str = '+'.join(str(x) for x in options)
|
2021-10-29 16:40:27 +00:00
|
|
|
cmd_str = f'{command.value}:{opt_str}\n'
|
2021-10-24 16:17:36 +00:00
|
|
|
self.serialcon.write(cmd_str.encode('utf-8'))
|
|
|
|
resp = self.serialcon.readline()
|
|
|
|
self.blocked = False
|
|
|
|
return resp
|
|
|
|
|
|
|
|
|
|
|
|
def blocking(func):
|
|
|
|
@functools.wraps(func)
|
|
|
|
def _wrapper(*args, **kwargs):
|
|
|
|
hal = kwargs.get('hal', None)
|
|
|
|
if hal is not None:
|
|
|
|
logger.debug('blocking...')
|
|
|
|
while hal.blocked:
|
|
|
|
pass
|
|
|
|
hal.blocked = True
|
|
|
|
func(*args, **kwargs)
|
|
|
|
if hal is not None:
|
|
|
|
logger.debug('unblocking')
|
|
|
|
hal.blocked = False
|
|
|
|
sleep(0.1)
|
|
|
|
return _wrapper
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def move_vert(hal: PizzaHAL, steps: int):
|
|
|
|
"""
|
|
|
|
Move the motor controlling the vertical scroll a given distance.
|
|
|
|
|
|
|
|
"""
|
|
|
|
hal.send_cmd(SerialCommands.MOTOR_VERT, steps)
|
|
|
|
|
|
|
|
|
|
|
|
def move_hor(hal: PizzaHAL, steps: int):
|
|
|
|
"""
|
|
|
|
Move the motor controlling the horizontal scroll a given distance.
|
|
|
|
|
|
|
|
"""
|
|
|
|
hal.send_cmd(SerialCommands.MOTOR_HOR, steps)
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def rewind(hal: PizzaHAL):
|
|
|
|
"""
|
|
|
|
Rewind both scrolls.
|
|
|
|
|
|
|
|
"""
|
|
|
|
hal.send_cmd(SerialCommands.REWIND)
|
|
|
|
|
|
|
|
|
|
|
|
def turn_off(hal: PizzaHAL):
|
|
|
|
"""
|
|
|
|
Turn off everything.
|
|
|
|
"""
|
|
|
|
hal.send_cmd(SerialCommands.BACKLIGHT, 0)
|
|
|
|
hal.send_cmd(SerialCommands.FRONTLIGHT, 0)
|
|
|
|
|
|
|
|
|
|
|
|
def wait_for_input(hal: PizzaHAL, go_callback: Any,
|
2021-11-01 16:00:40 +00:00
|
|
|
back_callback: Any, to_callback: Any,
|
|
|
|
timeout=120, **kwargs):
|
2021-10-24 16:17:36 +00:00
|
|
|
"""
|
|
|
|
Blink leds on buttons. Wait until the user presses a button, then execute
|
|
|
|
the appropriate callback
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param go_callback: called when button 'go' is pressed
|
|
|
|
:param back_callback: called whan button 'back' is pressed
|
2021-11-01 16:00:40 +00:00
|
|
|
:param to_callback: called on timeout
|
|
|
|
:param timeout: inactivity timeout in seconds (default 120)
|
2021-10-24 16:17:36 +00:00
|
|
|
"""
|
2021-11-04 13:21:33 +00:00
|
|
|
resp = hal.send_cmd(SerialCommands.USER_INTERACTION, timeout).strip()
|
2021-11-04 13:33:31 +00:00
|
|
|
if resp == b'B':
|
2021-10-24 16:17:36 +00:00
|
|
|
go_callback(**kwargs)
|
2021-11-04 13:33:31 +00:00
|
|
|
elif resp == b'R':
|
2021-10-24 16:17:36 +00:00
|
|
|
back_callback(**kwargs)
|
2021-11-01 16:00:40 +00:00
|
|
|
else:
|
|
|
|
to_callback(**kwargs)
|
2021-10-24 16:17:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
2021-10-29 16:40:27 +00:00
|
|
|
def light_layer(hal: PizzaHAL, intensity: float, fade: float = 0.0, steps: int = 100, **kwargs):
|
2021-10-24 16:17:36 +00:00
|
|
|
"""
|
|
|
|
Turn on the light to illuminate the upper scroll
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param fade: float
|
|
|
|
Default 0, time in seconds to fade in or out
|
|
|
|
:param intensity: float
|
|
|
|
Intensity of the light in percent
|
2021-10-29 16:40:27 +00:00
|
|
|
:param steps: int
|
|
|
|
How many steps for the fade (default: 100)
|
2021-10-24 16:17:36 +00:00
|
|
|
"""
|
2021-10-29 16:40:27 +00:00
|
|
|
hal.send_cmd(SerialCommands.FRONTLIGHT, int(intensity * 100), int(fade * 1000), steps)
|
2021-10-24 16:17:36 +00:00
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def backlight(hal: PizzaHAL, intensity: float, fade: float = 0.0,
|
|
|
|
steps: int = 100, **kwargs):
|
|
|
|
"""
|
|
|
|
Turn on the backlight
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param fade: float
|
|
|
|
Default 0, time in seconds to fade in or out
|
|
|
|
:param intensity: float
|
|
|
|
Intensity of the light in percent
|
|
|
|
:param steps: int
|
2021-10-29 16:40:27 +00:00
|
|
|
How many steps for the fade (default: 100)
|
2021-10-24 16:17:36 +00:00
|
|
|
"""
|
2021-10-29 16:40:27 +00:00
|
|
|
hal.send_cmd(SerialCommands.BACKLIGHT, int(intensity * 100), int(fade * 1000), steps)
|
2021-10-24 16:17:36 +00:00
|
|
|
|
|
|
|
@blocking
|
|
|
|
def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
|
|
|
|
"""
|
|
|
|
Play a sound.
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param sound: The sound to be played
|
|
|
|
"""
|
|
|
|
# Extract data and sampling rate from file
|
|
|
|
try:
|
|
|
|
data, fs = hal.soundcache.get(str(sound), sf.read(str(sound), dtype='float32'))
|
|
|
|
sd.play(data, fs)
|
|
|
|
sd.wait() # Wait until file is done playing
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
logger.debug('skipped playback')
|
|
|
|
# sd.stop()
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
2021-11-19 15:09:49 +00:00
|
|
|
def record_sound(hal: PizzaHAL, filename: Any, duration: float,
|
2021-10-24 16:17:36 +00:00
|
|
|
cache: bool = False, **kwargs):
|
|
|
|
"""
|
|
|
|
Record sound using the microphone
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param filename: The path of the file to record to
|
|
|
|
:param duration: The time to record in seconds
|
|
|
|
:param cache: `True` to save recording to cache. Default is `False`
|
|
|
|
"""
|
|
|
|
myrecording = sd.rec(int(duration * AUDIO_REC_SR),
|
|
|
|
samplerate=AUDIO_REC_SR,
|
|
|
|
channels=2)
|
2021-11-19 15:09:49 +00:00
|
|
|
resp = hal.send_cmd(SerialCommands.RECORD, int(duration)).strip()
|
2021-11-04 13:35:15 +00:00
|
|
|
if resp == b'I':
|
2021-11-04 13:21:33 +00:00
|
|
|
sd.stop()
|
|
|
|
else:
|
|
|
|
sd.wait() # Wait until recording is finished
|
2021-11-04 14:04:12 +00:00
|
|
|
writewav(str(filename), AUDIO_REC_SR, myrecording)
|
2021-10-24 16:17:36 +00:00
|
|
|
if cache:
|
|
|
|
hal.soundcache[str(filename)] = (myrecording, AUDIO_REC_SR)
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def record_video(hal: PizzaHAL, filename: Any, duration: float, **kwargs):
|
|
|
|
"""
|
|
|
|
Record video using the camera
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param filename: The path of the file to record to
|
|
|
|
:param duration: The time to record in seconds
|
|
|
|
"""
|
|
|
|
hal.camera.resolution = VIDEO_RES
|
|
|
|
hal.camera.start_recording(str(filename))
|
|
|
|
hal.camera.wait_recording(duration)
|
|
|
|
hal.camera.stop_recording()
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def take_photo(hal: PizzaHAL, filename: Any, **kwargs):
|
|
|
|
"""
|
|
|
|
Take a foto with the camera
|
|
|
|
|
|
|
|
:param hal: The hardware abstraction object
|
|
|
|
:param filename: The path of the filename for the foto
|
|
|
|
"""
|
|
|
|
hal.camera.resolution = PHOTO_RES
|
|
|
|
hal.camera.capture(str(filename))
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def init_sounds(hal: PizzaHAL, sounds: List):
|
|
|
|
"""
|
|
|
|
Load prerecorded Sounds into memory
|
|
|
|
|
|
|
|
:param hal:
|
|
|
|
:param sounds: A list of sound files
|
|
|
|
"""
|
|
|
|
if hal.soundcache is None:
|
|
|
|
hal.soundcache = {}
|
|
|
|
|
|
|
|
for sound in sounds:
|
|
|
|
# Extract data and sampling rate from file
|
|
|
|
data, fs = sf.read(str(sound), dtype='float32')
|
|
|
|
hal.soundcache[str(sound)] = (data, fs)
|
|
|
|
|
|
|
|
|
|
|
|
@blocking
|
|
|
|
def init_camera(hal: PizzaHAL):
|
|
|
|
if hal.camera is None:
|
2021-11-04 14:52:09 +00:00
|
|
|
hal.camera = PiCamera(sensor_mode=5)
|