Source code for pyro_risks.datasets.datasets_mergers

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import pandas as pd

from .utils import (
    find_closest_weather_station,
    find_closest_location,
    get_nearest_points,
)


def merge_datasets_by_departements(
    dataframe1: pd.DataFrame,
    time_col1: str,
    geometry_col1: str,
    dataframe2: pd.DataFrame,
    time_col2: str,
    geometry_col2: str,
    how: str,
) -> pd.DataFrame:
    """
    Merge two datasets that each carry a date column and a geometry column.

    Rows are matched on the pair (time_col1, geometry_col1) of the first frame
    against the pair (time_col2, geometry_col2) of the second one. The geometry
    is expected to identify a French departement, either by its code or by its
    geometry, and must be encoded the same way in both datasets.

    Keep in mind that `how` should be chosen so that the merged dataframe keeps
    dimensions similar to the weather dataframe: an inner join would only keep
    the days on which wildfires were declared. Hence, if the weather dataframe
    is the left frame `how` must be "left", and if it is the right frame `how`
    must be "right".

    Args:
        dataframe1: pd.DataFrame
            First dataframe, containing a time column and a geometry one.
        time_col1: str
            Name of the time column of dataframe1 on which the merge will be done.
        geometry_col1: str
            Name of the geometry column of dataframe1 on which the merge will be done.
        dataframe2: pd.DataFrame
            Second dataframe, containing a time column and a geometry one.
        time_col2: str
            Name of the time column of dataframe2 on which the merge will be done.
        geometry_col2: str
            Name of the geometry column of dataframe2 on which the merge will be done.
        how: str
            Join type forwarded to the pandas merge; should correspond to which of
            the left or right frame the weather dataframe is.

    Returns: pd.DataFrame
        Merged dataset on French departement.
    """
    left_keys = [time_col1, geometry_col1]
    right_keys = [time_col2, geometry_col2]
    return dataframe1.merge(
        dataframe2,
        how=how,
        left_on=left_keys,
        right_on=right_keys,
    )
def merge_datasets_by_closest_weather_station(
    df_weather: pd.DataFrame,
    time_col_weather: str,
    df_fires: pd.DataFrame,
    time_col_fires: str,
) -> pd.DataFrame:
    """
    Merge a weather conditions dataset with a wildfires history dataset.

    Each dataset must contain a time column, and the weather dataset must have a
    `STATION` column which uniquely identifies each station. The closest weather
    station is looked up for every (latitude, longitude) point of the wildfires
    dataset. The wildfires are then grouped by closest station and date, keeping
    one row per group, and left-joined onto the weather dataframe so that every
    weather row is preserved.

    `df_fires` is not modified: the `closest_weather_station` column is added to
    a copy.

    Args:
        df_weather: pd.DataFrame
            Weather conditions dataframe. Must have a `STATION` column to identify
            each weather station.
        time_col_weather: str
            Name of the time column in `df_weather`.
        df_fires: pd.DataFrame
            Wildfires history dataset, must have `latitude` and `longitude` columns.
        time_col_fires: str
            Name of the time column in `df_fires`. It is used both to group the
            wildfires and as the right-hand merge key.

    Returns: pd.DataFrame
        Merged dataset by weather station proximity.
    """
    # A plain comprehension (rather than a row-wise `DataFrame.apply`) always
    # yields a flat list with one entry per fire, whatever the frame size.
    closest_stations = [
        find_closest_weather_station(df_weather, lat, lon)
        for lat, lon in zip(df_fires["latitude"], df_fires["longitude"])
    ]
    # `assign` works on a copy, so the caller's dataframe is left untouched.
    fires = df_fires.assign(closest_weather_station=closest_stations)

    # Group on the caller-provided time column (previously hard-coded to
    # "acq_date") so the grouping key matches the merge key used below.
    # `first()` keeps a single row per (station, date) pair.
    grouped_fires = (
        fires.groupby(["closest_weather_station", time_col_fires], observed=True)
        .first()
        .reset_index()
    )

    return pd.merge(
        df_weather,
        grouped_fires,
        left_on=[time_col_weather, "STATION"],
        right_on=[time_col_fires, "closest_weather_station"],
        how="left",
    )
