# Copyright (C) 2022-2026, Pyronear.
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.
import logging
import pathlib
import time
from io import BytesIO
from typing import List, Optional
import cv2
import numpy as np
import requests
import urllib3
from PIL import Image
__all__ = ["ReolinkCamera"]
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
[docs]
class ReolinkCamera:
"""
A controller class for interacting with Reolink cameras.
Attributes:
ip_address (str): IP address of the Reolink camera.
username (str): Username for accessing the camera.
password (str): Password for accessing the camera.
cam_type (str): Type of the camera, e.g., 'static' or 'ptz' (pan-tilt-zoom), defaults to 'ptz'.
cam_poses (Optional[List[int]]): List of preset positions for PTZ cameras.
protocol (str): Protocol used for communication, defaults to 'https'.
Methods:
capture(patrol_id): Captures an image from the camera. Moves to position `patrol_id` if provided.
move_camera(operation, speed, idx): Moves the camera based on the operation type and speed.
move_in_seconds(s, operation, speed): Moves the camera for a specific duration and then stops.
get_ptz_preset(): Retrieves preset positions for a PTZ camera.
set_ptz_preset(idx): Sets a PTZ preset position using an ID.
"""
def __init__(
self,
ip_address: str,
username: str,
password: str,
cam_type: str = "ptz",
cam_poses: Optional[List[int]] = None,
cam_azimuths: Optional[List[int]] = None,
protocol: str = "https",
focus_position: Optional[int] = None,
) -> None:
self.ip_address = ip_address
self.username = username
self.password = password
self.cam_type = cam_type
self.cam_poses = cam_poses if cam_poses is not None else []
self.cam_azimuths = cam_azimuths if cam_azimuths is not None else []
self.protocol = protocol
self.focus_position = focus_position
# Initialisation de position de caméra (si définie)
if self.cam_poses:
self.move_camera("ToPos", idx=int(self.cam_poses[0]), speed=50)
# Fix focus position
if focus_position is not None:
self.set_auto_focus(disable=True)
self.set_manual_focus(position=focus_position)
def _build_url(self, command: str) -> str:
"""Constructs a URL for API commands to the camera."""
return (
f"{self.protocol}://{self.ip_address}/cgi-bin/api.cgi?"
f"cmd={command}&user={self.username}&password={self.password}&channel=0"
)
def _handle_response(self, response, success_message: str):
"""Handles HTTP responses, logging success or errors based on response data."""
# logger.info(f"{response.status_code}")
if response.status_code == 200:
response_data = response.json()
if response_data[0]["code"] == 0:
logger.debug(success_message)
else:
logger.error(f"Error: {response_data}")
return response_data
logger.error(f"Failed operation: {response.status_code}, {response.text}")
return None
[docs]
def capture(self, patrol_id: Optional[int] = None, timeout: int = 2) -> Optional[Image.Image]:
"""
Captures an image from the camera. Optionally moves the camera to a preset position before capturing.
Args:
patrol_id (Optional[int]): The preset position ID to move to before capturing.
timeout (int): Timeout for the HTTP request.
Returns:
Image.Image: An image captured from the camera, or None if there was an error.
"""
if patrol_id is not None:
self.move_camera("ToPos", idx=int(patrol_id), speed=50)
time.sleep(1)
url = self._build_url("Snap")
logger.debug("Start capture")
try:
response = requests.get(url, verify=False, timeout=timeout) # nosec: B501
if response.status_code == 200:
image_data = BytesIO(response.content)
return Image.open(image_data).convert("RGB")
logger.error(f"Failed to capture image: {response.status_code}, {response.text}")
except requests.RequestException as e:
logger.error(f"Request failed: {e}")
return None
[docs]
def move_camera(self, operation: str, speed: int = 20, idx: int = 0) -> None:
"""
Sends a command to move the camera.
Args:
operation (str): The operation to perform, e.g., 'Left', 'Right'.
speed (int): The speed of the operation.
idx (int): The ID of the position to move to (relevant for PTZ cameras).
"""
url = self._build_url("PtzCtrl")
data = [
{
"cmd": "PtzCtrl",
"action": 0,
"param": {"channel": 0, "op": operation, "id": idx, "speed": speed},
}
]
response = requests.post(url, json=data, verify=False) # nosec: B501
self._handle_response(response, "PTZ operation successful.")
[docs]
def move_in_seconds(
self,
s: float,
operation: str = "Right",
speed: int = 20,
save_path: str = "im.jpg",
) -> None:
"""
Moves the camera in a specified direction for a specified number of seconds.
Args:
s (float): Duration in seconds to move the camera.
operation (str): Direction to move the camera.
speed (int): Speed of the movement.
save_path (str): After movement capture and save image at save_path
"""
self.move_camera(operation, speed)
time.sleep(s)
self.move_camera("Stop")
time.sleep(1)
im = self.capture()
if im is not None and save_path is not None:
im.save(save_path)
[docs]
def get_ptz_preset(self):
"""
Retrieves the preset positions available for PTZ cameras.
Returns:
List[Dict]: A list of preset positions and their details if successful, else None.
"""
url = self._build_url("GetPtzPreset")
data = [{"cmd": "GetPtzPreset", "action": 1, "param": {"channel": 0}}]
response = requests.post(url, json=data, verify=False) # nosec: B501
response_data = self._handle_response(response, "Presets retrieved successfully.")
if response_data and response_data[0]["code"] == 0:
return response_data[0].get("value", {}).get("PtzPreset", [])
return None
[docs]
def set_ptz_preset(self, idx: Optional[int] = None) -> None:
"""
Sets a PTZ preset position. If no ID is provided, finds the next available slot.
Args:
idx (Optional[int]): The preset ID to set. If None, finds an available ID automatically.
Raises:
ValueError: If no slots are available for new presets.
"""
if idx is None:
presets_ptz = self.get_ptz_preset()
for cfg in presets_ptz:
if cfg["enable"] == 0:
idx = cfg["id"]
break
if idx is None:
raise ValueError("No available slots for new presets.")
url = self._build_url("SetPtzPreset")
name = f"pos{idx}"
data = [
{
"cmd": "SetPtzPreset",
"action": 0, # The action code for setting data
"param": {"PtzPreset": {"channel": 0, "enable": 1, "id": idx, "name": name}},
}
]
response = requests.post(url, json=data, verify=False) # nosec: B501
# Utilizing the shared response handling method
self._handle_response(response, f"Preset {name} set successfully.")
def reboot_camera(self):
url = self._build_url("Reboot")
data = [{"cmd": "Reboot"}]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Camera reboot initiated successfully.")
def get_auto_focus(self):
url = self._build_url("GetAutoFocus")
data = [{"cmd": "GetAutoFocus", "action": 1, "param": {"channel": 0}}]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Fetched AutoFocus settings successfully.")
def set_auto_focus(self, disable: bool):
url = self._build_url("SetAutoFocus")
data = [
{
"cmd": "SetAutoFocus",
"action": 0,
"param": {"AutoFocus": {"channel": 0, "disable": int(disable)}},
}
]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Set AutoFocus settings successfully.")
def start_zoom_focus(self, position: int):
if self.cam_type != "static":
url = self._build_url("StartZoomFocus")
data = [
{
"cmd": "StartZoomFocus",
"action": 0,
"param": {"ZoomFocus": {"channel": 0, "pos": position, "op": "ZoomPos"}},
}
]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Started ZoomFocus successfully.")
return None
def set_manual_focus(self, position: int):
"""
Set manual focus to a specific position.
Args:
position (int): Focus position (e.g., between 0 and 1000).
"""
if self.cam_type != "static":
url = self._build_url("StartZoomFocus")
data = [
{
"cmd": "StartZoomFocus",
"action": 0,
"param": {"ZoomFocus": {"channel": 0, "pos": position, "op": "FocusPos"}},
}
]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, f"Manual focus set at position {position}")
return None
def get_focus_level(self):
"""Retrieve the current manual focus and zoom positions."""
url = self._build_url("GetZoomFocus")
data = [{"cmd": "GetZoomFocus", "action": 0, "param": {"channel": 0}}]
response = requests.post(url, json=data, verify=False)
result = self._handle_response(response, "Got zoom/focus values")
if result and result[0]["code"] == 0:
zoom_focus = result[0]["value"]["ZoomFocus"]
focus = zoom_focus.get("focus", {}).get("pos")
zoom = zoom_focus.get("zoom", {}).get("pos")
return {"focus": focus, "zoom": zoom}
return None
def _measure_sharpness(self, pil_image):
img = pil_image.convert("L")
arr = np.array(img)
laplacian = cv2.Laplacian(arr, cv2.CV_64F)
return laplacian.var()
def focus_finder(self, save_images: bool = False, retry_depth: int = 0) -> int:
"""
Perform greedy focus optimization to find the sharpest focus position,
starting from self.focus_position or 720 and sweeping ±50.
Includes recheck at ±15 to avoid local maxima and a max retry depth.
Args:
save_images (bool): If True, save each captured image to disk.
retry_depth (int): Internal counter to prevent infinite recursion.
Returns:
int: Best focus position found.
"""
max_retries = 10
abs_min = 600
abs_max = 900
def capture_and_score(pos):
pos = max(abs_min, min(abs_max, pos)) # Clamp to global bounds
self.set_manual_focus(pos)
start = time.time()
image = self.capture()
duration = time.time() - start
if image is None:
return 0
score = self._measure_sharpness(image)
logger.info(f"[{self.ip_address}] Focus {pos}: Sharpness = {score:.2f}, Time = {duration:.2f}s")
if save_images:
folder = f"focus_debug/{self.ip_address.replace('.', '_')}"
pathlib.Path(folder).mkdir(exist_ok=True, parents=True)
image.save(f"{folder}/focus_{pos}.jpg")
return score
if self.cam_type != "static":
current = self.focus_position if self.focus_position is not None else 720
min_focus = max(abs_min, current - 50)
max_focus = min(abs_max, current + 50)
sharp_current = capture_and_score(current)
sharp_prev = capture_and_score(current - 1)
sharp_next = capture_and_score(current + 1)
if sharp_prev > sharp_current and sharp_prev >= sharp_next:
direction = -1
elif sharp_next > sharp_current:
direction = 1
else:
logger.info(f"[{self.ip_address}] Best focus already at {current} with sharpness {sharp_current:.2f}")
self.focus_position = current
self.set_manual_focus(self.focus_position)
return current
best_pos = current + direction
best_score = max(sharp_prev, sharp_next)
while True:
next_pos = best_pos + direction
if next_pos < min_focus or next_pos > max_focus:
break
score = capture_and_score(next_pos)
if score > best_score:
best_pos = next_pos
best_score = score
else:
break
# Check ±15 around the greedy result
for offset in [-15, 15]:
probe_pos = best_pos + offset
if abs_min <= probe_pos <= abs_max:
probe_score = capture_and_score(probe_pos)
if probe_score > best_score:
logger.info(
f"[{self.ip_address}] Found better focus at offset {offset} → retrying from {probe_pos}"
)
if retry_depth < max_retries:
self.focus_position = probe_pos
return self.focus_finder(save_images=save_images, retry_depth=retry_depth + 1)
logger.info(f"[{self.ip_address}] Best focus position: {best_pos} with sharpness {best_score:.2f}")
self.focus_position = best_pos
self.set_manual_focus(self.focus_position)
return best_pos
return 720