# Source code for pyroengine.core

# Copyright (C) 2022-2024, Pyronear.

# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.

import asyncio
import logging
import time
from datetime import datetime
from typing import Any, List

import numpy as np
import urllib3

from .engine import Engine
from .sensors import ReolinkCamera

# Names exported by `from <module> import *`.
__all__ = ["SystemController", "is_day_time"]

# Silence urllib3's InsecureRequestWarning for the whole process.
# NOTE(review): presumably needed because cameras are reached over HTTPS without
# certificate verification - confirm against the camera client code.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configure logging
# `force=True` removes any handlers already attached to the root logger, so this
# format and level take effect even if logging was configured before this import.
logging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s", level=logging.INFO, force=True)


def is_day_time(cache, frame, strategy, delta=0):
    """
    Decide whether it is currently daytime.

    Two checks are available and can be combined:
    - "time": the current wall-clock time (truncated to the minute) must lie between
      the sunrise and sunset times stored in `sunset_sunrise.txt`, widened by `delta`.
    - "ir": the frame must contain colour; a frame whose first two channels are
      identical everywhere is treated as a black-and-white night image.

    Args:
        cache (Path): Folder containing `sunset_sunrise.txt` (first line sunrise,
            second line sunset, both formatted as HH:MM). Only read for "time"/"both".
        frame (PIL.Image): Image inspected by the IR check. Only read for "ir"/"both".
        strategy (str): One of "time", "ir" or "both". Any other value skips both checks.
        delta (int): Tolerance in seconds applied before sunrise and after sunset.

    Returns:
        bool: True when no selected check reports night, False otherwise.
    """
    daytime = True

    if strategy in ("both", "time"):
        with open(cache.joinpath("sunset_sunrise.txt")) as handle:
            entries = handle.readlines()
        clock_format = "%H:%M"
        sunrise = datetime.strptime(entries[0].strip(), clock_format)
        sunset = datetime.strptime(entries[1].strip(), clock_format)
        # Round-trip through HH:MM so all three datetimes share the same dummy date
        # and the comparison only involves the time of day.
        now = datetime.strptime(datetime.now().strftime(clock_format), clock_format)
        seconds_after_sunrise = (now - sunrise).total_seconds()
        seconds_before_sunset = (sunset - now).total_seconds()
        # Night as soon as either margin falls below the allowed tolerance.
        if min(seconds_after_sunrise, seconds_before_sunset) < -delta:
            daytime = False

    if strategy in ("both", "ir"):
        pixels = np.array(frame)
        first_channel = pixels[:, :, 0]
        second_channel = pixels[:, :, 1]
        # Identical channels mean the camera delivered a grayscale (IR) picture.
        if np.max(first_channel - second_channel) == 0:
            daytime = False

    return daytime


async def capture_camera_image(camera: ReolinkCamera, image_queue: asyncio.Queue) -> bool:
    """
    Grab one frame per camera view, enqueue it, and report whether the camera sees daylight.

    For a PTZ camera every configured pose is captured in turn and the camera is sent
    towards the following pose right after each capture. Any other camera is captured once.
    Capturing stops at the first frame that the IR check classifies as night.

    Args:
        camera (ReolinkCamera): The camera to capture from.
        image_queue (asyncio.Queue): Queue receiving `(cam_id, frame)` tuples.

    Returns:
        bool: False if a captured frame looks like a night image, True otherwise
        (including when no frame was captured or an error was logged).
    """

    async def _forward(label, snapshot) -> bool:
        # Enqueue a captured frame and tell the caller whether it shows daylight.
        if snapshot is None:
            return True
        await image_queue.put((label, snapshot))
        await asyncio.sleep(0)  # Yield control
        if is_day_time(None, snapshot, "ir"):
            return True
        return False

    cam_id = camera.ip_address
    try:
        if camera.cam_type != "ptz":
            if not await _forward(cam_id, camera.capture()):
                return False
        else:
            poses = camera.cam_poses
            for position, pose_id in enumerate(poses):
                cam_id = f"{camera.ip_address}_{pose_id}"
                snapshot = camera.capture()
                # Start moving towards the following pose right away to avoid waiting
                upcoming_pose = poses[(position + 1) % len(poses)]
                camera.move_camera("ToPos", idx=int(upcoming_pose), speed=50)
                if not await _forward(cam_id, snapshot):
                    return False
    except Exception as e:
        logging.exception(f"Error during image capture from camera {cam_id}: {e}")
    return True


