# Copyright (C) 2022-2024, Pyronear.
# This program is licensed under the Apache License 2.0.
# See LICENSE or go to <https://opensource.org/licenses/Apache-2.0> for full license details.
import logging
import time
from io import BytesIO
from typing import List, Optional
import requests
import urllib3
from PIL import Image
__all__ = ["ReolinkCamera"]
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configure logging
logging.basicConfig(level=logging.DEBUG)
[docs]class ReolinkCamera:
"""
A controller class for interacting with Reolink cameras.
Attributes:
ip_address (str): IP address of the Reolink camera.
username (str): Username for accessing the camera.
password (str): Password for accessing the camera.
cam_type (str): Type of the camera, e.g., 'static' or 'ptz' (pan-tilt-zoom).
cam_poses (Optional[List[int]]): List of preset positions for PTZ cameras.
protocol (str): Protocol used for communication, defaults to 'https'.
Methods:
capture(pos_id): Captures an image from the camera. Moves to position `pos_id` if provided.
move_camera(operation, speed, idx): Moves the camera based on the operation type and speed.
move_in_seconds(s, operation, speed): Moves the camera for a specific duration and then stops.
get_ptz_preset(): Retrieves preset positions for a PTZ camera.
set_ptz_preset(idx): Sets a PTZ preset position using an ID.
"""
def __init__(
self,
ip_address: str,
username: str,
password: str,
cam_type: str,
cam_poses: Optional[List[int]] = None,
protocol: str = "https",
):
self.ip_address = ip_address
self.username = username
self.password = password
self.cam_type = cam_type
self.cam_poses = cam_poses if cam_poses is not None else []
self.protocol = protocol
if len(self.cam_poses):
self.move_camera("ToPos", idx=int(self.cam_poses[0]), speed=50)
def _build_url(self, command: str) -> str:
"""Constructs a URL for API commands to the camera."""
return (
f"{self.protocol}://{self.ip_address}/cgi-bin/api.cgi?"
f"cmd={command}&user={self.username}&password={self.password}&channel=0"
)
def _handle_response(self, response, success_message: str):
"""Handles HTTP responses, logging success or errors based on response data."""
if response.status_code == 200:
response_data = response.json()
if response_data[0]["code"] == 0:
logging.debug(success_message)
else:
logging.error(f"Error: {response_data}")
return response_data
else:
logging.error(f"Failed operation: {response.status_code}, {response.text}")
return None
[docs] def capture(self, pos_id: Optional[int] = None, timeout: int = 2) -> Optional[Image.Image]:
"""
Captures an image from the camera. Optionally moves the camera to a preset position before capturing.
Args:
pos_id (Optional[int]): The preset position ID to move to before capturing.
timeout (int): Timeout for the HTTP request.
Returns:
Image.Image: An image captured from the camera, or None if there was an error.
"""
if pos_id is not None:
self.move_camera("ToPos", idx=int(pos_id), speed=50)
time.sleep(1)
url = self._build_url("Snap")
logging.debug("Start capture")
try:
response = requests.get(url, verify=False, timeout=timeout) # nosec: B501
if response.status_code == 200:
image_data = BytesIO(response.content)
image = Image.open(image_data).convert("RGB")
return image
else:
logging.error(f"Failed to capture image: {response.status_code}, {response.text}")
except requests.RequestException as e:
logging.error(f"Request failed: {e}")
return None
[docs] def move_camera(self, operation: str, speed: int = 20, idx: int = 0):
"""
Sends a command to move the camera.
Args:
operation (str): The operation to perform, e.g., 'Left', 'Right'.
speed (int): The speed of the operation.
idx (int): The ID of the position to move to (relevant for PTZ cameras).
"""
url = self._build_url("PtzCtrl")
data = [{"cmd": "PtzCtrl", "action": 0, "param": {"channel": 0, "op": operation, "id": idx, "speed": speed}}]
response = requests.post(url, json=data, verify=False) # nosec: B501
self._handle_response(response, "PTZ operation successful.")
[docs] def move_in_seconds(self, s: int, operation: str = "Right", speed: int = 20):
"""
Moves the camera in a specified direction for a specified number of seconds.
Args:
s (int): Duration in seconds to move the camera.
operation (str): Direction to move the camera.
speed (int): Speed of the movement.
"""
self.move_camera(operation, speed)
time.sleep(s)
self.move_camera("Stop")
time.sleep(1)
[docs] def get_ptz_preset(self):
"""
Retrieves the preset positions available for PTZ cameras.
Returns:
List[Dict]: A list of preset positions and their details if successful, else None.
"""
url = self._build_url("GetPtzPreset")
data = [{"cmd": "GetPtzPreset", "action": 1, "param": {"channel": 0}}]
response = requests.post(url, json=data, verify=False) # nosec: B501
response_data = self._handle_response(response, "Presets retrieved successfully.")
if response_data and response_data[0]["code"] == 0:
return response_data[0].get("value", {}).get("PtzPreset", [])
else:
return None
[docs] def set_ptz_preset(self, idx: Optional[int] = None):
"""
Sets a PTZ preset position. If no ID is provided, finds the next available slot.
Args:
idx (Optional[int]): The preset ID to set. If None, finds an available ID automatically.
Raises:
ValueError: If no slots are available for new presets.
"""
if idx is None:
presets_ptz = self.get_ptz_preset()
for cfg in presets_ptz:
if cfg["enable"] == 0:
idx = cfg["id"]
break
if idx is None:
raise ValueError("No available slots for new presets.")
url = self._build_url("SetPtzPreset")
name = f"pos{idx}"
data = [
{
"cmd": "SetPtzPreset",
"action": 0, # The action code for setting data
"param": {"PtzPreset": {"channel": 0, "enable": 1, "id": idx, "name": name}},
}
]
response = requests.post(url, json=data, verify=False) # nosec: B501
# Utilizing the shared response handling method
self._handle_response(response, f"Preset {name} set successfully.")
def delete_ptz_preset(self, idx: int):
"""
Deletes a PTZ preset position by setting its enable value to 0.
Args:
idx (int): The preset ID to delete.
"""
url = self._build_url("SetPtzPreset")
data = [
{
"cmd": "SetPtzPreset",
"action": 0, # The action code for setting data
"param": {"PtzPreset": {"channel": 0, "enable": 0, "id": idx}},
}
]
response = requests.post(url, json=data, verify=False) # nosec: B501
# Utilizing the shared response handling method
self._handle_response(response, f"Preset {idx} deleted successfully.")
def reboot_camera(self):
url = self._build_url("Reboot")
data = [{"cmd": "Reboot"}]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Camera reboot initiated successfully.")
def get_auto_focus(self):
url = self._build_url("GetAutoFocus")
data = [{"cmd": "GetAutoFocus", "action": 1, "param": {"channel": 0}}]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Fetched AutoFocus settings successfully.")
def set_auto_focus(self, disable: bool):
url = self._build_url("SetAutoFocus")
data = [{"cmd": "SetAutoFocus", "action": 0, "param": {"AutoFocus": {"channel": 0, "disable": int(disable)}}}]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Set AutoFocus settings successfully.")
def start_zoom_focus(self, position: int):
url = self._build_url("StartZoomFocus")
data = [
{
"cmd": "StartZoomFocus",
"action": 0,
"param": {"ZoomFocus": {"channel": 0, "pos": position, "op": "ZoomPos"}},
}
]
response = requests.post(url, json=data, verify=False)
return self._handle_response(response, "Started ZoomFocus successfully.")