implementing serial communication

This commit is contained in:
jpunkt 2021-10-24 18:17:36 +02:00
parent f8b21a75d7
commit c657e782f0
6 changed files with 267 additions and 19 deletions

View file

@ -1,5 +1,5 @@
from pkg_resources import resource_filename
__version__ = '0.0.2'
__version__ = '0.1.0'
SOUNDS_PATH = resource_filename(__name__, 'sounds/')

254
pizzactrl/hal_serial.py Normal file
View file

@ -0,0 +1,254 @@
import logging
import functools
from time import sleep
from enum import Enum
from typing import Any, List
from scipy.io.wavfile import write as writewav
import sounddevice as sd
import soundfile as sf
import numpy as np
from . import gpio_pins
from picamera import PiCamera
from gpiozero import Button, OutputDevice, PWMOutputDevice, PWMLED
import serial
logger = logging.getLogger(__name__)
# Constants
VIDEO_RES = (1920, 1080) # Video Resolution
PHOTO_RES = (2592, 1944) # Photo Resolution
AUDIO_REC_SR = 44100 # Audio Recording Samplerate
SERIAL_DEV = '/dev/serial0'
SERIAL_BAUDRATE = 9600
class SerialCommands(Enum):
MOTOR_VERT = 'V'
MOTOR_HOR = 'H'
BACKLIGHT = 'B'
FRONTLIGHT = 'F'
USER_INTERACTION = 'U'
REWIND = 'R'
class PizzaHAL:
"""
This class holds a represenation of the pizza box hardware and provides
methods to interact with it.
- lights upper/lower on/off
- motor up-down/left-right speed distance
- scroll up-down/left-right positions
- lid open/closed detectors
- user interface buttons
"""
def __init__(self, serialdev: str = SERIAL_DEV, baudrate: int = SERIAL_BAUDRATE):
self.serialcon = serial.Serial(serialdev, baudrate=baudrate, timeout=None)
self.camera = None
self.soundcache = {}
self.blocked = False
def send_cmd(self, command: SerialCommands, *options):
self.blocked = True
opt_str = '+'.join(str(x) for x in options)
cmd_str = f'{command.value}:{opt_str}'
self.serialcon.write(cmd_str.encode('utf-8'))
resp = self.serialcon.readline()
self.blocked = False
return resp
def blocking(func):
@functools.wraps(func)
def _wrapper(*args, **kwargs):
hal = kwargs.get('hal', None)
if hal is not None:
logger.debug('blocking...')
while hal.blocked:
pass
hal.blocked = True
func(*args, **kwargs)
if hal is not None:
logger.debug('unblocking')
hal.blocked = False
sleep(0.1)
return _wrapper
@blocking
def move_vert(hal: PizzaHAL, steps: int):
"""
Move the motor controlling the vertical scroll a given distance.
"""
hal.send_cmd(SerialCommands.MOTOR_VERT, steps)
def move_hor(hal: PizzaHAL, steps: int):
"""
Move the motor controlling the horizontal scroll a given distance.
"""
hal.send_cmd(SerialCommands.MOTOR_HOR, steps)
@blocking
def rewind(hal: PizzaHAL):
"""
Rewind both scrolls.
"""
hal.send_cmd(SerialCommands.REWIND)
def turn_off(hal: PizzaHAL):
"""
Turn off everything.
"""
hal.send_cmd(SerialCommands.BACKLIGHT, 0)
hal.send_cmd(SerialCommands.FRONTLIGHT, 0)
def wait_for_input(hal: PizzaHAL, go_callback: Any,
back_callback: Any, **kwargs):
"""
Blink leds on buttons. Wait until the user presses a button, then execute
the appropriate callback
:param hal: The hardware abstraction object
:param go_callback: called when button 'go' is pressed
:param back_callback: called whan button 'back' is pressed
"""
resp = hal.send_cmd(SerialCommands.USER_INTERACTION)
if resp == 'B':
go_callback(**kwargs)
elif resp == 'R':
back_callback(**kwargs)
@blocking
def light_layer(hal: PizzaHAL, intensity: float, fade: float = 0.0, **kwargs):
"""
Turn on the light to illuminate the upper scroll
:param hal: The hardware abstraction object
:param fade: float
Default 0, time in seconds to fade in or out
:param intensity: float
Intensity of the light in percent
"""
hal.send_cmd(SerialCommands.FRONTLIGHT, int(intensity * 100), int(fade * 1000))
@blocking
def backlight(hal: PizzaHAL, intensity: float, fade: float = 0.0,
steps: int = 100, **kwargs):
"""
Turn on the backlight
:param hal: The hardware abstraction object
:param fade: float
Default 0, time in seconds to fade in or out
:param intensity: float
Intensity of the light in percent
:param steps: int
How many steps for the fade (default: 10)
"""
hal.send_cmd(SerialCommands.BACKLIGHT, int(intensity * 100), int(fade * 1000))
@blocking
def play_sound(hal: PizzaHAL, sound: Any, **kwargs):
"""
Play a sound.
:param hal: The hardware abstraction object
:param sound: The sound to be played
"""
# Extract data and sampling rate from file
try:
data, fs = hal.soundcache.get(str(sound), sf.read(str(sound), dtype='float32'))
sd.play(data, fs)
sd.wait() # Wait until file is done playing
except KeyboardInterrupt:
logger.debug('skipped playback')
# sd.stop()
@blocking
def record_sound(hal: PizzaHAL, filename: Any, duration: int,
cache: bool = False, **kwargs):
"""
Record sound using the microphone
:param hal: The hardware abstraction object
:param filename: The path of the file to record to
:param duration: The time to record in seconds
:param cache: `True` to save recording to cache. Default is `False`
"""
myrecording = sd.rec(int(duration * AUDIO_REC_SR),
samplerate=AUDIO_REC_SR,
channels=2)
sd.wait() # Wait until recording is finished
writewav(str(filename), AUDIO_REC_SR, myrecording)
if cache:
hal.soundcache[str(filename)] = (myrecording, AUDIO_REC_SR)
@blocking
def record_video(hal: PizzaHAL, filename: Any, duration: float, **kwargs):
"""
Record video using the camera
:param hal: The hardware abstraction object
:param filename: The path of the file to record to
:param duration: The time to record in seconds
"""
hal.camera.resolution = VIDEO_RES
hal.camera.start_recording(str(filename))
hal.camera.wait_recording(duration)
hal.camera.stop_recording()
@blocking
def take_photo(hal: PizzaHAL, filename: Any, **kwargs):
"""
Take a foto with the camera
:param hal: The hardware abstraction object
:param filename: The path of the filename for the foto
"""
hal.camera.resolution = PHOTO_RES
hal.camera.capture(str(filename))
@blocking
def init_sounds(hal: PizzaHAL, sounds: List):
"""
Load prerecorded Sounds into memory
:param hal:
:param sounds: A list of sound files
"""
if hal.soundcache is None:
hal.soundcache = {}
for sound in sounds:
# Extract data and sampling rate from file
data, fs = sf.read(str(sound), dtype='float32')
hal.soundcache[str(sound)] = (data, fs)
@blocking
def init_camera(hal: PizzaHAL):
if hal.camera is None:
hal.camera = PiCamera()

