946 lines
28 KiB
Python
946 lines
28 KiB
Python
|
"""
|
||
|
boson.py
|
||
|
====================================
|
||
|
Class for interacting with FLIR Boson cameras
|
||
|
"""
|
||
|
from flirpy.camera.core import Core
|
||
|
import struct
|
||
|
import ctypes
|
||
|
import binascii
|
||
|
from serial.tools import list_ports
|
||
|
import os
|
||
|
import sys
|
||
|
import logging
|
||
|
import cv2
|
||
|
import time
|
||
|
import matplotlib.pyplot as plt
|
||
|
import png
|
||
|
|
||
|
|
||
|
# FFC Mode enum
|
||
|
FLR_BOSON_MANUAL_FFC = 0
|
||
|
FLR_BOSON_AUTO_FFC = 1
|
||
|
FLR_BOSON_EXTERNAL_FFC = 2
|
||
|
FLR_BOSON_SHUTTER_TEST_FFC = 3
|
||
|
FLR_BOSON_FFCMODE_END = 4
|
||
|
|
||
|
# FFC State enum
|
||
|
FLR_BOSON_NO_FFC_PERFORMED = 0
|
||
|
FLR_BOSON_FFC_IMMINENT = 1
|
||
|
FLR_BOSON_FFC_IN_PROGRESS = 2
|
||
|
FLR_BOSON_FFC_COMPLETE = 3
|
||
|
FLR_BOSON_FFCSTATUS_END = 4
|
||
|
|
||
|
crc_table = [
|
||
|
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
|
||
|
0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
|
||
|
0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
|
||
|
0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
|
||
|
0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
|
||
|
0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
|
||
|
0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
|
||
|
0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
|
||
|
0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
|
||
|
0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
|
||
|
0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
|
||
|
0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
|
||
|
0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
|
||
|
0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
|
||
|
0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
|
||
|
0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
|
||
|
0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
|
||
|
0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
|
||
|
0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
|
||
|
0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
|
||
|
0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
|
||
|
0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
|
||
|
0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
|
||
|
0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
|
||
|
0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
|
||
|
0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
|
||
|
0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
|
||
|
0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
|
||
|
0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
|
||
|
0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
|
||
|
0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
|
||
|
0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
|
||
|
]
|
||
|
import subprocess
|
||
|
import pkg_resources
|
||
|
|
||
|
class Boson(Core):
|
||
|
"""
|
||
|
Opens a FLIR Boson camera. By default, flirpy will attempt to locate your
|
||
|
camera by its USB PID/VID. You can also force a particular port (if you have two
|
||
|
cameras connected, for example) using the port parameter.
|
||
|
|
||
|
Parameters
|
||
|
---
|
||
|
port
|
||
|
the serial port of the camera
|
||
|
|
||
|
baudate
|
||
|
the baudrate to use when connecting to the camera
|
||
|
|
||
|
loglevel
|
||
|
logging level, default is level is WARNING
|
||
|
|
||
|
"""
|
||
|
def __init__(self, port=None, baudrate=921600, loglevel=logging.WARNING):
|
||
|
self.command_count = 0
|
||
|
self.cap = None
|
||
|
self.conn = None
|
||
|
|
||
|
logging.basicConfig(level=loglevel)
|
||
|
self.logger = logging.getLogger(__name__)
|
||
|
|
||
|
if port is None:
|
||
|
port = self.find_serial_device()
|
||
|
if port is not None:
|
||
|
self.connect(port, baudrate)
|
||
|
|
||
|
if self.conn.is_open:
|
||
|
self.logger.info("Connected")
|
||
|
else:
|
||
|
self.connect(port, baudrate)
|
||
|
if self.conn.is_open:
|
||
|
self.logger.info("Connected")
|
||
|
|
||
|
@classmethod
|
||
|
def find_serial_device(self):
|
||
|
"""
|
||
|
Attempts to find and return the serial port that the Boson is connected to.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
string
|
||
|
serial port name
|
||
|
"""
|
||
|
port = None
|
||
|
|
||
|
device_list = list_ports.comports()
|
||
|
|
||
|
VID = 0x09CB
|
||
|
PID = 0x4007
|
||
|
|
||
|
for device in device_list:
|
||
|
if device.vid == VID and device.pid == PID:
|
||
|
port = device.device
|
||
|
break
|
||
|
|
||
|
return port
|
||
|
|
||
|
@classmethod
|
||
|
def find_video_device(self):
|
||
|
"""
|
||
|
Attempts to automatically detect which video device corresponds to the Boson by searching for the PID and VID.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
int
|
||
|
device number
|
||
|
"""
|
||
|
|
||
|
res = None
|
||
|
|
||
|
if sys.platform.startswith('win32'):
|
||
|
device_check_path = pkg_resources.resource_filename('flirpy', 'bin/find_cameras.exe')
|
||
|
device_id = int(subprocess.check_output([device_check_path, "FLIR Video"]).decode())
|
||
|
self.logger.info("Device ID: {}".format(device_id))
|
||
|
if device_id >= 0:
|
||
|
return device_id
|
||
|
|
||
|
elif sys.platform == "darwin":
|
||
|
output = subprocess.check_output(["system_profiler", "SPCameraDataType"]).decode()
|
||
|
devices = [line.strip() for line in output.decode().split("\n") if line.strip().startswith("Model")]
|
||
|
|
||
|
device_id = 0
|
||
|
|
||
|
for device in devices:
|
||
|
if device.contains("VendorID_2507") and device.contains("ProductID_16391"):
|
||
|
return device_id
|
||
|
|
||
|
else:
|
||
|
import pyudev
|
||
|
|
||
|
context = pyudev.Context()
|
||
|
devices = pyudev.Enumerator(context)
|
||
|
|
||
|
path = "/sys/class/video4linux/"
|
||
|
video_devices = [os.path.join(path, device) for device in os.listdir(path)]
|
||
|
dev = []
|
||
|
for device in video_devices:
|
||
|
udev = pyudev.Devices.from_path(context, device)
|
||
|
|
||
|
try:
|
||
|
vid= udev.properties['ID_VENDOR_ID']
|
||
|
pid = udev.properties['ID_MODEL_ID']
|
||
|
|
||
|
if vid.lower() == "09cb" and pid.lower() == "4007":
|
||
|
dev.append(int(device.split('video')[-1]))
|
||
|
except KeyError:
|
||
|
pass
|
||
|
|
||
|
# For some reason multiple devices can show up
|
||
|
# The lower index is often correct, so do a sort first
|
||
|
if len(dev) > 1:
|
||
|
for d in sorted(dev):
|
||
|
cam = cv2.VideoCapture(d + cv2.CAP_V4L2)
|
||
|
data = cam.read()
|
||
|
cam.release()
|
||
|
|
||
|
# data[1] is the image and should be a numpy array
|
||
|
if data[0] == True and data[1] is not None:
|
||
|
res = d
|
||
|
break
|
||
|
elif len(dev) == 1:
|
||
|
res = dev[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def setup_video(self, device_id=None):
|
||
|
"""
|
||
|
Setup the camera for video/frame capture.
|
||
|
|
||
|
Attempts to automatically locate the camera, then opens an OpenCV VideoCapture object. The
|
||
|
capture object is setup to capture raw video.
|
||
|
"""
|
||
|
|
||
|
if device_id is None:
|
||
|
self.logger.debug("Locating cameras")
|
||
|
device_id = self.find_video_device()
|
||
|
|
||
|
if device_id is None:
|
||
|
raise ValueError("Boson not connected.")
|
||
|
else:
|
||
|
self.logger.debug("Located camera at {}".format(device_id))
|
||
|
|
||
|
if sys.platform.startswith('linux'):
|
||
|
self.cap = cv2.VideoCapture(device_id + cv2.CAP_V4L2)
|
||
|
elif sys.platform.startswith('win32'):
|
||
|
self.cap = cv2.VideoCapture(device_id + cv2.CAP_DSHOW)
|
||
|
else:
|
||
|
# Catch anything else, e.g. Mac?
|
||
|
self.cap = cv2.VideoCapture(device_id)
|
||
|
|
||
|
if not self.cap.isOpened():
|
||
|
raise IOError("Failed to open capture device {}".format(device_id))
|
||
|
|
||
|
# Attempt to set 320x256 which only has an effect on the Boson 320
|
||
|
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH,640)
|
||
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT,512)
|
||
|
|
||
|
# The order of these calls matters!
|
||
|
# The Y16 command tells the Boson to send pre-AGC 16 bit pixels. This means the pixel intensity is linearly
|
||
|
# proportional to the flux incident on the pixel. This also means all internal image processing done by the
|
||
|
# boson is ignored. There are other options, see Boson datasheet.
|
||
|
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"Y16 "))
|
||
|
self.cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)
|
||
|
|
||
|
def grab(self, device_id=None):
|
||
|
"""
|
||
|
Captures and returns an image.
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
|
||
|
int
|
||
|
the device ID for the camera. On a laptop, this is likely to be 1 if
|
||
|
you have an internal webcam.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
|
||
|
np.array, or None if an error occurred
|
||
|
captured image
|
||
|
"""
|
||
|
|
||
|
if self.cap is None:
|
||
|
self.setup_video(device_id)
|
||
|
|
||
|
res, image = self.cap.read()
|
||
|
|
||
|
if not res:
|
||
|
self.logger.warn("Failed to capture image")
|
||
|
|
||
|
return image
|
||
|
|
||
|
def release(self):
|
||
|
"""
|
||
|
This will release the camera hardware.
|
||
|
:return:
|
||
|
"""
|
||
|
if self.cap:
|
||
|
self.cap.release()
|
||
|
|
||
|
def reboot(self):
|
||
|
"""
|
||
|
Reboot the camera
|
||
|
"""
|
||
|
function_id = 0x00050010
|
||
|
self._send_packet(function_id)
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_sensor_serial(self):
|
||
|
"""
|
||
|
Get the serial number of the sensor
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
int
|
||
|
serial number
|
||
|
"""
|
||
|
function_id = 0x00050006
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_firmware_revision(self):
|
||
|
"""
|
||
|
Get the camera's software revision.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
tuple (int, int, int)
|
||
|
revision (Major, Minor, Patch)
|
||
|
"""
|
||
|
|
||
|
function_id = 0x00050022
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=12)
|
||
|
res = self._decode_packet(res, receive_size=12)
|
||
|
|
||
|
if res is not None and len(res) == 12:
|
||
|
res = struct.Struct(">iii").unpack_from(res)
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_part_number(self):
|
||
|
"""
|
||
|
Get the camera part number
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
int
|
||
|
part number
|
||
|
"""
|
||
|
function_id = 0x00050004
|
||
|
res = self._send_packet(function_id, receive_size=20)
|
||
|
res = self._decode_packet(res, receive_size=20)
|
||
|
|
||
|
if res is not None and len(res) == 20:
|
||
|
res = res.decode("utf-8")
|
||
|
|
||
|
return res
|
||
|
|
||
|
def do_ffc(self):
|
||
|
"""
|
||
|
Manually request a flat field correction (FFC)
|
||
|
"""
|
||
|
function_id = 0x00050007
|
||
|
self._send_packet(function_id)
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_ffc_state(self):
|
||
|
"""
|
||
|
Returns the FFC state:
|
||
|
|
||
|
0 = FLR_BOSON_NO_FFC_PERFORMED
|
||
|
1 = FLR_BOSON_FFC_IMMINENT
|
||
|
2 = FLR_BOSON_FFC_IN_PROGRESS
|
||
|
3 = FLR_BOSON_FFC_COMPLETE
|
||
|
4 = FLR_BOSON_FFCSTATUS_END
|
||
|
|
||
|
These return values are available as e.g.:
|
||
|
|
||
|
flirpy.camera.boson.FLR_BOSON_NO_FFC_PERFORMED
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
|
||
|
int
|
||
|
FFC state
|
||
|
|
||
|
"""
|
||
|
function_id = 0x0005000C
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res, receive_size=2)
|
||
|
|
||
|
if res is not None and len(res) == 2:
|
||
|
res = struct.unpack(">H", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_ffc_mode(self):
|
||
|
"""
|
||
|
Get the current FFC mode
|
||
|
|
||
|
FLR_BOSON_MANUAL_FFC = 0
|
||
|
FLR_BOSON_AUTO_FFC = 1
|
||
|
FLR_BOSON_EXTERNAL_FFC = 2
|
||
|
FLR_BOSON_SHUTTER_TEST_FFC = 3
|
||
|
FLR_BOSON_FFCMODE_END = 4
|
||
|
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
|
||
|
int
|
||
|
FFC mode
|
||
|
"""
|
||
|
function_id = 0x00050013
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_gao_ffc_mode(self):
|
||
|
"""
|
||
|
Get whether the ffc correction is applied in the Boson image processing pipeline.
|
||
|
0 = disabled
|
||
|
1 = enabled
|
||
|
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
|
||
|
int
|
||
|
FLR_ENABLE_E
|
||
|
"""
|
||
|
function_id = 0x00000004
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
return struct.unpack(">I", res)[0]
|
||
|
|
||
|
def get_ffc_desired(self):
|
||
|
"""
|
||
|
Get the state of the FFC desired flag. 0 = not desired. 1 = desired.
|
||
|
:return: int 0 or 1
|
||
|
"""
|
||
|
function_id = 0x00050055
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
return struct.unpack(">I", res)[0]
|
||
|
|
||
|
def get_nuc_desired(self):
|
||
|
"""
|
||
|
Get the state of the NUC table switch desired flag. 0 = not desired. 1 = desired.
|
||
|
:return: int 0 or 1
|
||
|
"""
|
||
|
function_id = 0x0005005F
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res, receive_size=2)
|
||
|
|
||
|
return struct.unpack(">H", res)[0]
|
||
|
|
||
|
def do_nuc_table_switch(self):
|
||
|
"""
|
||
|
This will perform a table NUC table switch, if the nuc table switch desired flag is set.
|
||
|
:return:
|
||
|
"""
|
||
|
function_id = 0x00050050
|
||
|
self._send_packet(function_id)
|
||
|
|
||
|
def set_ffc_auto(self):
|
||
|
"""
|
||
|
Set the FFC mode to automatic
|
||
|
"""
|
||
|
function_id = 0x00050012
|
||
|
command = struct.pack(">I", FLR_BOSON_AUTO_FFC)
|
||
|
res = self._send_packet(function_id, data=command)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
return
|
||
|
|
||
|
def set_ffc_manual(self):
|
||
|
"""
|
||
|
Set the FFC mode to manual
|
||
|
"""
|
||
|
function_id = 0x00050012
|
||
|
command = struct.pack(">I", FLR_BOSON_MANUAL_FFC)
|
||
|
res = self._send_packet(function_id, data=command)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
return
|
||
|
|
||
|
def set_ffc_temperature_threshold(self, temp_change):
|
||
|
"""
|
||
|
Set the change in camera temperature (Celsius) required before an FFC is requested.
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
float
|
||
|
temperature change in Celsius
|
||
|
"""
|
||
|
function_id = 0x00050008
|
||
|
command = struct.pack(">H", int(temp_change * 10))
|
||
|
res = self._send_packet(function_id, data=command)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_ffc_temperature_threshold(self):
|
||
|
"""
|
||
|
Get the change in camera temperature before an FFC is requested
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
float
|
||
|
temperature change in Celsius
|
||
|
"""
|
||
|
function_id = 0x00050009
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res, receive_size=2)
|
||
|
|
||
|
if res is not None and len(res) == 2:
|
||
|
res = struct.unpack(">H", res)[0]/10.0
|
||
|
|
||
|
return res
|
||
|
|
||
|
def set_ffc_frame_threshold(self, seconds):
|
||
|
"""
|
||
|
Set the number of seconds before an FFC is requested.
|
||
|
|
||
|
Parameters
|
||
|
----------
|
||
|
int
|
||
|
seconds between FFC requests
|
||
|
"""
|
||
|
function_id = 0x0005000A
|
||
|
command = struct.pack(">I", seconds)
|
||
|
res = self._send_packet(function_id, data=command)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_ffc_frame_threshold(self):
|
||
|
"""
|
||
|
Get the number of frames before an FFC is requested.
|
||
|
"""
|
||
|
function_id = 0x0005000B
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_last_ffc_temperature(self):
|
||
|
"""
|
||
|
Get the temperature (in Kelvin) that the last FFC occured
|
||
|
"""
|
||
|
function_id = 0x0005005E
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res, receive_size=2)
|
||
|
|
||
|
if res is not None and len(res) ==2:
|
||
|
res = struct.unpack(">H", res)[0]/10.0
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_last_ffc_frame_count(self):
|
||
|
"""
|
||
|
Get the frame count when the last FFC occured
|
||
|
"""
|
||
|
|
||
|
function_id = 0x0005005D
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def set_num_ffc_frame(self, num_frame):
|
||
|
"""
|
||
|
Set the number of frames for the camera to integrate during FFC. 8 is factory default. With averager on, the
|
||
|
time to perform ffc is doubled.
|
||
|
:param num_frame: int 2, 4, 8, 16.
|
||
|
:return:
|
||
|
"""
|
||
|
function_id = 0x0000000D
|
||
|
command = struct.pack(">H", num_frame)
|
||
|
res = self._send_packet(function_id, data=command)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_num_ffc_frame(self):
|
||
|
"""
|
||
|
Get the number of frames the camera integrates during FFC.
|
||
|
:return: int: 2,4,6, or 8
|
||
|
"""
|
||
|
function_id = 0x0000000E
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res)
|
||
|
|
||
|
if res is not None and len(res) == 2:
|
||
|
res = struct.unpack(">H",res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_frame_count(self):
|
||
|
"""
|
||
|
Get the number of frames captured since the camera was turned on.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
int
|
||
|
number of frames
|
||
|
"""
|
||
|
function_id = 0x00020002
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_fpa_temperature(self):
|
||
|
"""
|
||
|
Get the current focal plane array (FPA) temperature in Celsius.
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
float
|
||
|
FPA temperature in Celsius
|
||
|
"""
|
||
|
function_id = 0x00050030
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=2)
|
||
|
res = self._decode_packet(res, receive_size=2)
|
||
|
|
||
|
if res is not None and len(res) == 2:
|
||
|
res = struct.unpack(">H", res)[0]/10.0
|
||
|
|
||
|
return res
|
||
|
|
||
|
def get_camera_serial(self):
|
||
|
"""
|
||
|
Get the camera serial number
|
||
|
|
||
|
Returns
|
||
|
-------
|
||
|
int
|
||
|
serial number
|
||
|
"""
|
||
|
function_id = 0x00050002
|
||
|
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def set_pwr_on_defaults(self):
|
||
|
"""
|
||
|
Apply the current settings as power on defaults.
|
||
|
:return:
|
||
|
"""
|
||
|
function_id = 0x00050018
|
||
|
self._send_packet(function_id)
|
||
|
|
||
|
return
|
||
|
|
||
|
def set_pwr_on_defaults_factory(self):
|
||
|
"""
|
||
|
This will restore power on defaults to factory settings. The camera should be disconnected and reconnected to
|
||
|
make sure all changes take effect. A reboot does not appear to be enough.
|
||
|
:return:
|
||
|
"""
|
||
|
function_id = 0x0005001B
|
||
|
self._send_packet(function_id) # Read factory boot settings and set them as current settings
|
||
|
self.set_pwr_on_defaults() # Set the factory boot settings as power on settings.
|
||
|
self.logger.warn("consider cycling power and reconnecting to camera to insure all factory settings take effect.")
|
||
|
|
||
|
return
|
||
|
|
||
|
def set_averager(self, value):
|
||
|
"""
|
||
|
This will set the smart averager state to the value
|
||
|
:param value: int 0=off (factory default), 1=on
|
||
|
:return:
|
||
|
"""
|
||
|
function_id = 0x0000000B
|
||
|
command = struct.pack(">I", value)
|
||
|
# Tell the camera to set the smart averager state
|
||
|
self._send_packet(function_id, data=command)
|
||
|
# But this change does not happen immediately. Instead it must be saved as a power on default.
|
||
|
self.set_pwr_on_defaults() # Set the current states as power on defaults
|
||
|
self.logger.warn("The camera must be disconnected and reconnected for the change in averager state to take effect.")
|
||
|
|
||
|
return
|
||
|
|
||
|
def get_averager(self):
|
||
|
"""
|
||
|
Get the current state of the smart averager.
|
||
|
:return: int. 0=off 1=on.
|
||
|
"""
|
||
|
function_id = 0x0000000C
|
||
|
res = self._send_packet(function_id, receive_size=4)
|
||
|
res = self._decode_packet(res, receive_size=4)
|
||
|
|
||
|
if res is not None and len(res) == 4:
|
||
|
res = struct.unpack(">I", res)[0]
|
||
|
|
||
|
return res
|
||
|
|
||
|
def _decode_packet(self, data, receive_size=0):
|
||
|
"""
|
||
|
Decodes a data packet from the camera.
|
||
|
|
||
|
Packet Format:
|
||
|
|
||
|
Start frame byte = 0x8E
|
||
|
Channel ID = 0
|
||
|
Bytes 0:3 - sequence number
|
||
|
Bytes 4:7 - function ID
|
||
|
Bytes 8:11 - return code
|
||
|
Bytes 12: - payload (optional)
|
||
|
CRC bytes - unsigned 16-bit CRC
|
||
|
End frame byte = 0xAE
|
||
|
|
||
|
Non-zero return codes are logged from the camera as warnings.
|
||
|
|
||
|
"""
|
||
|
payload = None
|
||
|
payload_len = len(data) - 15
|
||
|
|
||
|
if payload_len > 0:
|
||
|
try:
|
||
|
frame = struct.Struct(">BBIII{}sB".format(payload_len))
|
||
|
res = frame.unpack(data)
|
||
|
start_marker, channel_id, sequence, function_id, return_code, payload, end_marker = res
|
||
|
except Exception as e:
|
||
|
self.logger.error(str(e))
|
||
|
self.logger.error("Failed to unpack payload")
|
||
|
return None
|
||
|
elif payload_len < 0:
|
||
|
self.logger.error("Failed to open payload")
|
||
|
return None
|
||
|
else:
|
||
|
try:
|
||
|
frame = struct.Struct(">BBIIIHB")
|
||
|
res = frame.unpack(data)
|
||
|
|
||
|
start_marker, channel_id, sequence, function_id, return_code, crc, end_marker = res
|
||
|
except Exception as e:
|
||
|
self.logger.error(str(e))
|
||
|
self.logger.error("Failed to unpack payload")
|
||
|
return None
|
||
|
|
||
|
if return_code == 0x0203:
|
||
|
self.logger.warning("Boson response: range error")
|
||
|
elif return_code == 0x017F:
|
||
|
self.logger.warning("Boson response: buffer overflow")
|
||
|
elif return_code == 0x017E:
|
||
|
self.logger.warning("Boson response: excess bytes")
|
||
|
elif return_code == 0x017D:
|
||
|
self.logger.warning("Boson response: insufficient bytes")
|
||
|
elif return_code == 0x0170:
|
||
|
self.logger.warning("Boson response: unspecified error")
|
||
|
elif return_code == 0x0162:
|
||
|
self.logger.warning("Boson response: bad payload")
|
||
|
elif return_code == 0x0161:
|
||
|
self.logger.warning("Boson response: bad command ID")
|
||
|
|
||
|
if start_marker != 0x8E and end_marker != 0xAE:
|
||
|
self.logger.warning("Invalid frame markers")
|
||
|
|
||
|
header = struct.Struct(">BBIII")
|
||
|
header_bytes = bytearray(header.pack(start_marker,
|
||
|
channel_id,
|
||
|
sequence,
|
||
|
function_id,
|
||
|
return_code))
|
||
|
|
||
|
unstuffed_payload = None
|
||
|
|
||
|
if payload_len > 0:
|
||
|
unstuffed_payload = self._unstuff(payload) # data + crc
|
||
|
|
||
|
frame = struct.Struct(">{}sH".format(len(unstuffed_payload)-2)) # split data, crc
|
||
|
unstuffed_payload, crc = frame.unpack(unstuffed_payload)
|
||
|
|
||
|
crc_bytes = self._crc(header_bytes, unstuffed_payload)
|
||
|
|
||
|
if crc != crc_bytes:
|
||
|
self.logger.debug("Got crc {}, but expected {}".format(crc, crc_bytes))
|
||
|
self.logger.debug("Payload: {}".format(payload))
|
||
|
self.logger.debug("Frame: {}".format(data))
|
||
|
self.logger.warning("Invalid checksum, but data may be OK")
|
||
|
|
||
|
return unstuffed_payload
|
||
|
|
||
|
def _bitstuff(self, data):
|
||
|
"""
|
||
|
Escapes a buffer for transmission over serial
|
||
|
"""
|
||
|
temp = bytearray()
|
||
|
|
||
|
for byte in data:
|
||
|
|
||
|
if sys.version_info[0] < 3:
|
||
|
byte = ord(byte)
|
||
|
|
||
|
if byte == 0x8E:
|
||
|
temp.append(0x9E)
|
||
|
temp.append(0x81)
|
||
|
elif byte == 0x9E:
|
||
|
temp.append(0x9E)
|
||
|
temp.append(0x91)
|
||
|
elif byte == 0xAE:
|
||
|
temp.append(0x9E)
|
||
|
temp.append(0xA1)
|
||
|
else:
|
||
|
temp.append(byte)
|
||
|
|
||
|
return temp
|
||
|
|
||
|
def _unstuff(self, data):
|
||
|
"""
|
||
|
Un-escapes a buffer for transmission over serial
|
||
|
"""
|
||
|
temp = bytearray()
|
||
|
unstuff = False
|
||
|
|
||
|
for i, byte in enumerate(data):
|
||
|
|
||
|
if sys.version_info[0] < 3:
|
||
|
byte = ord(byte)
|
||
|
|
||
|
if unstuff:
|
||
|
temp.append(byte + 0xD)
|
||
|
unstuff = False
|
||
|
elif byte == 0x9E:
|
||
|
if i == (len(data) - 1):
|
||
|
continue
|
||
|
elif data[i+1] in [0x81, 0x91, 0xA1]:
|
||
|
unstuff = True
|
||
|
else:
|
||
|
continue
|
||
|
else:
|
||
|
temp.append(byte)
|
||
|
|
||
|
return temp
|
||
|
|
||
|
def receive(self, timeout=1):
|
||
|
frame = self.conn.read(17) # Should be at least this big
|
||
|
tstart = time.time()
|
||
|
|
||
|
while True:
|
||
|
if len(frame) > 0 and frame[-1] == 0xAE:
|
||
|
break
|
||
|
|
||
|
frame += self.conn.read_all()
|
||
|
|
||
|
if len(frame) >= 1544:
|
||
|
break
|
||
|
|
||
|
if time.time() - tstart > timeout:
|
||
|
break
|
||
|
|
||
|
return frame
|
||
|
|
||
|
def _crc(self, header, payload=None):
|
||
|
"""
|
||
|
Compute a CRC on a data packet
|
||
|
"""
|
||
|
data = header[1:]
|
||
|
|
||
|
if payload is not None:
|
||
|
data += bytes(payload)
|
||
|
|
||
|
return binascii.crc_hqx(data, 0x1D0F)
|
||
|
|
||
|
def _send_packet(self, function_id, data=None, receive_size=0):
|
||
|
"""
|
||
|
Sends a data packet to the camera.
|
||
|
|
||
|
Packet Format:
|
||
|
|
||
|
Start frame byte = 0x8E
|
||
|
Channel ID = 0
|
||
|
Bytes 0:3 - sequence number
|
||
|
Bytes 4:7 - function ID
|
||
|
Bytes 8:11 - 0xFFFFFFFF
|
||
|
Bytes 12: - payload (optional)
|
||
|
CRC bytes - unsigned 16-bit CRC
|
||
|
End frame byte = 0xAE
|
||
|
"""
|
||
|
|
||
|
start_frame = 0x8E
|
||
|
end_frame = 0xAE
|
||
|
channel_number = 0x00
|
||
|
|
||
|
header = struct.Struct(">BBIII")
|
||
|
header_bytes = bytearray(header.pack(start_frame,
|
||
|
channel_number,
|
||
|
self.command_count,
|
||
|
function_id,
|
||
|
0xFFFFFFFF))
|
||
|
|
||
|
# The CRC is computed from the channel number to the last data byte.
|
||
|
# It is computed on the "raw" payload, before bitstuffing
|
||
|
crc_bytes = self._crc(header_bytes, data)
|
||
|
|
||
|
footer = struct.Struct(">HB")
|
||
|
footer_bytes = footer.pack(crc_bytes, end_frame)
|
||
|
|
||
|
# Stuff the data.
|
||
|
payload = header_bytes
|
||
|
|
||
|
if data is not None:
|
||
|
payload += self._bitstuff(data)
|
||
|
|
||
|
payload += footer_bytes
|
||
|
|
||
|
self.send(payload)
|
||
|
res = self.receive()
|
||
|
|
||
|
return res
|
||
|
|
||
|
def test_capture_unix():
|
||
|
with Boson() as camera:
|
||
|
res = camera.grab()
|
||
|
|
||
|
assert res is not None
|
||
|
assert len(res.shape) == 2
|
||
|
assert res.dtype == "uint16"
|
||
|
|
||
|
|
||
|
camera =Boson( loglevel=logging.INFO)
|
||
|
print(camera.logger)
|
||
|
img = camera.grab(2113)
|
||
|
|
||
|
plt.imshow(img, cmap='magma')
|
||
|
plt.colorbar()
|
||
|
plt.show()
|
||
|
|
||
|
# print(img.shape)
|
||
|
# print(camera.get_fpa_temperature())
|
||
|
# cv2.imshow("Imgae",cv2.)
|
||
|
cv2.waitKey(0)
|