# -*- coding: utf-8 -*-
"""
Raspberry Pi camera implementation of the PiCameraStreamer class.
**NOTES:**
Still port used for image capture.
Preview port reserved for onboard GPU preview.
Video port:
* Splitter port 0: Image capture (if `use_video_port == True`)
* Splitter port 1: Streaming frames
* Splitter port 2: Video capture
* Splitter port 3: [Currently unused]
PiCameraStreamer streams at video_resolution
Camera capture resolution set to stream_resolution in frames()
Video port uses that resolution for everything. If a different resolution
is specified for video capture, this is handled by the resizer.
Still capture (if use_video_port == False) uses pause_stream
to temporarily increase the capture resolution.
"""
import logging
import time
# Type hinting
from typing import Tuple
import numpy as np
# Pi camera
import picamerax
import picamerax.array
from openflexure_microscope.camera.base import BaseCamera
from openflexure_microscope.paths import settings_file_path
from openflexure_microscope.utilities import json_to_ndarray, ndarray_to_json
# Richard's fix gain
from .set_picamera_gain import set_analog_gain, set_digital_gain
# MAIN CLASS
[docs]class PiCameraStreamer(BaseCamera):
"""Raspberry Pi camera implementation of PiCameraStreamer."""
picamera_settings_keys = [
"exposure_mode",
"analog_gain",
"digital_gain",
"shutter_speed",
"awb_gains",
"awb_mode",
"framerate",
"saturation",
"iso",
"brightness",
"contrast",
"crop",
"drc_strength",
"exposure_compensation",
"image_effect",
"meter_mode",
"sharpness",
"annotate_text",
"annotate_text_size",
"zoom",
]
def __init__(self):
# Run BaseCamera init
BaseCamera.__init__(self)
# Attach to Pi camera
self.camera = (
picamerax.PiCamera()
) #: :py:class:`picamerax.PiCamera`: Picamera object
# Store state of PiCameraStreamer
self.preview_active = False
# Reset variable states
self.set_zoom(1.0)
# Set default settings
self.image_resolution = tuple(
self.camera.MAX_RESOLUTION
) #: tuple: Resolution for image captures
self.stream_resolution = (
832,
624,
) #: tuple: Resolution for stream and video captures
self.numpy_resolution = (
1312,
976,
) #: tuple: Resolution for numpy array captures
self.jpeg_quality = 100 #: int: JPEG quality
self.mjpeg_quality = 75 #: int: MJPEG quality
# Set default lens shading table path
self.picamera_lst_path = settings_file_path(
"picamera_lst.npy"
) #: str: Path of .npy lens shading table file
# Start the stream worker on init
self.start_worker()
@property
def configuration(self):
"""The current camera configuration."""
return {"board": self.camera.revision}
@property
def state(self):
"""The current read-only camera state."""
return {}
[docs] def initialisation(self):
"""Run any initialisation code when the frame iterator starts."""
[docs] def close(self):
"""Close the Raspberry Pi PiCameraStreamer."""
# Run BaseCamera close method
BaseCamera.close(self)
# Detach Pi camera
if self.camera:
self.camera.close()
# HANDLE SETTINGS
[docs] def read_settings(self) -> dict:
"""
Return config dictionary of the PiCameraStreamer.
"""
# Get config items from the base class
conf_dict = BaseCamera.read_settings(self)
# Include device-specific config items
conf_dict.update(
{
"stream_resolution": self.stream_resolution,
"image_resolution": self.image_resolution,
"numpy_resolution": self.numpy_resolution,
"jpeg_quality": self.jpeg_quality,
"mjpeg_quality": self.mjpeg_quality,
"picamera": {},
}
)
# Include a subset of picamera properties. Excludes lens shading table
for key in PiCameraStreamer.picamera_settings_keys:
try:
value = getattr(self.camera, key)
logging.debug("Reading PiCamera().%s: %s", key, value)
conf_dict["picamera"][key] = value
except AttributeError:
logging.debug("Unable to read PiCamera attribute %s", (key))
# Include a serialised lens shading table
if (
hasattr(self.camera, "lens_shading_table")
and getattr(self.camera, "lens_shading_table") is not None
):
conf_dict["picamera"]["lens_shading_table"] = ndarray_to_json(
getattr(self.camera, "lens_shading_table")
)
return conf_dict
[docs] def update_settings(self, config: dict):
"""
Write a config dictionary to the PiCameraStreamer config.
The passed dictionary may contain other parameters not relevant to
camera config. Eg. Passing a general config file will work fine.
Args:
config (dict): Dictionary of config parameters.
"""
paused_stream = False
logging.debug("PiCameraStreamer: Applying config:")
logging.debug(config)
with self.lock(timeout=None):
# Apply valid config params to Picamera object
if not self.record_active: # If not recording a video
# Pause stream while changing settings
if self.stream_active: # If stream is active
logging.info("Pausing stream to update config.")
self.stop_stream_recording() # Pause stream
paused_stream = True # Remember to unpause stream when done
# PiCamera parameters
if "picamera" in config: # If new settings are given
self.apply_picamera_settings(
config["picamera"], pause_for_effect=True
)
# Handle lens shading if camera supports it
if (
hasattr(self.camera, "lens_shading_table")
and "lens_shading_table" in config["picamera"]
):
try:
self.camera.lens_shading_table = json_to_ndarray(
config["picamera"].get("lens_shading_table")
)
except KeyError as e:
logging.error(e)
# PiCameraStreamer parameters
for key, value in config.items(): # For each provided setting
if (key != "picamera") and hasattr(self, key):
setattr(self, key, value)
# If stream was paused to update config, unpause
if paused_stream:
logging.info("Resuming stream.")
self.start_stream_recording()
else:
raise Exception(
"Cannot update camera config while recording is active."
)
[docs] def apply_picamera_settings(
self, settings_dict: dict, pause_for_effect: bool = True
):
"""
Args:
settings_dict (dict): Dictionary of properties to apply to the :py:class:`picamerax.PiCamera`: object
pause_for_effect (bool): Pause tactically to reduce risk of timing issues
"""
# Set exposure mode
if "exposure_mode" in settings_dict:
logging.debug(
"Applying exposure_mode: %s", (settings_dict["exposure_mode"])
)
self.camera.exposure_mode = settings_dict["exposure_mode"]
# Apply gains and let them settle
if "analog_gain" in settings_dict:
logging.debug("Applying analog_gain: %s", (settings_dict["analog_gain"]))
set_analog_gain(self.camera, float(settings_dict["analog_gain"]))
if "digital_gain" in settings_dict:
logging.debug("Applying digital_gain: %s", (settings_dict["digital_gain"]))
set_digital_gain(self.camera, float(settings_dict["digital_gain"]))
# Apply shutter speed
if "shutter_speed" in settings_dict:
logging.debug(
"Applying shutter_speed: %s", (settings_dict["shutter_speed"])
)
self.camera.shutter_speed = int(settings_dict["shutter_speed"])
time.sleep(0.2) # Let gains settle
# Handle AWB in a half-smart way
if "awb_gains" in settings_dict:
logging.debug("Applying awb_mode: off")
self.camera.awb_mode = "off"
logging.debug("Applying awb_gains: %s", (settings_dict["awb_gains"]))
self.camera.awb_gains = settings_dict["awb_gains"]
elif "awb_mode" in settings_dict:
logging.debug("Applying awb_mode: %s", (settings_dict["awb_mode"]))
self.camera.awb_mode = settings_dict["awb_mode"]
# Handle some properties that can be quickly applied
batched_keys = ["framerate", "saturation"]
for key in batched_keys:
if (key in settings_dict) and hasattr(self.camera, key):
logging.debug("Applying %s: %s", key, settings_dict[key])
setattr(self.camera, key, settings_dict[key])
# Final optional pause to settle
if pause_for_effect:
time.sleep(0.2)
[docs] def set_zoom(self, zoom_value: float = 1.0) -> None:
"""
Change the camera zoom, handling re-centering and scaling.
"""
with self.lock(timeout=None):
self.zoom_value = float(zoom_value)
if self.zoom_value < 1:
self.zoom_value = 1
# Richard's code for zooming !
fov = self.camera.zoom
centre = np.array([fov[0] + fov[2] / 2.0, fov[1] + fov[3] / 2.0])
size = 1.0 / self.zoom_value
# If the new zoom value would be invalid, move the centre to
# keep it within the camera's sensor (this is only relevant
# when zooming out, if the FoV is not centred on (0.5, 0.5)
for i in range(2):
if np.abs(centre[i] - 0.5) + size / 2 > 0.5:
centre[i] = 0.5 + (1.0 - size) / 2 * np.sign(centre[i] - 0.5)
logging.info("setting zoom, centre %s, size %s", centre, size)
new_fov = (centre[0] - size / 2, centre[1] - size / 2, size, size)
self.camera.zoom = new_fov
# LAUNCH ACTIONS
[docs] def start_preview(self, fullscreen=True, window=None):
"""Start the on board GPU camera preview."""
with self.lock(timeout=1):
try:
if not self.camera.preview:
logging.debug("Starting preview")
self.camera.start_preview(fullscreen=fullscreen, window=window)
else:
logging.debug("Resizing preview")
if window:
self.camera.preview.window = window
if fullscreen:
self.camera.preview.fullscreen = fullscreen
self.preview_active = True
except picamerax.exc.PiCameraMMALError as e:
logging.error(
"Suppressed a MMALError in start_preview. Exception: %s", (e)
)
except picamerax.exc.PiCameraValueError as e:
logging.error(
"Suppressed a ValueError exception in start_preview. Exception: %s",
(e),
)
[docs] def stop_preview(self):
"""Stop the on board GPU camera preview."""
with self.lock(timeout=1):
if self.camera.preview:
self.camera.stop_preview()
self.preview_active = False
[docs] def start_recording(self, output, fmt: str = "h264", quality: int = 15):
"""Start recording.
Start a new video recording, writing to a output object.
Args:
output: String or file-like object to write capture data to
fmt (str): Format of the capture.
quality (int): Video recording quality.
Returns:
output_object (str/BytesIO): Target object.
"""
with self.lock(timeout=5):
# Start recording method only if a current recording is not running
if not self.record_active:
# Start the camera video recording on port 2
logging.info("Recording to %s", (output))
self.camera.start_recording(
output,
format=fmt,
splitter_port=2,
resize=self.stream_resolution,
quality=quality,
)
# Update state
self.record_active = True
return output
else:
logging.warning(
"Cannot start a new recording\
until the current recording has stopped."
)
return None
[docs] def stop_recording(self):
"""Stop the last started video recording on splitter port 2."""
with self.lock(timeout=5):
# Stop the camera video recording on port 2
logging.info("Stopping recording")
self.camera.stop_recording(splitter_port=2)
logging.info("Recording stopped")
# Update state
self.record_active = False
[docs] def stop_stream_recording(self, splitter_port: int = 1, **kwargs) -> None:
"""
Sets the camera resolution to the still-image resolution, and stops recording if the stream is active.
Args:
splitter_port (int): Splitter port to stop recording on
"""
for k in kwargs.keys():
logging.warning(
"Warning, kwarg %s is invalid for stop_stream_recording.", k
)
with self.lock:
# Stop the camera video recording on port 1
try:
self.camera.stop_recording(splitter_port=splitter_port)
except picamerax.exc.PiCameraNotRecording:
logging.info("Not recording on splitter_port %s", (splitter_port))
else:
logging.info(
"Stopped MJPEG stream on port %s. Switching to %s.",
splitter_port,
self.image_resolution,
)
# Increase the resolution for taking an image
time.sleep(
0.2
) # Sprinkled a sleep to prevent camera getting confused by rapid commands
self.camera.resolution = self.image_resolution
[docs] def start_stream_recording(self, splitter_port: int = 1, **kwargs) -> None:
"""
Sets the camera resolution to the video/stream resolution, and starts recording if the stream should be active.
Args:
splitter_port (int): Splitter port to start recording on
"""
for k in kwargs.keys():
logging.warning(
"Warning, kwarg %s is invalid for stop_stream_recording.", k
)
with self.lock(timeout=None):
# Reduce the resolution for video streaming
try:
self.camera._check_recording_stopped() # pylint: disable=W0212
except picamerax.exc.PiCameraRuntimeError:
logging.info(
"Error while changing resolution: Recording already running."
)
else:
self.camera.resolution = self.stream_resolution
# Sprinkled a sleep to prevent camera getting confused by rapid commands
time.sleep(0.2)
# If the stream should be active
if self.stream_active:
try:
# Start recording on stream port
self.camera.start_recording(
self.stream,
format="mjpeg",
quality=self.mjpeg_quality,
bitrate=-1, # RWB: disable bitrate control
# (bitrate control makes JPEG size less good as a focus
# metric)
splitter_port=splitter_port,
)
except picamerax.exc.PiCameraAlreadyRecording:
logging.info(
"Error while starting preview: Recording already running."
)
else:
logging.debug(
"Started MJPEG stream at %s on port %s",
self.stream_resolution,
splitter_port,
)
[docs] def capture(
self,
output,
fmt: str = "jpeg",
use_video_port: bool = False,
resize: Tuple[int, int] = None,
bayer: bool = True,
thumbnail: tuple = None,
):
"""
Capture a still image to a StreamObject.
Defaults to JPEG format.
Target object can be overridden for development purposes.
Args:
output: String or file-like object to write capture data to
fmt (str): Format of the capture.
use_video_port (bool): Capture from the video port used for streaming. Lower resolution, faster.
resize ((int, int)): Resize the captured image.
bayer (bool): Store raw bayer data in capture
Returns:
output_object (str/BytesIO): Target object.
"""
with self.lock:
logging.info("Capturing to %s", (output))
# Set resolution and stop stream recording if necessary
if not use_video_port:
self.stop_stream_recording()
self.camera.capture(
output,
format=fmt,
quality=self.jpeg_quality,
resize=resize,
bayer=(not use_video_port) and bayer,
use_video_port=use_video_port,
thumbnail=thumbnail,
)
# Set resolution and start stream recording if necessary
if not use_video_port:
self.start_stream_recording()
return output
[docs] def array(self, use_video_port=True) -> np.ndarray:
"""Capture an uncompressed still RGB image to a Numpy array.
Args:
use_video_port (bool): Capture from the video port used for streaming. Lower resolution, faster.
resize ((int, int)): Resize the captured image.
Returns:
output_array (np.ndarray): Output array of capture
"""
with self.lock:
logging.debug("Creating PiRGBArray")
with picamerax.array.PiRGBArray(self.camera) as output:
logging.info("Capturing to %s", (output))
self.camera.capture(output, format="rgb", use_video_port=use_video_port)
return output.array
# HANDLE STREAM FRAMES
[docs] def wait_for_camera(self, timeout=5):
"""Wait for camera object, with 5 second timeout."""
timeout_time = time.time() + timeout
while not self.camera:
if time.time() > timeout_time:
raise TimeoutError("Timeout waiting for camera")
else:
pass
[docs] def frames(self):
"""
Create generator that returns frames from the camera.
Records video from port 1 to a byte stream,
and iterates sequential frames.
"""
# Run this initialisation method
self.initialisation()
self.wait_for_camera()
# Start stream recording (and set resolution)
self.start_stream_recording()
logging.debug("STREAM ACTIVE")
# While the iterator is not closed
try:
while True:
# Wait for the next frame and then yield it
yield self.stream.getframe()
# When GeneratorExit or StopIteration raised, run cleanup code
finally:
# Stop stream recording (and set resolution)
self.stop_stream_recording()
logging.debug("FRAME ITERATOR END")