View file

@ -11,9 +11,9 @@ from enum import Enum, auto
from pizzactrl import fs_names, sb_dummy, sb_de_alt
from .storyboard import Activity
from .hal import play_sound, take_photo, record_video, record_sound, turn_off, \
from .hal_serial import play_sound, take_photo, record_video, record_sound, turn_off, \
PizzaHAL, init_camera, init_sounds, wait_for_input, \
light_layer, backlight, advance, rewind
light_layer, backlight, move_vert, move_hor, rewind
logger = logging.getLogger(__name__)
@ -118,7 +118,7 @@ class Statemachine:
# check scroll positions and rewind if necessary
turn_off(self.hal)
rewind(self.hal.motor_ud, self.hal.ud_sensor)
#rewind(self.hal.motor_ud, self.hal.ud_sensor)
if not os.path.exists(fs_names.USB_STICK):
logger.warning('USB-Stick not found.')
@ -129,7 +129,8 @@ class Statemachine:
play_sound(self.hal, fs_names.SFX_POST_OK)
# Callback for start when blue button is held
self.hal.btn_forward.when_deactivated = self._start
# TODO add start button
#self.hal.btn_forward.when_deactivated = self._start
self.state = State.IDLE_START
@ -190,7 +191,7 @@ class Statemachine:
elif act.activity is Activity.ADVANCE_UP:
if chapter.move and self.move:
logger.debug(
f'advance{advance}({self.hal.motor_ud}, '
f'advance({self.hal.motor_ud}, '
f'{self.hal.ud_sensor})')
advance(motor=self.hal.motor_ud,
sensor=self.hal.ud_sensor)

View file

@ -8,6 +8,7 @@ class Activity(Enum):
RECORD_VIDEO = {'duration': 0.0, 'filename': ''}
TAKE_PHOTO = {'filename': ''}
ADVANCE_UP = {'speed': 0.3, 'direction': True}
ADVANCE_LEFT = {'speed': 0.3, 'direction': True, 'steps': 180} # TODO set right number of steps
LIGHT_LAYER = {'intensity': 1.0, 'fade': 0.0, 'layer': True}
LIGHT_BACK = {'intensity': 1.0, 'fade': 0.0}
@ -39,7 +40,7 @@ class Chapter:
if self.pos >= len(self.activities):
raise StopIteration
act = self.activities[self.pos]
if act.activity is Activity.ADVANCE_UP:
if act.activity is Activity.ADVANCE_UP: # TODO add ADVANCE_LEFT
self.move_ud += 1
self.pos += 1
return act

View file

@ -5,3 +5,4 @@ click
sounddevice
soundfile
scipy
pyserial

View file

@ -1,15 +1,5 @@
import re
import ast
import scipy
import wave
import click
import gpiozero
import picamera
import pyaudio as pyaudio
import pydub
import sounddevice as sounddevice
import soundfile as soundfile
from setuptools import setup
@ -39,7 +29,8 @@ with open('pizzactrl/__init__.py', 'rb') as f:
'click',
'sounddevice',
'soundfile',
'scipy'
'scipy',
'pyserial'
],
entry_points='''