# Copyright (C) 2022-2025, Pyronear.# This program is licensed under the Apache License 2.0.# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.importasyncioimportloggingimporttimefromdatetimeimportdatetimefromtypingimportAny,Listimportnumpyasnpimporturllib3from.engineimportEnginefrom.sensorsimportReolinkCamera__all__=["SystemController","is_day_time"]urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)# Configure logginglogging.basicConfig(format="%(asctime)s | %(levelname)s: %(message)s",level=logging.INFO,force=True)defis_day_time(cache,frame,strategy,delta=0):""" Determines if it is daytime using specified strategies. Strategies: 1. Time-based: Compares the current time with sunrise and sunset times. 2. IR-based: Analyzes the color of the image; IR cameras produce black and white images at night. Args: cache (Path): Cache folder where `sunset_sunrise.txt` is located. frame (PIL.Image): Frame to analyze with the IR strategy. strategy (str): Strategy to define daytime ("time", "ir", or "both"). delta (int): Time delta in seconds before and after sunrise/sunset. Returns: bool: True if it is daytime, False otherwise. """is_day=Trueifstrategyin["both","time"]:withopen(cache.joinpath("sunset_sunrise.txt"))asf:lines=f.readlines()sunrise=datetime.strptime(lines[0].strip(),"%H:%M")sunset=datetime.strptime(lines[1].strip(),"%H:%M")now=datetime.strptime(datetime.now().isoformat().split("T")[1][:5],"%H:%M")if(now-sunrise).total_seconds()<-deltaor(sunset-now).total_seconds()<-delta:is_day=Falseifstrategyin["both","ir"]:frame=np.array(frame)ifnp.max(frame[:,:,0]-frame[:,:,1])==0:is_day=Falsereturnis_dayasyncdefcapture_camera_image(camera:ReolinkCamera,image_queue:asyncio.Queue)->bool:""" Captures an image from the camera and puts it into a queue. Returns whether it is daytime for this camera. Args: camera (ReolinkCamera): The camera instance. image_queue (asyncio.Queue): The queue to put the captured image. Returns: bool: True if it is daytime according to this camera, False otherwise. """cam_id=camera.ip_addresstry:ifcamera.cam_type=="ptz":foridx,pose_idinenumerate(camera.cam_poses):cam_id=f"{camera.ip_address}_{pose_id}"frame=camera.capture()# Move camera to the next pose to avoid waitingnext_pos_id=camera.cam_poses[(idx+1)%len(camera.cam_poses)]camera.move_camera("ToPos",idx=int(next_pos_id),speed=50)ifframeisnotNone:awaitimage_queue.put((cam_id,frame))awaitasyncio.sleep(0)# Yield controlifnotis_day_time(None,frame,"ir"):returnFalseelse:frame=camera.capture()ifframeisnotNone:awaitimage_queue.put((cam_id,frame))awaitasyncio.sleep(0)# Yield controlifnotis_day_time(None,frame,"ir"):returnFalseexceptExceptionase:logging.exception(f"Error during image capture from camera {cam_id}: {e}")returnTrue
[docs]classSystemController:""" Controls the system for capturing and analyzing camera streams. Attributes: engine (Engine): The image analyzer engine. cameras (List[ReolinkCamera]): List of cameras to capture streams from. """def__init__(self,engine:Engine,cameras:List[ReolinkCamera])->None:""" Initializes the SystemController. Args: engine (Engine): The image analyzer engine. cameras (List[ReolinkCamera]): List of cameras to capture streams from. """self.engine=engineself.cameras=camerasself.is_day=Trueasyncdefcapture_images(self,image_queue:asyncio.Queue)->bool:""" Captures images from all cameras using asyncio. Args: image_queue (asyncio.Queue): The queue to put the captured images. Returns: bool: True if it is daytime according to all cameras, False otherwise. """tasks=[capture_camera_image(camera,image_queue)forcamerainself.cameras]day_times=awaitasyncio.gather(*tasks)returnall(day_times)asyncdefanalyze_stream(self,image_queue:asyncio.Queue)->None:""" Analyzes the image stream from the queue. Args: image_queue (asyncio.Queue): The queue with images to analyze. """whileTrue:item=awaitimage_queue.get()ifitemisNone:breakcam_id,frame=itemtry:self.engine.predict(frame,cam_id)exceptExceptionase:logging.error(f"Error running prediction: {e}")finally:image_queue.task_done()# Mark the task as doneasyncdefnight_mode(self)->bool:""" Checks if it is nighttime for any camera. Returns: bool: True if it is daytime for all cameras, False otherwise. """forcamerainself.cameras:cam_id=camera.ip_addresstry:ifcamera.cam_type=="ptz":foridx,pose_idinenumerate(camera.cam_poses):cam_id=f"{camera.ip_address}_{pose_id}"frame=camera.capture()# Move camera to the next pose to avoid waitingnext_pos_id=camera.cam_poses[(idx+1)%len(camera.cam_poses)]camera.move_camera("ToPos",idx=int(next_pos_id),speed=50)ifframeisnotNone:ifnotis_day_time(None,frame,"ir"):returnFalseelse:frame=camera.capture()ifframeisnotNone:ifnotis_day_time(None,frame,"ir"):returnFalseexceptExceptionase:logging.exception(f"Error during image capture from camera {cam_id}: {e}")returnTrueasyncdefrun(self,period:int=30,send_alerts:bool=True)->bool:""" Captures and analyzes all camera streams, then processes alerts. Args: period (int): The time period between captures in seconds. send_alerts (bool): Boolean to activate / deactivate alert sending. Returns: bool: True if it is daytime according to all cameras, False otherwise. """try:image_queue:asyncio.Queue[Any]=asyncio.Queue()# Start the image processor taskprocessor_task=asyncio.create_task(self.analyze_stream(image_queue))# Capture images concurrentlyself.is_day=awaitself.capture_images(image_queue)# Wait for the image processor to finish processingawaitimage_queue.join()# Ensure all tasks are marked as done# Signal the image processor to stop processingawaitimage_queue.put(None)awaitprocessor_task# Ensure the processor task completes# Process alertsifsend_alerts:try:self.engine._process_alerts()exceptExceptionase:logging.error(f"Error processing alerts: {e}")returnself.is_dayexceptExceptionase:logging.warning(f"Analyze stream error: {e}")returnTrueasyncdefmain_loop(self,period:int,send_alerts:bool=True)->None:""" Main loop to capture and process images at regular intervals. Args: period (int): The time period between captures in seconds. send_alerts (bool): Boolean to activate / deactivate alert sending. """whileTrue:start_ts=time.time()awaitself.run(period,send_alerts)ifnotself.is_day:whilenotawaitself.night_mode():logging.info("Nighttime detected by at least one camera, sleeping for 1 hour.")awaitasyncio.sleep(3600)# Sleep for 1 hourelse:# Sleep only once all images are processedloop_time=time.time()-start_tssleep_time=max(period-(loop_time),0)logging.info(f"Loop run under {loop_time:.2f} seconds, sleeping for {sleep_time:.2f}")awaitasyncio.sleep(sleep_time)def__repr__(self)->str:""" Returns a string representation of the SystemController. Returns: str: A string representation of the SystemController. """repr_str=f"{self.__class__.__name__}("forcaminself.cameras:repr_str+=f"\n\t{repr(cam)},"returnrepr_str+"\n)"