class SystemController:
    """
    Controls the system for capturing and analyzing camera streams.

    Attributes:
        engine (Engine): The image analyzer engine.
        cameras (List[ReolinkCamera]): List of cameras to capture streams from.
        is_day (bool): Outcome of the most recent successful capture round; starts as True.
    """

    def __init__(self, engine: Engine, cameras: List[ReolinkCamera]) -> None:
        """
        Initializes the SystemController.

        Args:
            engine (Engine): The image analyzer engine.
            cameras (List[ReolinkCamera]): List of cameras to capture streams from.
        """
        self.engine = engine
        self.cameras = cameras
        self.is_day = True

    async def capture_images(self, image_queue: asyncio.Queue) -> bool:
        """
        Captures images from all cameras using asyncio.

        Args:
            image_queue (asyncio.Queue): The queue to put the captured images.

        Returns:
            bool: True if it is daytime according to all cameras, False otherwise.
        """
        tasks = [capture_camera_image(camera, image_queue) for camera in self.cameras]
        day_times = await asyncio.gather(*tasks)
        return all(day_times)

    async def analyze_stream(self, image_queue: asyncio.Queue) -> None:
        """
        Analyzes the image stream from the queue until a `None` sentinel is received.

        Args:
            image_queue (asyncio.Queue): The queue with `(cam_id, frame)` items to analyze.
        """
        while True:
            item = await image_queue.get()
            if item is None:
                # Sentinel pushed by `run` once every image has been processed.
                break
            try:
                # Unpack inside the try block: a malformed item must still reach
                # `task_done()`, otherwise `image_queue.join()` would wait forever.
                cam_id, frame = item
                self.engine.predict(frame, cam_id)
            except Exception as e:
                logging.error(f"Error running prediction: {e}")
            finally:
                image_queue.task_done()  # Mark the task as done

    async def night_mode(self) -> bool:
        """
        Re-checks every camera with the IR strategy to find out whether night is over.

        Despite its name, the return value is a daytime flag: `main_loop` keeps
        sleeping for as long as this method returns False.

        Returns:
            bool: True if no captured frame looks like a night image (daytime for all
            cameras), False as soon as one frame is classified as night.
        """
        for camera in self.cameras:
            cam_id = camera.ip_address
            try:
                if camera.cam_type == "ptz":
                    for idx, pose_id in enumerate(camera.cam_poses):
                        cam_id = f"{camera.ip_address}_{pose_id}"
                        frame = camera.capture()
                        # Move camera to the next pose to avoid waiting
                        next_pos_id = camera.cam_poses[(idx + 1) % len(camera.cam_poses)]
                        camera.move_camera("ToPos", idx=int(next_pos_id), speed=50)
                        if frame is not None and not is_day_time(None, frame, "ir"):
                            return False
                else:
                    frame = camera.capture()
                    if frame is not None and not is_day_time(None, frame, "ir"):
                        return False
            except Exception as e:
                logging.exception(f"Error during image capture from camera {cam_id}: {e}")
        return True

    async def run(self, period: int = 30, send_alerts: bool = True) -> bool:
        """
        Captures and analyzes all camera streams, then processes alerts.

        Args:
            period (int): The time period between captures in seconds. Not used by this
                method itself; kept for interface compatibility.
            send_alerts (bool): Boolean to activate / deactivate alert sending.

        Returns:
            bool: True if it is daytime according to all cameras, False otherwise.
            True is also returned when an unexpected error interrupts the round.
        """
        processor_task = None
        try:
            image_queue: asyncio.Queue[Any] = asyncio.Queue()

            # Start the image processor task
            processor_task = asyncio.create_task(self.analyze_stream(image_queue))

            # Capture images concurrently
            self.is_day = await self.capture_images(image_queue)

            # Wait for the image processor to finish processing
            await image_queue.join()  # Ensure all tasks are marked as done

            # Signal the image processor to stop processing
            await image_queue.put(None)
            await processor_task  # Ensure the processor task completes

            # Process alerts
            if send_alerts:
                try:
                    self.engine._process_alerts()
                except Exception as e:
                    logging.error(f"Error processing alerts: {e}")

            return self.is_day

        except Exception as e:
            logging.warning(f"Analyze stream error: {e}")
            return True
        finally:
            # If the round was interrupted, the consumer would otherwise stay blocked
            # on `image_queue.get()` forever; cancel it so no task is leaked.
            if processor_task is not None and not processor_task.done():
                processor_task.cancel()

    async def main_loop(self, period: int, send_alerts: bool = True) -> None:
        """
        Main loop to capture and process images at regular intervals.

        Args:
            period (int): The time period between captures in seconds.
            send_alerts (bool): Boolean to activate / deactivate alert sending.
        """
        while True:
            start_ts = time.time()
            await self.run(period, send_alerts)
            if not self.is_day:
                # `night_mode` returns True once every camera sees daylight again.
                while not await self.night_mode():
                    logging.info("Nighttime detected by at least one camera, sleeping for 1 hour.")
                    await asyncio.sleep(3600)  # Sleep for 1 hour
            else:
                # Sleep only once all images are processed
                loop_time = time.time() - start_ts
                sleep_time = max(period - loop_time, 0)
                logging.info(f"Loop run under {loop_time:.2f} seconds, sleeping for {sleep_time:.2f}")
                await asyncio.sleep(sleep_time)

    def __repr__(self) -> str:
        """
        Returns a string representation of the SystemController.

        Returns:
            str: The class name followed by one indented line per camera.
        """
        camera_lines = "".join(f"\n\t{cam!r}," for cam in self.cameras)
        return f"{self.__class__.__name__}({camera_lines}\n)"