def merge_datasets_by_closest_weather_point(
    df_weather: pd.DataFrame,
    time_col_weather: str,
    df_fires: pd.DataFrame,
    time_col_fires: str,
) -> pd.DataFrame:
    """
    Merge weather and fire datasets when the weather dataset is gridded.

    This targets weather data provided from satellite/reanalysis products such as
    the ERA5 Land hourly dataset available at
    https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=form
    and accessible through cdsapi.

    The closest weather point is looked up for every (latitude, longitude) point
    of the wildfires dataset. The wildfires are then grouped by closest weather
    point and date, keeping one row per group, and left-joined onto the weather
    dataframe so that every weather row is preserved.

    `df_fires` is not modified: the `closest_weather_point` column is added to a
    copy.

    Args:
        df_weather: pd.DataFrame
            Weather conditions dataframe, must have "latitude" and "longitude" columns.
        time_col_weather: str
            Name of the time column in `df_weather`.
        df_fires: pd.DataFrame
            Wildfires history dataset, must have `latitude` and `longitude` columns.
        time_col_fires: str
            Name of the time column in `df_fires`. It is used both to group the
            wildfires and as the right-hand merge key.

    Returns: pd.DataFrame
        Merged dataset by weather point proximity.
    """
    # For each wildfire, find the closest point of the weather grid.
    # NOTE(review): `find_closest_location` is expected to return a
    # (latitude, longitude) pair, since elements 0 and 1 are extracted below.
    closest_points = [
        find_closest_location(df_weather, lat, lon)
        for lat, lon in zip(df_fires["latitude"], df_fires["longitude"])
    ]
    # `assign` works on a copy, so the caller's dataframe is left untouched.
    fires = df_fires.assign(closest_weather_point=closest_points)

    # Group on the caller-provided time column (previously hard-coded to
    # "acq_date") so the grouping key matches the merge key used below.
    # `first()` keeps a single row per (weather point, date) pair.
    grouped_fires = (
        fires.groupby(["closest_weather_point", time_col_fires], observed=True)
        .first()
        .reset_index()
    )

    # Split the closest point into separate columns usable as merge keys.
    grouped_fires["weather_lat"] = grouped_fires["closest_weather_point"].str[0]
    grouped_fires["weather_lon"] = grouped_fires["closest_weather_point"].str[1]

    return pd.merge(
        df_weather,
        grouped_fires,
        left_on=[time_col_weather, "latitude", "longitude"],
        right_on=[time_col_fires, "weather_lat", "weather_lon"],
        how="left",
    )
def merge_by_proximity(
    df_left: pd.DataFrame,
    time_col_left: str,
    df_right: pd.DataFrame,
    time_col_right: str,
    how: str,
) -> pd.DataFrame:
    """
    Merge df_left and df_right by matching each point of df_left with the
    closest point of df_right.

    For instance, df_left can be a wildfires history dataset and df_right a
    weather conditions dataset, and we want to match each wildfire with its
    closest weather point. This can also be used if, for instance, we want to
    merge an FWI dataset (df_left) with an ERA5/VIIRS dataset (df_right).

    The returned frame contains the columns of df_left, two extra columns
    `closest_lat` and `closest_lon` holding the coordinates of the matched
    df_right point, and the columns of df_right. `df_left` itself is not
    modified.

    Args:
        df_left: pd.DataFrame
            Left dataframe, must have "latitude" and "longitude" columns.
        time_col_left: str
            Name of the time column in `df_left`.
        df_right: pd.DataFrame
            Right dataset, must have "latitude" and "longitude" columns.
        time_col_right: str
            Name of the time column in `df_right`.
        how: str
            How the pandas merge needs to be done.

    Returns: pd.DataFrame
        Merged dataset by point (lat/lon) proximity.
    """
    # Distinct df_right locations, in the format expected by get_nearest_points.
    right_points = df_right.drop_duplicates(subset=["latitude", "longitude"])
    candidates = list(
        zip(right_points["latitude"].values, right_points["longitude"].values)
    )

    # Distinct df_left locations: the nearest-neighbour search only needs to run
    # once per unique point.
    left_points = df_left.drop_duplicates(subset=["latitude", "longitude"])
    source_points = list(
        zip(left_points["latitude"].values, left_points["longitude"].values)
    )

    # `indices[i]` is expected to be the position in `candidates` of the
    # candidate closest to `source_points[i]`; distances are not needed here.
    indices, _ = get_nearest_points(source_points, candidates)

    # `candidates` already holds the coordinates at each position, so the
    # lookup is a direct index instead of a DataFrame filter per index.
    closest_by_source = {
        source: candidates[idx] for source, idx in zip(source_points, indices)
    }

    # Points with no match (e.g. NaN coordinates, which never compare equal)
    # get NaN coordinates, mirroring what a failed pandas `map` would produce.
    unmatched = (float("nan"), float("nan"))
    closest = [
        closest_by_source.get(point, unmatched)
        for point in zip(df_left["latitude"], df_left["longitude"])
    ]

    # `assign` works on a copy, so the caller's dataframe is left untouched and
    # no temporary helper columns need to be dropped afterwards.
    left = df_left.assign(
        closest_lat=[point[0] for point in closest],
        closest_lon=[point[1] for point in closest],
    )

    return pd.merge(
        left,
        df_right,
        left_on=[time_col_left, "closest_lat", "closest_lon"],
        right_on=[time_col_right, "latitude", "longitude"],
        how=how,
    )