pizzabox-main/pizzactrl/hal_serial.py
jpunkt d837134db5 Major refactoring:
- New Storyboard class now has execution logic included
- Implemented dummy storyboard for testing
- Language selection is a state in Statemachine
- Added test class for instantiating a Statemachine
2022-01-17 20:39:36 +01:00

354 lines
10 KiB
Python

import logging
import functools
import threading
from time import sleep
from enum import Enum
from typing import Any, List, Iterable
from scipy.io.wavfile import write as writewav
import sounddevice as sd
import soundfile as sf
import pygame.mixer as mx
from . import gpio_pins
from picamera import PiCamera
from gpiozero import Button
import serial
logger = logging.getLogger(__name__)
# Constants
VIDEO_RES = (1920, 1080) # Video Resolution
PHOTO_RES = (2592, 1944) # Photo Resolution
AUDIO_REC_SR = 44100 # Audio Recording Samplerate
SERIAL_DEV = '/dev/serial0' # Serial port to use
SERIAL_BAUDRATE = 115200 # Serial connection baud rate
SERIAL_CONN_TIMEOUT = 60 # Serial connection read timeout
class Lights(Enum):
BACKLIGHT = 0
FRONTLIGHT = 1
class Scrolls(Enum):
HORIZONTAL = 0
VERTICAL = 1
class SerialCommands(Enum):
HELLO = b'\x00'
ALREADY_CONNECTED = b'\x01'
ERROR = b'\x02'
RECEIVED = b'\x03'
SET_MOVEMENT = b'M'
SET_LIGHT = b'L'
DO_IT = b'D'
USER_INTERACT = b'U'
RECORD = b'C'
REWIND = b'R'
DEBUG_SCROLL = b'S'
DEBUG_SENSORS = b'Z'
EOT = b'\n'
class SerialCommunicationError(Exception):
pass
class PizzaHAL:
"""
This class holds a represenation of the pizza box hardware and provides
methods to interact with it.
- lights upper/lower on/off
- motor up-down/left-right speed distance
- scroll up-down/left-right positions
- lid open/closed detectors
- user interface buttons
"""
def __init__(self, serialdev: str = SERIAL_DEV, baudrate: int = SERIAL_BAUDRATE, timeout: float = SERIAL_CONN_TIMEOUT):
self.serialcon = serial.Serial(serialdev, baudrate=baudrate, timeout=timeout)
self.btn_start = Button(gpio_pins.BTN_START)
self.camera = None
self.soundcache = {}
self.connected = False
def init_connection(self):
self.serialcon.write(SerialCommands.HELLO.value + SerialCommands.EOT.value)
resp = self.serialcon.read_until()
if resp == (SerialCommands.HELLO.value + SerialCommands.EOT.value):
self.serialcon.write(SerialCommands.ALREADY_CONNECTED.value + SerialCommands.EOT.value)
resp = self.serialcon.read_until()
if resp == (SerialCommands.ALREADY_CONNECTED.value + SerialCommands.EOT.value):
logger.info('Serial Connection established')
elif resp == b'':
raise SerialCommunicationError('Timeout on initializing connection.')
else:
raise SerialCommunicationError(f'Serial Connection received invalid response to ALREADY CONNECTED: {resp}')
elif resp == (SerialCommands.ALREADY_CONNECTED.value + SerialCommands.EOT.value):
logger.warn('Serial Connection received ALREADY CONNECTED as response to HELLO. Assuming connection ok.')
elif resp == b'':
raise SerialCommunicationError('Timeout on initializing connection.')
else:
raise SerialCommunicationError(f'Serial Connection received invalid response to HELLO: {resp}')
self.connected = True
def init_sounds(self, sounds: List=None):
"""
Load prerecorded Sounds into memory
:param hal:
:param sounds: A list of sound files
"""
# if self.soundcache is None:
# self.soundcache = {}
if not mx.get_init():
mx.init()
# if sounds is not None:
# for sound in sounds:
# # Extract data and sampling rate from file
# # data, fs = sf.read(str(sound), dtype='float32')
# # self.soundcache[str(sound)] = (data, fs)
# self.soundcache[str(sound)] = mx.Sound(str(sound))
def init_camera(self):
if self.camera is None:
self.camera = PiCamera(sensor_mode=5)
def play_sound(self, sound: str):
s = self.soundcache.get(sound, mx.Sound(sound))
s.play()
def stop_sound(self):
if mx.get_busy():
mx.stop()
def send_cmd(self, command: SerialCommands, *options):
"""
Send a command and optional options. Options need to be encoded as bytes before passing.
"""
if not self.connected:
raise SerialCommunicationError("Serial Communication not initialized. Call `init_connection()` before `send_cmd()`.")
self.serialcon.write(command.value)
for o in options:
self.serialcon.write(o)
self.serialcon.write(SerialCommands.EOT.value)
resp = self.serialcon.read_until()
while resp == b'':
# If serial communication timeout occurs, response is empty.
# Read again to allow for longer waiting times
resp = self.serialcon.read_until()
if not resp.startswith(SerialCommands.RECEIVED.value):
raise SerialCommunicationError(f'Serial Communication received unexpected response: {resp}')
return resp
def set_movement(hal: PizzaHAL,
scroll: Scrolls,
steps: int,
**kwargs):
"""
Move the motor controlling the vertical scroll a given distance.
"""
scroll = int(scroll.value)
hal.send_cmd(SerialCommands.SET_MOVEMENT,
scroll.to_bytes(1, 'little', signed=False),
steps.to_bytes(2, 'little', signed=True))
def rewind(hal: PizzaHAL, **kwargs):
"""
Rewind both scrolls.
"""
hal.send_cmd(SerialCommands.REWIND)
def turn_off(hal: PizzaHAL, **kwargs):
"""
Turn off the lights.
"""
set_light(hal, Lights.BACKLIGHT, 0, 0, 0, 0, 0)
set_light(hal, Lights.FRONTLIGHT, 0, 0, 0, 0, 0)
do_it(hal)
def wait_for_input(hal: PizzaHAL,
blue_cb: Any = None,
red_cb: Any = None,
yellow_cb: Any = None,
green_cb: Any = None,
timeout_cb: Any = None,
sound: Any = None,
timeout=120, **kwargs):
"""
Blink leds on buttons. Wait until the user presses a button, then execute
the appropriate callback. If a callback is not defined, the button is not
used.
Optionally plays sound which can be interrupted by user input.
:param hal: The hardware abstraction object
:param blue_cb: Callback for blue button press
:param red_cb: Callback for red button press
:param yellow_cb: Callback for yellow button press
:param green_cb: Callback for green button press
:param timeout_cb: Callback for no button press
:param sound: Name of sound file to play until user presses a button
:param timeout: Time to wait before abort. 0 to wait forever
"""
if timeout is not None:
timeout *= 1000
else:
timeout = 0
bitmask = (1 if blue_cb else 0) | \
(2 if red_cb else 0) | \
(4 if yellow_cb else 0) | \
(8 if green_cb else 0)
if sound is not None:
logger.debug(f'Waiting for user, playing sound {sound}.')
hal.play_sound(str(sound))
resp = hal.send_cmd(SerialCommands.USER_INTERACT, bitmask.to_bytes(1, 'little', signed=False), timeout.to_bytes(4, 'little', signed=False))
if sound is not None:
hal.stop_sound()
if len(resp) != 3:
raise SerialCommunicationError(f'USER_INTERACTION expects 3 bytes, received {resp}')
resp = resp[1]
if resp == 1:
blue_cb(**kwargs)
elif resp == 2:
red_cb(**kwargs)
elif resp == 4:
yellow_cb(**kwargs)
elif resp == 8:
green_cb(**kwargs)
elif timeout_cb is not None:
timeout_cb(**kwargs)
def set_light(hal: PizzaHAL,
light: Lights,
r: float,
g: float,
b: float,
w: float,
fade: float = 0.0,
**kwargs):
"""
Turn on the light to illuminate the upper scroll
:param hal: The hardware abstraction object
:param fade: float
Default 0, time in seconds to fade in or out
"""
# convert color to 32bit number
color = (int(w * 255) << 24) | (int(b * 255) << 16) | (int(g * 255) << 8) | (int(r * 255))
hal.send_cmd(SerialCommands.SET_LIGHT,
int(light.value).to_bytes(1, 'little'),
int(color).to_bytes(4, 'little'),
int(fade * 1000).to_bytes(4, 'little'))
def do_it(hal: PizzaHAL):
"""
Execute set commands
"""
hal.send_cmd(SerialCommands.DO_IT)
def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
"""
Play a sound (blocking).
:param hal: The hardware abstraction object
:param sound: The sound to be played
"""
# Extract data and sampling rate from file
try:
hal.play_sound(str(sound))
while mx.get_busy():
pass
except KeyboardInterrupt:
mx.stop()
logger.debug('skipped playback')
def record_sound(hal: PizzaHAL, filename: Any,
duration: float,
cache: bool = False, **kwargs):
"""
Record sound using the microphone
:param hal: The hardware abstraction object
:param filename: The path of the file to record to
:param duration: The time to record in seconds
:param cache: `True` to save recording to cache. Default is `False`
"""
myrecording = sd.rec(int(duration * AUDIO_REC_SR),
samplerate=AUDIO_REC_SR,
channels=2)
hal.send_cmd(SerialCommands.RECORD, int(duration*1000).to_bytes(4, 'little', signed=False))
sd.stop()
writewav(str(filename), AUDIO_REC_SR, myrecording)
if cache:
hal.soundcache[str(filename)] = (myrecording, AUDIO_REC_SR)
def record_video(hal: PizzaHAL, filename: Any, duration: float, **kwargs):
"""
Record video using the camera
:param hal: The hardware abstraction object
:param filename: The path of the file to record to
:param duration: The time to record in seconds
"""
hal.camera.resolution = VIDEO_RES
hal.camera.start_recording(str(filename))
hal.camera.wait_recording(duration)
hal.camera.stop_recording()
def take_photo(hal: PizzaHAL, filename: Any, **kwargs):
"""
Take a foto with the camera
:param hal: The hardware abstraction object
:param filename: The path of the filename for the foto
"""
hal.camera.resolution = PHOTO_RES
hal.camera.capture(str(filename))