Implemented serial handshake with HELO pins.

Implemented stop action on lid close.
Untested: statemachine rewind on lid close.
This commit is contained in:
jpunkt 2022-01-19 22:08:46 +01:00
parent 2670c3c714
commit 4f8d97a7ff
3 changed files with 132 additions and 67 deletions

View file

@ -1,6 +1,5 @@
import logging
import functools
import threading
from time import sleep
from enum import Enum
@ -13,7 +12,7 @@ import soundfile as sf
import pygame.mixer as mx
from picamera import PiCamera
from gpiozero import Button
from gpiozero import Button, DigitalOutputDevice, DigitalInputDevice
import serial
@ -29,7 +28,8 @@ AUDIO_REC_SR = 44100 # Audio Recording Samplerate
SERIAL_DEV = '/dev/serial0' # Serial port to use
SERIAL_BAUDRATE = 115200 # Serial connection baud rate
SERIAL_CONN_TIMEOUT = 60 # Serial connection read timeout
SERIAL_CONN_TIMEOUT = 0.2 # Serial connection read timeout
HELO_TIMEOUT = 20
class Lights(Enum):
@ -47,6 +47,7 @@ class SerialCommands(Enum):
ALREADY_CONNECTED = b'\x01'
ERROR = b'\x02'
RECEIVED = b'\x03'
ABORT = b'\x63' # 99 decimal
SET_MOVEMENT = b'M'
SET_LIGHT = b'L'
@ -64,6 +65,10 @@ class SerialCommands(Enum):
EOT = b'\n'
class CommunicationError(Exception):
pass
class SerialCommunicationError(Exception):
pass
@ -84,16 +89,43 @@ class PizzaHAL:
def __init__(self, serialdev: str = SERIAL_DEV, baudrate: int = SERIAL_BAUDRATE, timeout: float = SERIAL_CONN_TIMEOUT):
self.serialcon = serial.Serial(serialdev, baudrate=baudrate, timeout=timeout)
# Lid switch with pull-up. is_pressed = True when lid is open
self.lid_switch = Button(LID_SWITCH)
self.pin_helo1 = DigitalOutputDevice(HELO1)
self.pin_helo2 = DigitalInputDevice(HELO2)
self.camera = None
self.soundcache = {}
self.connected = False
@property
def lid_open(self) -> bool:
"""
Returns True when the lid is open
"""
return self.lid_switch.is_pressed
def init_connection(self):
"""
Set HELO1 pin to `High`, wait for HELO2 to be set `High` by microcontroller.
Then perform serial handshake.
"""
self.pin_helo1.on()
timer = 0
while (not self.pin_helo2.value) and (timer < HELO_TIMEOUT):
sleep(0.1)
timer += 1
if not (timer % 100):
logger.info(f'Waiting for connection ({timer / 10}s)')
if not self.pin_helo2.value:
raise CommunicationError('Microcontroller did not respond to HELO pin.')
self.serialcon.write(SerialCommands.HELLO.value + SerialCommands.EOT.value)
resp = self.serialcon.read_until()
if resp == (SerialCommands.HELLO.value + SerialCommands.EOT.value):
self.serialcon.write(SerialCommands.ALREADY_CONNECTED.value + SerialCommands.EOT.value)
resp = self.serialcon.read_until()
@ -109,6 +141,7 @@ class PizzaHAL:
raise SerialCommunicationError('Timeout on initializing connection.')
else:
raise SerialCommunicationError(f'Serial Connection received invalid response to HELLO: {resp}')
self.connected = True
def init_sounds(self, sounds: List=None):
@ -118,18 +151,15 @@ class PizzaHAL:
:param hal:
:param sounds: A list of sound files
"""
# if self.soundcache is None:
# self.soundcache = {}
if self.soundcache is None:
self.soundcache = {}
if not mx.get_init():
mx.init()
# if sounds is not None:
# for sound in sounds:
# # Extract data and sampling rate from file
# # data, fs = sf.read(str(sound), dtype='float32')
# # self.soundcache[str(sound)] = (data, fs)
# self.soundcache[str(sound)] = mx.Sound(str(sound))
if sounds is not None:
for sound in sounds:
self.soundcache[str(sound)] = mx.Sound(str(sound))
def init_camera(self):
if self.camera is None:
@ -143,28 +173,48 @@ class PizzaHAL:
if mx.get_busy():
mx.stop()
def send_cmd(self, command: SerialCommands, *options):
def send_cmd(self, command: SerialCommands, *options, ignore_lid: bool=False):
"""
Send a command and optional options. Options need to be encoded as bytes before passing.
This function is blocking.
Returns the response from the microcontroller or `None` if the lid was closed and `ignore_lid` is `False`.
Raises a SerialCommunicationError if serial connection was not initialized or a response other than
SerialCommands.RECEIVED was received.
Raises a CommunicationError if the HELO2 pin goes low while waiting for response.
"""
if not self.connected:
raise SerialCommunicationError("Serial Communication not initialized. Call `init_connection()` before `send_cmd()`.")
self.serialcon.write(command.value)
for o in options:
self.serialcon.write(o)
self.serialcon.write(SerialCommands.EOT.value)
resp = self.serialcon.read_until()
while resp == b'':
self.serialcon.write(SerialCommands.EOT.value)
resp = b''
while resp is b'':
# If serial communication timeout occurs, response is empty.
# Read again to allow for longer waiting times
if not self.pin_helo2.value:
raise CommunicationError('Pin HELO2 LOW. Microcontroller in error state or lost connection.')
if (not ignore_lid) and (not self.lid_open):
logger.info('Lid closed while processing command. Returning None.')
return None
resp = self.serialcon.read_until()
logger.debug(f'hal.send_cmd() received {resp}')
if not resp.startswith(SerialCommands.RECEIVED.value):
raise SerialCommunicationError(f'Serial Communication received unexpected response: {resp}')
return resp
def flush_serial(self):
self.serialcon.read_all()
def set_movement(hal: PizzaHAL,
scroll: Scrolls,
@ -187,16 +237,16 @@ def rewind(hal: PizzaHAL, **kwargs):
Rewind both scrolls.
"""
hal.send_cmd(SerialCommands.REWIND)
hal.send_cmd(SerialCommands.REWIND, ignore_lid=True)
def turn_off(hal: PizzaHAL, **kwargs):
"""
Turn off the lights.
"""
set_light(hal, Lights.BACKLIGHT, 0, 0, 0, 0, 0)
set_light(hal, Lights.FRONTLIGHT, 0, 0, 0, 0, 0)
do_it(hal)
set_light(hal, Lights.BACKLIGHT, 0, 0, 0, 0, 0, ignore_lid=True)
set_light(hal, Lights.FRONTLIGHT, 0, 0, 0, 0, 0, ignore_lid=True)
do_it(hal, ignore_lid=True)
def wait_for_input(hal: PizzaHAL,
@ -241,6 +291,13 @@ def wait_for_input(hal: PizzaHAL,
if sound is not None:
hal.stop_sound()
if resp is None:
# lid was closed by user
logger.info('Lid closed during wait_for_input(). Sending ABORT.')
hal.send_cmd(SerialCommands.ABORT, ignore_lid=True)
hal.flush_serial()
return
if len(resp) != 3:
raise SerialCommunicationError(f'USER_INTERACTION expects 3 bytes, received {resp}')
@ -278,14 +335,18 @@ def set_light(hal: PizzaHAL,
hal.send_cmd(SerialCommands.SET_LIGHT,
int(light.value).to_bytes(1, 'little'),
int(color).to_bytes(4, 'little'),
int(fade * 1000).to_bytes(4, 'little'))
int(fade * 1000).to_bytes(4, 'little'),
ignore_lid=kwargs.get('ignore_lid', False))
def do_it(hal: PizzaHAL):
def do_it(hal: PizzaHAL, ignore_lid: bool=False, **kwargs):
"""
Execute set commands
"""
hal.send_cmd(SerialCommands.DO_IT)
if hal.send_cmd(SerialCommands.DO_IT, ignore_lid=ignore_lid) is None:
logger.info('Lid closed during do_it(). Sending ABORT.')
hal.send_cmd(SerialCommands.ABORT, ignore_lid=True)
hal.flush_serial()
def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
@ -298,10 +359,13 @@ def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
# Extract data and sampling rate from file
try:
hal.play_sound(str(sound))
while mx.get_busy():
while mx.get_busy() and hal.lid_open:
pass
if not hal.lid_open:
hal.stop_sound()
except KeyboardInterrupt:
mx.stop()
hal.stop_sound()
logger.debug('skipped playback')

View file

@ -1,3 +1,8 @@
import sys
import logging
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
from . import hal_serial
hal = hal_serial.PizzaHAL()

View file

@ -5,18 +5,21 @@ from enum import Enum, auto
from pizzactrl import fs_names
from .storyboard import Language, Storyboard
from .hal_serial import SerialCommunicationError, PizzaHAL, wait_for_input, play_sound, turn_off
from .hal_serial import SerialCommunicationError, CommunicationError, PizzaHAL, wait_for_input, play_sound, turn_off
logger = logging.getLogger(__name__)
class FileSystemException(Exception):
pass
class State(Enum):
POWER_ON = auto()
POST = auto()
IDLE_START = auto()
LANGUAGE_SELECT = auto()
PLAY = auto()
PAUSE = auto()
POST_PROCESS = auto()
REWIND = auto()
IDLE_END = auto()
@ -43,7 +46,16 @@ class Statemachine:
self.test = test
self.loop = loop
self.state = State.POWER_ON
self.state = State.POWER_ON
def _next_state(self):
"""
Set `self.state` to the next state
"""
try:
self.state = State(self.state.value + 1)
except ValueError:
pass
def run(self):
logger.debug(f'Starting Statemachine...')
@ -62,7 +74,14 @@ class Statemachine:
while (self.state is not State.ERROR) and \
(self.state is not State.SHUTDOWN):
logger.debug(f'Run(state={self.state})')
choice[self.state]()
try:
choice[self.state]()
except (CommunicationError, SerialCommunicationError) as e:
self.state = State.ERROR
logger.error('Communication with microcontroller failed.', e)
except Exception as e:
self.state = State.ERROR
logger.error(e)
if self.state is State.ERROR:
logger.debug('An error occurred. Trying to notify user...')
@ -75,63 +94,38 @@ class Statemachine:
self._shutdown()
def _lid_open(self):
# TODO implement
pass
def _lid_closed(self):
# TODO implement
pass
def _power_on(self):
"""
Initialize hal callbacks, load sounds
"""
# TODO enable lid sensor
self.hal.lid_switch.when_pressed = self._lid_open
self.hal.lid_switch.when_released = self._lid_closed
self.hal.init_sounds()
self.hal.init_camera()
self.state = State.POST
self._next_state()
def _post(self):
"""
Power on self test.
"""
if (not self.test) and (not os.path.exists(fs_names.USB_STICK)):
logger.warning('USB-Stick not found.')
self.state = State.ERROR
return
# TODO set RPi_HELO pins, wait for response
# Callback for start when blue button is held
# self.hal.btn_start.when_activated = self._start_or_rewind
# logger.debug('start button callback activated')
try:
self.hal.init_connection()
except SerialCommunicationError as e:
self.state = State.ERROR
logger.exception(e)
return
raise FileSystemException('USB Stick not present!')
self.hal.init_connection()
# play a sound if everything is alright
play_sound(self.hal, fs_names.SFX_POST_OK)
if self.test:
self.state = State.LANGUAGE_SELECT
else:
self.state = State.IDLE_START
self._next_state()
def _idle_start(self):
"""
Device is armed. Wait for user to press start button
Device is armed. Wait for user to open the lid
"""
pass
if self.hal.lid_open:
self._next_state()
def _lang_select(self):
"""
@ -158,7 +152,7 @@ class Statemachine:
self.story.language = self.lang
logger.debug(f'User selected language={self.lang}')
self.state = State.PLAY
self._next_state()
def _play(self):
"""
@ -170,7 +164,7 @@ class Statemachine:
self.story.play_chapter()
self.story.advance_chapter()
self.state = State.POST_PROCESS
self._next_state()
def _post_process(self):
"""
@ -181,7 +175,7 @@ class Statemachine:
# cmdstring = f'MP4Box -add {fs_names.REC_DRAW_CITY} {fs_names.REC_MERGED_VIDEO}'
# call([cmdstring], shell=True)
self.state = State.REWIND
self._next_state()
def _rewind(self):
"""
@ -193,16 +187,18 @@ class Statemachine:
if self.loop:
self.state = State.IDLE_START
else:
self.state = State.IDLE_END
self._next_state()
def _idle_end(self):
"""
Initialize shutdown
"""
self.state = State.SHUTDOWN
self._next_state()
def _shutdown(self):
"""
Clean up, end execution
"""
self.hal.pin_helo1.off()
del self.hal
del self.story