Source code for pyro_risks.datasets.nasa_wildfires

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import logging
from typing import List, Optional

import geopandas as gpd
import pandas as pd

from pyro_risks import config as cfg

from .masks import get_french_geom

__all__ = ["NASAFIRMS", "NASAFIRMS_VIIRS"]


class NASAFIRMS(pd.DataFrame):
    """Wildfire history dataset on French territory, using data from NASA satellites.

    Accessible by completing the form at
    https://effis.jrc.ec.europa.eu/applications/data-request-form/

    Be careful when completing the form: you can choose to receive the dataset
    in either json or xlsx format. However, if your source data is in csv
    format, you can still use this class to clean it using the parameter
    `fmt`. By default, the format is assumed to be json.

    Args:
        source_path: str
            Path or URL to your version of the source data
        fmt: str
            Format of the source data, can either be "csv", "xlsx" or "json".
            Default is "json".
        use_cols: List[str]
            List of columns to read from the source
    """

    kept_cols = [
        "latitude",
        "longitude",
        "acq_date",
        "acq_time",
        "confidence",
        "bright_t31",
        "frp",
    ]
    fmt = "json"

    def __init__(
        self,
        source_path: Optional[str] = None,
        fmt: Optional[str] = None,
        use_cols: Optional[List[str]] = None,
    ) -> None:
        """
        Args:
            source_path: Optional[str]
                Path or URL to your version of the source data
            fmt: Optional[str]
                Format of the source data, can either be "csv", "xlsx" or "json".
            use_cols: Optional[List[str]]
                List of columns to keep in the dataframe
        """
        if not isinstance(source_path, str):
            # Fall back to the default source, downloaded into the cache
            logging.warning(
                f"No data source specified for {self.__class__.__name__}, trying fallback."
            )
            source_path = cfg.FR_NASA_FIRMS_FALLBACK

        if not isinstance(fmt, str):
            fmt = self.fmt

        if not isinstance(use_cols, list):
            use_cols = self.kept_cols

        if fmt == "json":
            data = pd.read_json(source_path, orient="records")
            data = pd.json_normalize(data["features"])
            # Remove the prefixes added by json_normalize
            data.columns = [col.split(".")[-1] for col in data.columns]
            # Keep only the requested columns
            data = data[use_cols]
        elif fmt == "xlsx":
            data = pd.read_excel(source_path, usecols=use_cols)
        elif fmt == "csv":
            data = pd.read_csv(source_path, usecols=use_cols)
            # With csv sources, the `acq_time` column needs to be reshaped:
            # the raw data has the format "HHMM", which we expand to
            # "HH:MM:SS" for datetime parsing.
            # Convert type to str
            data["acq_time"] = data["acq_time"].astype(str)
            # Restore leading zeros dropped by the integer dtype ("730" -> "0730")
            data["acq_time"] = data["acq_time"].str.zfill(4)
            # Pad with zero seconds ("0730" -> "073000")
            data["acq_time"] = data["acq_time"].str.ljust(6, "0")
            # Insert colons between two-digit groups ("073000" -> "07:30:00")
            data["acq_time"] = data["acq_time"].apply(
                lambda s: ":".join(map("{}{}".format, *(s[::2], s[1::2])))
            )
        else:
            raise ValueError(
                "The given format cannot be read, it should be either csv, xlsx or json."
            )

        data["acq_date_time"] = (
            data["acq_date"].astype(str) + " " + data["acq_time"].astype(str)
        )
        data["acq_date"] = pd.to_datetime(
            data["acq_date"], format="%Y-%m-%d", errors="coerce"
        )
        data["acq_date_time"] = pd.to_datetime(
            data["acq_date_time"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )
        data["latitude"] = data["latitude"].astype(float)
        data["longitude"] = data["longitude"].astype(float)
        data["bright_t31"] = data["bright_t31"].astype(float)
        data["frp"] = data["frp"].astype(float)

        # Add the départements geometry to allow for merging by department
        geo_data = gpd.GeoDataFrame(
            data,
            geometry=gpd.points_from_xy(data["longitude"], data["latitude"]),
            crs="EPSG:4326",
        )
        # Match the acquisition points against the predefined department polygons
        geo_masks = get_french_geom()
        geo_df = gpd.sjoin(geo_masks, geo_data, how="inner")
        super().__init__(geo_df.drop(["acq_time", "index_right", "geometry"], axis=1))
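
# A minimal sketch (not part of the original module) of the `acq_time`
# reshaping applied above to csv sources, shown on hypothetical raw values
# as read by pandas (integer dtype, leading zeros dropped):
#
#   >>> s = pd.Series([730, 1342]).astype(str).str.zfill(4).str.ljust(6, "0")
#   >>> s.apply(lambda t: ":".join(map("{}{}".format, *(t[::2], t[1::2])))).tolist()
#   ['07:30:00', '13:42:00']
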
class NASAFIRMS_VIIRS(pd.DataFrame):
    """Wildfire history dataset on French territory, using data from the VIIRS instrument.

    Args:
        source_path: str
            Path or URL to your version of the source data
        fmt: str
            Format of the source data, can either be "csv", "xlsx" or "json".
            Default is "csv".
        use_cols: List[str]
            List of columns to read from the source
    """

    kept_cols = [
        "latitude",
        "longitude",
        "acq_date",
        "acq_time",
        "confidence",
        "bright_ti4",
        "bright_ti5",
        "frp",
        "type",
    ]
    fmt = "csv"

    def __init__(
        self,
        source_path: Optional[str] = None,
        fmt: Optional[str] = None,
        use_cols: Optional[List[str]] = None,
    ) -> None:
        """
        Args:
            source_path: Optional[str]
                Path or URL to your version of the source data
            fmt: Optional[str]
                Format of the source data, can either be "csv", "xlsx" or "json".
            use_cols: Optional[List[str]]
                List of columns to keep in the dataframe
        """
        if not isinstance(source_path, str):
            # Fall back to the default source, downloaded into the cache
            logging.warning(
                f"No data source specified for {self.__class__.__name__}, trying fallback."
            )
            source_path = cfg.FR_NASA_VIIRS_FALLBACK

        if not isinstance(fmt, str):
            fmt = self.fmt

        if not isinstance(use_cols, list):
            use_cols = self.kept_cols

        if fmt == "json":
            data = pd.read_json(source_path, orient="records")
            data = pd.json_normalize(data["features"])
            # Remove the prefixes added by json_normalize
            data.columns = [col.split(".")[-1] for col in data.columns]
            # Keep only the requested columns
            data = data[use_cols]
        elif fmt == "xlsx":
            data = pd.read_excel(source_path, usecols=use_cols)
        elif fmt == "csv":
            data = pd.read_csv(source_path, usecols=use_cols)
            # With csv sources, the `acq_time` column needs to be reshaped:
            # the raw data has the format "HHMM", which we expand to
            # "HH:MM:SS" for datetime parsing.
            # Convert type to str
            data["acq_time"] = data["acq_time"].astype(str)
            # Restore leading zeros dropped by the integer dtype ("730" -> "0730")
            data["acq_time"] = data["acq_time"].str.zfill(4)
            # Pad with zero seconds ("0730" -> "073000")
            data["acq_time"] = data["acq_time"].str.ljust(6, "0")
            # Insert colons between two-digit groups ("073000" -> "07:30:00")
            data["acq_time"] = data["acq_time"].apply(
                lambda s: ":".join(map("{}{}".format, *(s[::2], s[1::2])))
            )
        else:
            raise ValueError(
                "The given format cannot be read, it should be either csv, xlsx or json."
            )

        data["acq_date_time"] = (
            data["acq_date"].astype(str) + " " + data["acq_time"].astype(str)
        )
        data["acq_date"] = pd.to_datetime(
            data["acq_date"], format="%Y-%m-%d", errors="coerce"
        )
        data["acq_date_time"] = pd.to_datetime(
            data["acq_date_time"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )
        data["latitude"] = data["latitude"].astype(float)
        data["longitude"] = data["longitude"].astype(float)
        data["bright_ti4"] = data["bright_ti4"].astype(float)
        data["bright_ti5"] = data["bright_ti5"].astype(float)
        data["frp"] = data["frp"].astype(float)

        # Add the départements geometry to allow for merging by department
        geo_data = gpd.GeoDataFrame(
            data,
            geometry=gpd.points_from_xy(data["longitude"], data["latitude"]),
            crs="EPSG:4326",
        )
        # Match the acquisition points against the predefined department polygons
        geo_masks = get_french_geom()
        geo_df = gpd.sjoin(geo_masks, geo_data, how="inner")
        super().__init__(geo_df.drop(["acq_time", "index_right", "geometry"], axis=1))
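
if __name__ == "__main__":
    # A minimal usage sketch (not part of the original module). The csv path
    # below is a hypothetical placeholder for your own FIRMS export; calling
    # a class with no arguments falls back to the source configured in
    # `pyro_risks.config`.
    modis = NASAFIRMS("path/to/modis_firms.csv", fmt="csv")
    viirs = NASAFIRMS_VIIRS()  # uses cfg.FR_NASA_VIIRS_FALLBACK
    print(modis[["acq_date_time", "frp"]].head())
    print(viirs[["acq_date_time", "frp"]].head())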