Source code for pyro_risks.datasets.utils

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import requests
import os
import gzip
import tarfile
import shutil
import warnings

from scipy import spatial
from typing import Tuple, Optional, List, Any

from io import BytesIO
from datetime import datetime
from urllib.parse import urlparse
from zipfile import ZipFile

import numpy as np
import pandas as pd


def get_intersection_range(ts1: pd.Series, ts2: pd.Series) -> pd.DatetimeIndex:
    """Computes the intersecting date range of two time series.

    Args:
        ts1: time series
        ts2: time series

    Returns:
        Date range common to both series.
    """
    # Time span selection
    time_range1 = max(ts1.min(), ts2.min())
    time_range2 = min(ts1.max(), ts2.max())

    if time_range1 > time_range2:
        raise ValueError("Extracts do not have intersecting date range")

    return pd.date_range(time_range1, time_range2)

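
# Illustrative usage sketch (hypothetical helper, made-up values): two observation
# date series whose overlap runs from 2020-02-15 to 2020-03-31.
def _example_get_intersection_range() -> pd.DatetimeIndex:
    ts1 = pd.Series(pd.date_range("2020-01-01", "2020-03-31"))
    ts2 = pd.Series(pd.date_range("2020-02-15", "2020-06-30"))
    return get_intersection_range(ts1, ts2)
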
def find_closest_weather_station(
    df_weather: pd.DataFrame, latitude: float, longitude: float
) -> int:
    """Finds the weather station closest to a given point.

    The weather dataframe must contain a "STATION" column giving the id of each
    weather station in the dataset.

    Args:
        df_weather: pd.DataFrame
            Dataframe of weather conditions
        latitude: float
            Latitude of the point for which we want to find the closest weather station
        longitude: float
            Longitude of the point for which we want to find the closest weather station

    Returns:
        int
            Id of the weather station closest to the point (lat, lon)
    """
    if "STATION" not in df_weather.columns:
        raise ValueError("STATION column is missing in given weather dataframe.")

    weather = df_weather.drop_duplicates(subset=["STATION", "LATITUDE", "LONGITUDE"])
    zipped_station_lat_lon = zip(
        weather["STATION"].values.tolist(),
        weather["LATITUDE"].values.tolist(),
        weather["LONGITUDE"].values.tolist(),
    )
    list_station_lat_lon = list(zipped_station_lat_lon)

    reference_station = list_station_lat_lon[0][0]
    latitude_0 = list_station_lat_lon[0][1]
    longitude_0 = list_station_lat_lon[0][2]
    min_distance = np.sqrt(
        (latitude - latitude_0) ** 2 + (longitude - longitude_0) ** 2
    )

    for k in range(1, weather.shape[0]):
        current_latitude = list_station_lat_lon[k][1]
        current_longitude = list_station_lat_lon[k][2]
        current_distance = np.sqrt(
            (latitude - current_latitude) ** 2 + (longitude - current_longitude) ** 2
        )
        if current_distance < min_distance:
            min_distance = current_distance
            reference_station = list_station_lat_lon[k][0]

    return int(reference_station)

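
# Illustrative usage sketch (hypothetical helper, made-up station ids and
# coordinates): the query point lies near the second station, so 10020 is returned.
def _example_find_closest_weather_station() -> int:
    df_weather = pd.DataFrame(
        {
            "STATION": [10010, 10020],
            "LATITUDE": [48.85, 43.60],
            "LONGITUDE": [2.35, 1.44],
        }
    )
    return find_closest_weather_station(df_weather, latitude=43.7, longitude=1.5)
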
def find_closest_location(
    df_weather: pd.DataFrame, latitude: float, longitude: float
) -> Tuple[float, float]:
    """For a given point (`latitude`, `longitude`), gets the closest point which exists in `df_weather`.

    This function is to be used when the user does not work with weather station data
    but with satellite data, e.g. ERA5 Land variables.

    Args:
        df_weather: pd.DataFrame
            Dataframe of land/weather conditions
        latitude: float
            Latitude of the point for which we want to find the closest point in `df_weather`.
        longitude: float
            Longitude of the point for which we want to find the closest point in `df_weather`.

    Returns:
        Tuple[float, float]
            Closest point (closest_lat, closest_lon) to the point (lat, lon)
    """
    if "STATION" in df_weather.columns:
        raise ValueError(
            "STATION is in the columns, should use `find_closest_weather_station`."
        )

    weather = df_weather.drop_duplicates(subset=["latitude", "longitude"])
    zipped_points_lat_lon = zip(
        weather["latitude"].values.tolist(), weather["longitude"].values.tolist()
    )
    list_station_lat_lon = list(zipped_points_lat_lon)

    latitude_0 = list_station_lat_lon[0][0]
    longitude_0 = list_station_lat_lon[0][1]
    reference_point = (latitude_0, longitude_0)
    min_distance = np.sqrt(
        (latitude - latitude_0) ** 2 + (longitude - longitude_0) ** 2
    )

    for k in range(1, weather.shape[0]):
        current_latitude = list_station_lat_lon[k][0]
        current_longitude = list_station_lat_lon[k][1]
        current_distance = np.sqrt(
            (latitude - current_latitude) ** 2 + (longitude - current_longitude) ** 2
        )
        if current_distance < min_distance:
            min_distance = current_distance
            reference_point = (current_latitude, current_longitude)

    return reference_point

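
# Illustrative usage sketch (hypothetical helper, made-up grid): for gridded data the
# dataframe has lowercase "latitude"/"longitude" columns and no "STATION" column;
# the grid point closest to (43.65, 1.52) is (43.6, 1.5).
def _example_find_closest_location() -> Tuple[float, float]:
    df_era5 = pd.DataFrame(
        {
            "latitude": [43.5, 43.6, 43.7],
            "longitude": [1.4, 1.5, 1.6],
        }
    )
    return find_closest_location(df_era5, latitude=43.65, longitude=1.52)
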
def url_retrieve(url: str, timeout: Optional[float] = None) -> bytes:
    """Retrieves and returns the content of a URL request.

    Args:
        url: URL to request
        timeout: number of seconds before the request times out. Defaults to None.

    Raises:
        requests.exceptions.ConnectionError: if the response status code is not 200.

    Returns:
        Content of the response
    """
    response = requests.get(url, timeout=timeout, allow_redirects=True)
    if response.status_code != 200:
        raise requests.exceptions.ConnectionError(
            f"Error code {response.status_code} - could not download {url}"
        )
    return response.content

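
# Illustrative usage sketch (hypothetical helper, arbitrary timeout): fetches the raw
# bytes of a yearly GHCN-Daily archive, following the URL pattern used in get_ghcn below.
def _example_url_retrieve() -> bytes:
    url = "https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2019.csv.gz"
    return url_retrieve(url, timeout=30)
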
def get_fname(url: str) -> Tuple[str, Optional[str], Optional[str]]:
    """Finds the file name, extension and compression format of an archive located by a URL.

    Args:
        url: URL of the compressed archive

    Raises:
        ValueError: if the URL contains more than one extension
        ValueError: if the URL contains more than one compression format

    Returns:
        A tuple containing the base file name, extension and compression format
    """
    supported_compressions = ["tar", "gz", "zip"]
    supported_extensions = ["csv", "geojson", "shp", "shx", "nc"]

    archive_name = urlparse(url).path.rpartition("/")[-1]
    base = archive_name.split(".")[0]

    list_extensions = list(set(supported_extensions) & set(archive_name.split(".")))
    list_compressions = list(
        set(supported_compressions) & set(archive_name.split("."))
    )

    if len(list_extensions) == 0:
        extension = None
    elif len(list_extensions) == 1:
        extension = list_extensions[0]
    else:
        raise ValueError(f"Error {url} contains more than one extension")

    if len(list_compressions) == 0:
        compression = None
    elif len(list_compressions) == 1:
        compression = list_compressions[0]
    elif len(list_compressions) == 2:
        # "tar" and "gz" found together, e.g. archive.tar.gz
        compression = "tar.gz"
    else:
        raise ValueError(f"Error {url} contains more than one compression format")

    return (base, extension, compression)

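
# Illustrative usage sketch (hypothetical helper): for a ".csv.gz" archive such as the
# GHCN-Daily URL below, the tuple ("2019", "csv", "gz") is returned.
def _example_get_fname() -> Tuple[str, Optional[str], Optional[str]]:
    url = "https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2019.csv.gz"
    return get_fname(url)
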
def download(
    url: str,
    default_extension: str,
    unzip: Optional[bool] = True,
    destination: str = "./tmp",
) -> None:
    """Helper function for downloading, unzipping and saving a compressed file from a given URL.

    Args:
        url: URL of the compressed archive
        default_extension: extension of the archive
        unzip: whether the archive should be unzipped. Defaults to True.
        destination: folder where the file should be saved. Defaults to './tmp'.
    """
    base, extension, compression = get_fname(url)
    content = url_retrieve(url)

    if unzip and compression == "zip":
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        with ZipFile(BytesIO(content)) as zip_file:
            zip_file.extractall(destination)

    elif unzip and compression == "tar.gz":
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        with tarfile.open(fileobj=BytesIO(content), mode="r:gz") as tar_file:
            tar_file.extractall(path=destination)

    elif unzip and compression == "gz":
        file_name = (
            f"{base}.{extension}"
            if extension is not None
            else f"{base}.{default_extension}"
        )
        full_path = os.path.join(destination, file_name)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with gzip.open(BytesIO(content)) as gzip_file, open(
            full_path, "wb+"
        ) as unzipped_file:
            shutil.copyfileobj(gzip_file, unzipped_file)

    elif not unzip and compression is None:
        file_name = (
            f"{base}.{extension}"
            if extension is not None
            else f"{base}.{default_extension}"
        )
        full_path = os.path.join(destination, file_name)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "wb+") as file:
            file.write(content)

    elif not unzip and isinstance(compression, str):
        file_name = f"{base}.{compression}"
        full_path = os.path.join(destination, file_name)
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "wb+") as file:
            file.write(content)

    else:
        raise ValueError("If the file is not compressed set unzip to False")

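
# Illustrative usage sketch (hypothetical helper, arbitrary destination folder):
# downloads a gzipped yearly GHCN-Daily archive and saves the decompressed CSV locally.
def _example_download() -> None:
    download(
        url="https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2019.csv.gz",
        default_extension="csv",
        unzip=True,
        destination="./tmp",
    )
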
def get_ghcn(
    start_year: Optional[int] = None,
    end_year: Optional[int] = None,
    destination: str = "./ghcn",
) -> None:
    """Download yearly Global Historical Climatology Network - Daily (GHCN-Daily) archives (.csv) from NCEI.

    Args:
        start_year: first year to be retrieved. Defaults to None.
        end_year: first year that will not be retrieved. Defaults to None.
        destination: destination directory. Defaults to './ghcn'.
    """
    start_year = datetime.now().year if start_year is None else start_year
    end_year = (
        datetime.now().year + 1
        if end_year is None or start_year == end_year
        else end_year
    )

    for year in range(start_year, end_year):
        url = f"https://www1.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/{year}.csv.gz"
        download(url=url, default_extension="csv", unzip=True, destination=destination)

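
# Illustrative usage sketch (hypothetical helper, arbitrary year range): retrieves the
# 2018 and 2019 yearly GHCN-Daily archives into the default destination folder.
def _example_get_ghcn() -> None:
    get_ghcn(start_year=2018, end_year=2020, destination="./ghcn")
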
def get_modis(
    start_year: Optional[int] = None,
    end_year: Optional[int] = None,
    yearly: Optional[bool] = False,
    destination: str = "./firms",
) -> None:
    """Download the last 24H or yearly France active fires from NASA FIRMS.

    Args:
        start_year: first year to be retrieved. Defaults to None.
        end_year: first year that will not be retrieved. Defaults to None.
        yearly: whether to download yearly active fires or not. Defaults to False.
        destination: destination directory. Defaults to './firms'.
    """
    if yearly:
        start_year = datetime.now().year - 1 if start_year is None else start_year
        end_year = (
            datetime.now().year
            if end_year is None or start_year == end_year
            else end_year
        )

        for year in range(start_year, end_year):
            assert (
                2000 <= year <= 2019
            ), "MODIS active fire archives are only available for the years from 2000 to 2019"
            url = f"https://firms.modaps.eosdis.nasa.gov/data/country/modis/{year}/modis_{year}_France.csv"
            download(
                url=url, default_extension="csv", unzip=False, destination=destination
            )

    else:
        if start_year is not None:
            warnings.warn(
                "start_year is ignored when yearly=False: only the active fires "
                "from the last 24H of the MODIS satellite will be downloaded."
            )
        url = "https://firms.modaps.eosdis.nasa.gov/data/active_fire/c6/csv/MODIS_C6_Europe_24h.csv"
        download(url=url, default_extension="csv", unzip=False, destination=destination)

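
# Illustrative usage sketch (hypothetical helper, arbitrary year range within the
# 2000-2019 archive window): retrieves the 2018 and 2019 France MODIS archives.
def _example_get_modis() -> None:
    get_modis(start_year=2018, end_year=2020, yearly=True, destination="./firms")
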
def get_nearest_points(
    source_points: List[Tuple[Any, Any]], candidates: List[Tuple[Any, Any]]
) -> Tuple:
    """Finds the nearest neighbor in a set of candidate points for each source point, using a KDTree.

    Args:
        source_points: List[Tuple]
            List of tuples (lat, lon) for which we want to find the closest point among the candidates.
        candidates: List[Tuple]
            List of tuples (lat, lon) of all possible closest points.

    Returns:
        Tuple
            indices: array of integers
                The locations of the nearest neighbors in candidates.
            distances: array of floats
                The distances to the nearest neighbors.
    """
    # Create tree from the candidate points
    tree = spatial.cKDTree(candidates)

    # Find closest points and distances
    distances, indices = tree.query(source_points, k=1)

    return indices, distances

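
# Illustrative usage sketch (hypothetical helper, made-up coordinates): each source
# point is matched to its nearest candidate; here indices come out as [0, 1].
def _example_get_nearest_points() -> Tuple:
    source_points = [(43.65, 1.52), (48.80, 2.30)]
    candidates = [(43.6, 1.5), (48.85, 2.35), (45.75, 4.85)]
    indices, distances = get_nearest_points(source_points, candidates)
    return indices, distances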