Source code for pyro_risks.datasets.era_fwi_viirs

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import logging
import pandas as pd
from typing import Optional

from pyro_risks.datasets import NASAFIRMS_VIIRS, ERA5Land, ERA5T
from pyro_risks.datasets.utils import get_intersection_range
from pyro_risks.datasets.fwi import GwisFwi
from pyro_risks import config as cfg

__all__ = ["MergedEraFwiViirs"]


logger = logging.getLogger("uvicorn.info")


def process_dataset_to_predict(fwi: pd.DataFrame, era: pd.DataFrame) -> pd.DataFrame:
    """Groupby and merge fwi and era5 datasets for model predictions.

    Args:
        fwi (pd.DataFrame): Fwi dataset
        era (pd.DataFrame): Era5 dataset

    Returns:
        pd.DataFrame: one line per department and day
    """
    weather = era.copy()
    weather["time"] = pd.to_datetime(
        weather["time"], format="%Y-%m-%d", errors="coerce"
    )
    fwi_df = fwi.copy()
    fwi_df["day"] = pd.to_datetime(fwi_df["day"], format="%Y-%m-%d", errors="coerce")

    # Group fwi dataframe by day and department and compute min, max, mean, std
    agg_fwi_df = (
        fwi_df.groupby(["day", "nom"])[cfg.FWI_VARS]
        .agg(["min", "max", "mean", "std"])
        .reset_index()
    )
    agg_fwi_df.columns = ["day", "nom"] + [
        x[0] + "_" + x[1] for x in agg_fwi_df.columns if x[1] != ""
    ]

    logger.info("Finished aggregationg of FWI")

    # Group weather dataframe by day and department and compute min, max, mean, std
    agg_wth_df = (
        weather.groupby(["time", "nom"])[cfg.WEATHER_ERA5T_VARS]
        .agg(["min", "max", "mean", "std"])
        .reset_index()
    )
    agg_wth_df.columns = ["day", "nom"] + [
        x[0] + "_" + x[1] for x in agg_wth_df.columns if x[1] != ""
    ]

    logger.info("Finished aggregationg of weather data")

    # Merge fwi and weather together
    res_df = pd.merge(agg_fwi_df, agg_wth_df, on=["day", "nom"], how="inner")
    logger.info("Finished merging")
    return res_df


[docs]class MergedEraFwiViirs(pd.DataFrame): """Create dataframe for modeling described in models/score_v0.py. Get weather, nasafirms viirs fires and fwi datasets, then filter some of the lines corresponding to vegetation fires excluding low confidence ones merges. Finally aggregated versions of the dataframes by department and by day. For each of the features of weather and fwi datasets creates min, max, mean and std. Fires are counted for each department and day. Returns: pd.DataFrame """ def __init__( self, era_source_path: Optional[str] = None, viirs_source_path: Optional[str] = None, fwi_source_path: Optional[str] = None, ) -> None: """Define the merged era-fwi-viirs dataframe. Args: era_source_path (str, optional): Era5 data source path. Defaults to None. viirs_source_path (str, optional): Viirs data source path. Defaults to None. fwi_source_path (str, optional): Fwi data source path. Defaults to None. """ weather = ERA5T(era_source_path) # ERA5Land(era_source_path) nasa_firms = NASAFIRMS_VIIRS(viirs_source_path) # Time span selection date_range = get_intersection_range(weather.time, nasa_firms.acq_date) weather = weather[weather.time.isin(date_range)] nasa_firms = nasa_firms[nasa_firms.acq_date.isin(date_range)] # Keep only vegetation wildfires and remove thermal anomalies with low confidence where = (nasa_firms["confidence"] != "l") & (nasa_firms["type"] == 0) nasa_firms = nasa_firms[where] # Get FWI dataset for year 2019 (1st september missing) if fwi_source_path is None: days = [ x.strftime("%Y%m%d") for x in pd.date_range(start="2019-01-01", end="2019-12-31") ] days.remove("20190901") fwi_df = GwisFwi(days_list=days) else: fwi_df = pd.read_csv(fwi_source_path) # Load FWI dataset fwi_df["day"] = pd.to_datetime(fwi_df["day"], format="%Y%m%d", errors="coerce") # Group fwi dataframe by day and department and compute min, max, mean, std agg_fwi_df = ( fwi_df.groupby(["day", "departement"])[cfg.FWI_VARS] .agg(["min", "max", "mean", "std"]) .reset_index() ) agg_fwi_df.columns = ["day", "departement"] + [ x[0] + "_" + x[1] for x in agg_fwi_df.columns if x[1] != "" ] # Group weather dataframe by day and department and compute min, max, mean, std agg_wth_df = ( weather.groupby(["time", "nom"])[cfg.WEATHER_ERA5T_VARS] .agg(["min", "max", "mean", "std"]) .reset_index() ) agg_wth_df.columns = ["day", "departement"] + [ x[0] + "_" + x[1] for x in agg_wth_df.columns if x[1] != "" ] # Merge fwi and weather together mid_df = pd.merge( agg_fwi_df, agg_wth_df, on=["day", "departement"], how="inner" ) # Count fires by day and department fires_count = ( nasa_firms.groupby(["acq_date", "nom"])["confidence"] .count() .to_frame() .reset_index() ) fires_count = fires_count.rename({"confidence": "fires"}, axis=1) # Merge fires final_df = pd.merge( mid_df, fires_count, left_on=["day", "departement"], right_on=["acq_date", "nom"], how="left", ).drop(["acq_date", "nom"], axis=1) # Fill lines with no fires with 0 final_df["fires"] = final_df["fires"].fillna(0) super().__init__(final_df)