# -*- coding: utf-8 -*-
"""
Raspberry Pi camera implementation of the PiCameraStreamer class.
**NOTES:**
Still port used for image capture.
Preview port reserved for onboard GPU preview.
Video port:
* Splitter port 0: Image capture (if `use_video_port == True`)
* Splitter port 1: Streaming frames
* Splitter port 2: Video capture
* Splitter port 3: [Currently unused]
PiCameraStreamer streams at video_resolution
Camera capture resolution set to stream_resolution in frames()
Video port uses that resolution for everything. If a different resolution
is specified for video capture, this is handled by the resizer.
Still capture (if use_video_port == False) uses pause_stream
to temporarily increase the capture resolution.
"""
import logging
import time
# Type hinting
from typing import BinaryIO, Optional, Tuple, Union
import numpy as np
# Pi camera
import picamerax
import picamerax.array
from openflexure_microscope.camera.base import BaseCamera
from openflexure_microscope.utilities import json_to_ndarray, ndarray_to_json
# Richard's fix gain
from .set_picamera_gain import set_analog_gain, set_digital_gain
# MAIN CLASS
[docs]class PiCameraStreamer(BaseCamera):
"""Raspberry Pi camera implementation of PiCameraStreamer."""
picamera_settings_keys = [
"exposure_mode",
"analog_gain",
"digital_gain",
"shutter_speed",
"awb_gains",
"awb_mode",
"framerate",
"saturation",
"iso",
"brightness",
"contrast",
"crop",
"drc_strength",
"exposure_compensation",
"image_effect",
"meter_mode",
"sharpness",
"annotate_text",
"annotate_text_size",
"zoom",
]
def __init__(self):
# Run BaseCamera init
BaseCamera.__init__(self)
#: :py:class:`picamerax.PiCamera`: Attached Picamera object
self.picamera: picamerax.PiCamera = picamerax.PiCamera()
# Store state of PiCameraStreamer
self.preview_active: bool = False
# Reset variable states
self.set_zoom(1.0)
#: tuple: Resolution for image captures
self.image_resolution: Tuple[int, int] = tuple(self.picamera.MAX_RESOLUTION)
#: tuple: Resolution for stream and video captures
self.stream_resolution: Tuple[int, int] = (832, 624)
#: tuple: Resolution for numpy array captures
self.numpy_resolution: Tuple[int, int] = (1312, 976)
self.jpeg_quality: int = 100 #: int: JPEG quality
self.mjpeg_quality: int = 75 #: int: MJPEG quality
self.mjpeg_bitrate: int = -1 #: int: MJPEG quality
# Solid bitrate options:
# -1: Maximum
# 25000000: High
# 17000000: Normal
# 5000000: Low (may impact fast AF)
# 2500000: Very low (may impact fast AF)
# Start stream recording (and set resolution)
self.start_stream()
# Wait until frames are available
logging.debug("Waiting for frames...")
self.stream.new_frame.wait()
logging.debug("Camera initialised")
@property
def camera(self):
logging.warning(
"PiCameraStreamer.camera is deprecated. Replace with PiCameraStreamer.picamera"
)
return self.picamera
@property
def configuration(self) -> dict:
"""The current camera configuration."""
return {"board": self.picamera.revision}
@property
def state(self) -> dict:
"""The current read-only camera state."""
return {}
[docs] def close(self):
"""Close the Raspberry Pi PiCameraStreamer."""
# Stop stream recording
self.stop_stream()
# Run BaseCamera close method
super().close()
# Detach Pi camera
if self.picamera:
self.picamera.close()
# HANDLE SETTINGS
[docs] def read_settings(self) -> dict:
"""
Return config dictionary of the PiCameraStreamer.
"""
conf_dict: dict = {
"stream_resolution": self.stream_resolution,
"image_resolution": self.image_resolution,
"numpy_resolution": self.numpy_resolution,
"jpeg_quality": self.jpeg_quality,
"mjpeg_quality": self.mjpeg_quality,
"mjpeg_bitrate": self.mjpeg_bitrate,
"picamera": {},
}
# Include a subset of picamera properties. Excludes lens shading table
for key in PiCameraStreamer.picamera_settings_keys:
try:
value = getattr(self.picamera, key)
logging.debug("Reading PiCamera().%s: %s", key, value)
conf_dict["picamera"][key] = value
except AttributeError:
logging.debug("Unable to read PiCamera attribute %s", (key))
# Include a serialised lens shading table
if (
hasattr(self.picamera, "lens_shading_table")
and getattr(self.picamera, "lens_shading_table") is not None
):
conf_dict["picamera"]["lens_shading_table"] = ndarray_to_json(
getattr(self.picamera, "lens_shading_table")
)
return conf_dict
[docs] def update_settings(self, config: dict):
"""
Write a config dictionary to the PiCameraStreamer config.
The passed dictionary may contain other parameters not relevant to
camera config. Eg. Passing a general config file will work fine.
Args:
config (dict): Dictionary of config parameters.
"""
paused_stream = False
logging.debug("PiCameraStreamer: Applying config:")
logging.debug(config)
with self.lock(timeout=None):
# Apply valid config params to Picamera object
if not self.record_active: # If not recording a video
# Pause stream while changing settings
if self.stream_active: # If stream is active
logging.info("Pausing stream to update config.")
self.stop_stream() # Pause stream
paused_stream = True # Remember to unpause stream when done
# PiCamera parameters
if "picamera" in config: # If new settings are given
self.apply_picamera_settings(
config["picamera"], pause_for_effect=True
)
# Handle lens shading if camera supports it
if (
hasattr(self.picamera, "lens_shading_table")
and "lens_shading_table" in config["picamera"]
):
try:
self.picamera.lens_shading_table = json_to_ndarray(
config["picamera"].get("lens_shading_table")
)
except KeyError as e:
logging.error(e)
# PiCameraStreamer parameters
for key, value in config.items(): # For each provided setting
if (key != "picamera") and hasattr(self, key):
setattr(self, key, value)
# If stream was paused to update config, unpause
if paused_stream:
logging.info("Resuming stream.")
self.start_stream()
else:
raise Exception(
"Cannot update camera config while recording is active."
)
[docs] def apply_picamera_settings(
self, settings_dict: dict, pause_for_effect: bool = True
):
"""
Args:
settings_dict (dict): Dictionary of properties to apply to the :py:class:`picamerax.PiCamera`: object
pause_for_effect (bool): Pause tactically to reduce risk of timing issues
"""
# Set exposure mode
if "exposure_mode" in settings_dict:
logging.debug(
"Applying exposure_mode: %s", (settings_dict["exposure_mode"])
)
self.picamera.exposure_mode = settings_dict["exposure_mode"]
# Apply gains and let them settle
if "analog_gain" in settings_dict:
logging.debug("Applying analog_gain: %s", (settings_dict["analog_gain"]))
set_analog_gain(self.picamera, float(settings_dict["analog_gain"]))
if "digital_gain" in settings_dict:
logging.debug("Applying digital_gain: %s", (settings_dict["digital_gain"]))
set_digital_gain(self.picamera, float(settings_dict["digital_gain"]))
# Apply shutter speed
if "shutter_speed" in settings_dict:
logging.debug(
"Applying shutter_speed: %s", (settings_dict["shutter_speed"])
)
self.picamera.shutter_speed = int(settings_dict["shutter_speed"])
time.sleep(0.2) # Let gains settle
# Handle AWB in a half-smart way
if "awb_gains" in settings_dict:
logging.debug("Applying awb_mode: off")
self.picamera.awb_mode = "off"
logging.debug("Applying awb_gains: %s", (settings_dict["awb_gains"]))
self.picamera.awb_gains = settings_dict["awb_gains"]
elif "awb_mode" in settings_dict:
logging.debug("Applying awb_mode: %s", (settings_dict["awb_mode"]))
self.picamera.awb_mode = settings_dict["awb_mode"]
# Handle some properties that can be quickly applied
batched_keys = ["framerate", "saturation"]
for key in batched_keys:
if (key in settings_dict) and hasattr(self.picamera, key):
logging.debug("Applying %s: %s", key, settings_dict[key])
setattr(self.picamera, key, settings_dict[key])
# Final optional pause to settle
if pause_for_effect:
time.sleep(0.2)
[docs] def set_zoom(self, zoom_value: Union[float, int] = 1.0) -> None:
"""
Change the camera zoom, handling re-centering and scaling.
"""
with self.lock(timeout=None):
self.zoom_value = float(zoom_value)
if self.zoom_value < 1:
self.zoom_value = 1
# Richard's code for zooming !
fov = self.picamera.zoom
centre = np.array([fov[0] + fov[2] / 2.0, fov[1] + fov[3] / 2.0])
size = 1.0 / self.zoom_value
# If the new zoom value would be invalid, move the centre to
# keep it within the camera's sensor (this is only relevant
# when zooming out, if the FoV is not centred on (0.5, 0.5)
for i in range(2):
if np.abs(centre[i] - 0.5) + size / 2 > 0.5:
centre[i] = 0.5 + (1.0 - size) / 2 * np.sign(centre[i] - 0.5)
logging.info("setting zoom, centre %s, size %s", centre, size)
new_fov = (centre[0] - size / 2, centre[1] - size / 2, size, size)
self.picamera.zoom = new_fov
[docs] def start_preview(
self,
fullscreen: bool = True,
window: Optional[Tuple[int, int, int, int]] = None,
):
"""Start the on board GPU camera preview."""
with self.lock(timeout=1):
try:
if not self.picamera.preview:
logging.debug("Starting preview")
self.picamera.start_preview(fullscreen=fullscreen, window=window)
else:
logging.debug("Resizing preview")
if window:
self.picamera.preview.window = window
if fullscreen:
self.picamera.preview.fullscreen = fullscreen
self.preview_active = True
except picamerax.exc.PiCameraMMALError as e:
logging.error(
"Suppressed a MMALError in start_preview. Exception: %s", (e)
)
except picamerax.exc.PiCameraValueError as e:
logging.error(
"Suppressed a ValueError exception in start_preview. Exception: %s",
(e),
)
[docs] def stop_preview(self):
"""Stop the on board GPU camera preview."""
with self.lock(timeout=1):
if self.picamera.preview:
self.picamera.stop_preview()
self.preview_active = False
[docs] def start_recording(
self, output: Union[str, BinaryIO], fmt: str = "h264", quality: int = 15
):
"""Start recording.
Start a new video recording, writing to a output object.
Args:
output: String or file-like object to write capture data to
fmt (str): Format of the capture.
quality (int): Video recording quality.
Returns:
output_object (str/BytesIO): Target object.
"""
with self.lock(timeout=5):
# Start recording method only if a current recording is not running
if not self.record_active:
# Start the camera video recording on port 2
logging.info("Recording to %s", (output))
self.picamera.start_recording(
output,
format=fmt,
splitter_port=2,
resize=self.stream_resolution,
quality=quality,
)
# Update state
self.record_active = True
return output
else:
logging.warning(
"Cannot start a new recording\
until the current recording has stopped."
)
return None
[docs] def stop_recording(self):
"""Stop the last started video recording on splitter port 2."""
with self.lock(timeout=5):
# Stop the camera video recording on port 2
logging.info("Stopping recording")
self.picamera.stop_recording(splitter_port=2)
logging.info("Recording stopped")
# Update state
self.record_active = False
[docs] def start_stream(self) -> None:
"""
Sets the camera resolution to the video/stream resolution, and starts recording if the stream should be active.
"""
with self.lock(timeout=None):
# Reduce the resolution for video streaming
try:
self.picamera._check_recording_stopped() # pylint: disable=W0212
except picamerax.exc.PiCameraRuntimeError:
logging.info(
"Error while changing resolution: Recording already running."
)
else:
self.picamera.resolution = self.stream_resolution
# Sprinkled a sleep to prevent camera getting confused by rapid commands
time.sleep(0.2)
# If the stream should be active
try:
# Start recording on stream port
self.picamera.start_recording(
self.stream,
format="mjpeg",
quality=self.mjpeg_quality,
bitrate=self.mjpeg_bitrate, # RWB: disable bitrate control
# (bitrate control makes JPEG size less good as a focus
# metric)
splitter_port=1,
)
except picamerax.exc.PiCameraAlreadyRecording:
logging.info("Error while starting preview: Recording already running.")
else:
self.stream_active = True
logging.debug(
"Started MJPEG stream at %s on port %s", self.stream_resolution, 1
)
[docs] def stop_stream(self) -> None:
"""
Sets the camera resolution to the still-image resolution, and stops recording if the stream is active.
Args:
splitter_port (int): Splitter port to stop recording on
"""
with self.lock:
# Stop the camera video recording on port 1
try:
self.picamera.stop_recording(splitter_port=1)
except picamerax.exc.PiCameraNotRecording:
logging.info("Not recording on splitter_port %s", (1))
else:
self.stream_active = False
logging.info(
"Stopped MJPEG stream on port %s. Switching to %s.",
1,
self.image_resolution,
)
# Increase the resolution for taking an image
time.sleep(
0.2
) # Sprinkled a sleep to prevent camera getting confused by rapid commands
self.picamera.resolution = self.image_resolution
[docs] def capture(
self,
output: Union[str, BinaryIO],
fmt: str = "jpeg",
use_video_port: bool = False,
resize: Optional[Tuple[int, int]] = None,
bayer: bool = True,
thumbnail: Optional[Tuple[int, int, int]] = None,
):
"""
Capture a still image to a StreamObject.
Defaults to JPEG format.
Target object can be overridden for development purposes.
Args:
output: String or file-like object to write capture data to
fmt: Format of the capture.
use_video_port: Capture from the video port used for streaming. Lower resolution, faster.
resize: Resize the captured image.
bayer: Store raw bayer data in capture
thumbnail: Dimensions and quality (x, y, quality) of a thumbnail to generate, if supported
Returns:
output_object (str/BytesIO): Target object.
"""
with self.lock:
logging.info("Capturing to %s", (output))
# Set resolution and stop stream recording if necessary
if not use_video_port:
self.stop_stream()
self.picamera.capture(
output,
format=fmt,
quality=self.jpeg_quality,
resize=resize,
bayer=(not use_video_port) and bayer,
use_video_port=use_video_port,
thumbnail=thumbnail,
)
# Set resolution and start stream recording if necessary
if not use_video_port:
self.start_stream()
return output
[docs] def array(self, use_video_port: bool = True) -> np.ndarray:
"""Capture an uncompressed still RGB image to a Numpy array.
Args:
use_video_port (bool): Capture from the video port used for streaming. Lower resolution, faster.
resize ((int, int)): Resize the captured image.
Returns:
output_array (np.ndarray): Output array of capture
"""
with self.lock:
logging.debug("Creating PiRGBArray")
with picamerax.array.PiRGBArray(self.picamera) as output:
logging.info("Capturing to %s", (output))
self.picamera.capture(
output, format="rgb", use_video_port=use_video_port
)
return output.array