Source code for openflexure_microscope.camera.base

# -*- coding: utf-8 -*-
import io
import logging
import threading
import time
from abc import ABCMeta, abstractmethod
from collections import namedtuple

from labthings import ClientEvent, StrictLock

# Lightweight record of a single frame's metadata: its size and the time
# at which it was recorded.
TrackerFrame = namedtuple("TrackerFrame", "size time")


class FrameStream(io.BytesIO):
    """
    A file-like object used to analyse MJPEG frames.

    Instead of analysing a load of real MJPEG frames after they've been
    stored in a BytesIO stream, we tell the camera to write frames to this
    class instead. We then do analysis as the frames are written, and
    discard the heavier image data: only the most recently written frame is
    ever held in the buffer.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # List of TrackerFrame records, appended to while tracking is enabled
        self.frames = []
        # Most recently recorded TrackerFrame (None until one is recorded)
        self.last = None
        # Are we currently tracking frame sizes?
        self.tracking = False
        # Set by write(), cleared by getvalue(): signals that a frame has been
        # written since the last getvalue() call
        self.new_frame = threading.Event()

    def start_tracking(self):
        """Begin recording a TrackerFrame for every frame written."""
        if not self.tracking:
            logging.debug("Started tracking frame data")
            self.tracking = True

    def stop_tracking(self):
        """Stop recording TrackerFrame entries (existing entries are kept)."""
        if self.tracking:
            logging.debug("Stopped tracking frame data")
            self.tracking = False

    def reset_tracking(self):
        """Discard all recorded TrackerFrame entries."""
        self.frames = []

    def write(self, s) -> int:
        """
        Write a new frame to the FrameStream.

        Does a few things:

        1. If tracking frame size, store the size in self.frames
        2. Rewind and truncate the stream (delete previous frame)
        3. Store the new frame image
        4. Set the new_frame event

        Returns:
            The number of bytes written, as ``io.BytesIO.write`` does.
        """
        if self.tracking:
            frame = TrackerFrame(size=len(s), time=time.time())
            self.frames.append(frame)
            self.last = frame

        # Reset the stream for the next frame
        super().seek(0)
        super().truncate()
        # Write the new frame, keeping the byte count so this override still
        # honours the file-object contract of returning the amount written
        written = super().write(s)
        # Set the new frame event
        self.new_frame.set()
        return written

    def getvalue(self) -> bytes:
        """Clear the new_frame event and return the current frame data."""
        self.new_frame.clear()
        return super().getvalue()

    def getframe(self) -> bytes:
        """Wait for a new frame to be available, then return it."""
        self.new_frame.wait()
        return self.getvalue()
class BaseCamera(metaclass=ABCMeta):
    """
    Base implementation of StreamingCamera.

    Subclasses provide the frame source (``frames``), the settings handlers
    and the ``configuration``/``state`` properties. This class runs the
    background worker thread that pulls frames from ``frames()`` and
    publishes the most recent one through ``get_frame()``.
    """

    def __init__(self):
        self.thread = None
        self.camera = None

        self.lock = StrictLock(name="Camera", timeout=None)

        self.stream = FrameStream()
        # Iterator returned by frames(); assigned when the worker thread starts
        self.frames_iterator = None
        # Most recent frame produced by the worker thread
        self.frame = None
        # Wall-clock time of the last start_worker()/get_frame() call
        self.last_access = 0
        self.event = ClientEvent()

        self.stop = False  # Used to indicate that the stream loop should break

        self.stream_active = False
        self.record_active = False
        self.preview_active = False

    @property
    @abstractmethod
    def configuration(self):
        """The current camera configuration."""

    @property
    @abstractmethod
    def state(self):
        """The current read-only camera state."""

    @property
    def settings(self):
        """The current settings, as returned by ``read_settings()``."""
        return self.read_settings()

    @abstractmethod
    def update_settings(self, config: dict):
        """
        Update settings from a config dictionary.

        While holding the camera lock, every key in ``config`` that matches
        an existing attribute of this instance is assigned its value; keys
        with no matching attribute are ignored.
        """
        with self.lock(timeout=None):
            # Apply valid config params to camera object
            for key, value in config.items():  # For each provided setting
                if hasattr(self, key):  # If the instance has a matching property
                    setattr(self, key, value)  # Set to the target value

    @abstractmethod
    def read_settings(self) -> dict:
        """Return the current settings as a dictionary"""
        return {}

    def __enter__(self):
        """Create camera on context enter."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close camera stream on context exit."""
        self.close()

    def close(self):
        """Close the BaseCamera by stopping its worker thread."""
        logging.info("Closing %s", (self))
        # Stop worker thread
        self.stop_worker()
        logging.info("Closed %s", (self))

    # START AND STOP WORKER THREAD

    def start_worker(self, timeout: int = 5) -> bool:
        """
        Start the background camera thread if it isn't running yet.

        Args:
            timeout: Seconds allowed for the first frame to arrive.

        Returns:
            True once the stream is active.

        Raises:
            TimeoutError: If no frame has been produced by the deadline.
        """
        # Monotonic deadline: wall-clock adjustments cannot stretch or shrink it
        deadline = time.monotonic() + timeout

        self.last_access = time.time()
        self.stop = False

        if not self.stream_active:
            # Start background frame thread
            self.thread = threading.Thread(target=self._thread)
            self.thread.daemon = True
            self.thread.start()

            # wait until frames are available
            logging.info("Waiting for frames")
            while self.get_frame() is None:
                if time.monotonic() > deadline:
                    raise TimeoutError("Timeout waiting for frames.")
                time.sleep(0.1)
        return True

    def stop_worker(self, timeout: int = 5) -> bool:
        """
        Flag worker thread for stop. Waits for thread close or timeout.

        Args:
            timeout: Maximum number of seconds to wait for the worker to exit.

        Returns:
            True once the stream is no longer active.

        Raises:
            TimeoutError: If the stream is still active after ``timeout``
                seconds.
        """
        logging.debug("Stopping worker thread")
        deadline = time.monotonic() + timeout

        if self.stream_active:
            self.stop = True
            logging.debug("Waiting for stream thread to exit.")
            if self.thread is not None:
                # Bounded join: an unbounded join() would block forever on a
                # stuck worker and the timeout below could never trigger
                self.thread.join(timeout)

        while self.stream_active:
            if time.monotonic() > deadline:
                logging.debug("Timeout waiting for worker thread close.")
                raise TimeoutError("Timeout waiting for worker thread close.")
            time.sleep(0.1)
        return True

    # HANDLE STREAM FRAMES

    def get_frame(self):
        """Return the current camera frame."""
        self.last_access = time.time()

        # wait for a signal from the camera thread
        self.event.wait()
        self.event.clear()

        return self.frame

    @abstractmethod
    def frames(self):
        """Create generator that returns frames from the camera."""

    # WORKER THREAD

    def _thread(self):
        """
        Camera background thread.

        Iterates over ``frames()``, storing each frame and signalling
        ``self.event``, until the source is exhausted or ``self.stop`` is set.
        ``stream_active`` is always reset on exit, including when the frame
        source raises, so the worker can be restarted afterwards.
        """
        # Set the camera object's frame iterator
        self.frames_iterator = self.frames()
        logging.debug("Entering worker thread.")

        self.stream_active = True
        try:
            for frame in self.frames_iterator:
                # Store most recent frame
                self.frame = frame
                # Signal to clients that a new frame is available
                # We use this event because each client could be
                # reading frames slower than we're acquiring them.
                self.event.set()  # send signal to clients

                if self.stop is True:
                    logging.debug("Worker thread flagged for stop.")
                    try:
                        self.frames_iterator.close()
                    except AttributeError:
                        # Plain iterators have no close(); nothing to release
                        pass
                    # Leave the loop even when close() was unavailable
                    break
        finally:
            logging.debug("BaseCamera worker thread exiting...")
            # Reset the stream_active state
            self.stream_active = False