Source code for pyro_risks.datasets.fwi

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import pandas as pd
import numpy as np
from netCDF4 import Dataset
import geopandas as gpd
from typing import Optional, List, Dict, Any

import requests
import zipfile
import os
import urllib.request
import json
import logging
import tempfile

from shapely.geometry import Point
from shapely import geometry

from pyro_risks import config as cfg
from pyro_risks.datasets.queries_api import call_fwi
from pyro_risks.datasets.masks import get_french_geom


[docs]def load_data(output_path: str) -> None: """Load FWI zipped data from github repo and unzip data in folder output_path. Args: output_path (str): absolute, relative or temporary path """ results = requests.get(cfg.FR_FWI_2019_FALLBACK) os.makedirs(output_path, exist_ok=True) with open(os.path.join(output_path, "fwi_folder.zip"), "wb") as f: f.write(results.content) file = zipfile.ZipFile(os.path.join(output_path, "fwi_folder.zip")) file.extractall(path=os.path.join(output_path, "fwi_unzipped"))
[docs]def include_department(row: pd.Series, polygons_json: Dict[str, Any]) -> str: """Given a row of a dataframe containing longitude and latitude returns name of french department. This function makes use of shapely to return if a polygon contains a point. Args: row (pd.Series): row of dataframe polygons_json (dict): dict with polygons of the departments Returns: str: name of department or empty string """ for i_dep in range(len(polygons_json["features"])): geom = geometry.shape(polygons_json["features"][i_dep]["geometry"]) if geom.contains(Point((row["longitude"], row["latitude"]))): return polygons_json["features"][i_dep]["properties"]["nom"] return ""
[docs]def get_fwi_from_api(date: str) -> gpd.GeoDataFrame: """Call the CDS API and return all fwi variables as a dataframe with geo coordinates and departments. When calling the API we get a zip file that must be extracted (in a tmp directory), then handle each queried variable which is in a separate netcdf file. A dataframe is created with all the variables and then finally we join codes and departments with geopandas. Args: date (str) Returns: pd.DataFrame """ year, month, day = date.split("-") date_concat = date.replace("-", "") with tempfile.TemporaryDirectory() as tmp: call_fwi(tmp, year, month, day) file = zipfile.ZipFile(os.path.join(tmp, f"fwi_{year}_{month}_{day}.zip")) file.extractall(path=os.path.join(tmp, f"fwi_{year}_{month}_{day}")) df0 = pd.DataFrame({}) for var_name in ["BUI", "DC", "DMC", "DSR", "FFMC", "FWI", "ISI"]: var_path = os.path.join( tmp, f"fwi_{year}_{month}_{day}/ECMWF_FWI_{var_name}_{date_concat}_1200_hr_v3.1_int.nc", ) nc = Dataset(var_path, "r") lats = nc.variables["latitude"][:] var = nc.variables[var_name.lower()][:] nc.close() lons = np.arange(-180, 180.25, 0.25) var_cyclic = np.ma.hstack([var[0][:, 720:], var[0][:, :721]]) lon2d, lat2d = np.meshgrid(lons, lats) df = pd.DataFrame( { "latitude": lat2d.flatten(), "longitude": lon2d.flatten(), var_name.lower(): var_cyclic.flatten(), } ) df = df.dropna(subset=[var_name.lower()]) df = df.reset_index(drop=True) if var_name == "BUI": df0 = pd.concat([df0, df], axis=1) else: df0 = pd.merge(df0, df, on=["latitude", "longitude"], how="inner") geo_data = gpd.GeoDataFrame( df0, geometry=gpd.points_from_xy(df0["longitude"], df0["latitude"]), crs="EPSG:4326", ) geo_masks = get_french_geom() geo_df = gpd.sjoin(geo_masks, geo_data, how="inner") return geo_df.drop(["index_right", "geometry"], axis=1)
[docs]def get_fwi_data_for_predict(date: str) -> pd.DataFrame: """Run CDS API queries for dates required by the model and return fwi dataset for predict step. This takes care principally of the lags required for the modelling step. Args: date (str) Returns: pd.DataFrame """ data = get_fwi_from_api(date) data["day"] = date # Lag J-1 lag = np.datetime64(date) - np.timedelta64(1, "D") dataJ1 = get_fwi_from_api(str(lag)) dataJ1["day"] = str(lag) # Lag J-3 lag = np.datetime64(date) - np.timedelta64(3, "D") dataJ3 = get_fwi_from_api(str(lag)) dataJ3["day"] = str(lag) # Lag J-7 lag = np.datetime64(date) - np.timedelta64(7, "D") dataJ7 = get_fwi_from_api(str(lag)) dataJ7["day"] = str(lag) merged_data = pd.concat([data, dataJ1, dataJ3, dataJ7], ignore_index=True) return merged_data
[docs]def get_fwi_data(source_path: str, day: Optional[str] = "20190101") -> pd.DataFrame: """Load and handle netcdf data for selected day. Return pandas dataframe with longitude, latitude, day and fwi indices (fwi, ffmc, dmc, dc, isi, bui, dsr, dr). Args: source_path (str): path with unzipped netcdf fwi data, usually got from load_data. day (str, optional): which day to load. Defaults to '20190101'. Returns: pd.DataFrame: dataframe with all fwi indices for selected day """ nc = Dataset( os.path.join(source_path, "fwi_unzipped/JRC_FWI_{}.nc".format(day)), "r" ) try: lons = nc.variables["lon"][:] lats = nc.variables["lat"][:] fwi = nc.variables["fwi"][:] ffmc = nc.variables["ffmc"][:] dmc = nc.variables["dmc"][:] dc = nc.variables["dc"][:] isi = nc.variables["isi"][:] bui = nc.variables["bui"][:] dsr = nc.variables["dsr"][:] dr = nc.variables["danger_risk"][:] except KeyError: print("Some reading error with: ", day) nc.close() lon2d, lat2d = np.meshgrid(lons, lats) df = pd.DataFrame( { "latitude": lat2d.flatten(), "longitude": lon2d.flatten(), "day": day, "fwi": fwi[0, :, :].flatten(), "ffmc": ffmc[0, :, :].flatten(), "dmc": dmc[0, :, :].flatten(), "dc": dc[0, :, :].flatten(), "isi": isi[0, :, :].flatten(), "bui": bui[0, :, :].flatten(), "dsr": dsr[0, :, :].flatten(), "dr": dr[0, :, :].flatten(), } ) df = df.dropna(subset=["fwi", "ffmc", "dmc", "dc", "isi", "bui", "dsr", "dr"]) df = df.reset_index(drop=True) return df
[docs]def create_departement_df(day_data: pd.DataFrame) -> pd.DataFrame: """Create dataframe with lon, lat coordinates and corresponding departments. Load json with the department polygons and run function include_department to get the name of departments corresponding to each row of input data, typically one day of FWI data got with load_data. This may take a few minutes due to the shapely process. Args: day_data (pd.Dataframe): df with longitudes and latitudes Returns: pd.DataFrame: dataframe with lat, lon and department """ df = day_data.copy() with urllib.request.urlopen(cfg.FR_GEOJSON) as url: dep_polygons = json.loads(url.read().decode()) deps = [include_department(df.iloc[i], dep_polygons) for i in range(df.shape[0])] df["departement"] = deps df = df[df["departement"] != ""] dep_geo_df = df[["latitude", "longitude", "departement"]] return dep_geo_df
[docs]class GwisFwi(pd.DataFrame): """GWIS FWI dataframe (8 km resolution) on French territory based on 2019-2020 data.""" def __init__(self, days_list: Optional[List[str]] = None) -> None: """Create dataframe with fwi indices data corresponding to days_list and join french department. Args: days_list: list of str, format year month day (all concatenated) """ days_list = ["20190101"] if days_list is None else days_list fwi_df = pd.DataFrame( columns=[ "latitude", "longitude", "day", "fwi", "ffmc", "dmc", "dc", "isi", "bui", "dsr", "dr", ] ) with tempfile.TemporaryDirectory() as tmp: load_data(output_path=tmp) for day in days_list: df = get_fwi_data(source_path=tmp, day=day) fwi_df = pd.concat([fwi_df, df]) dep_geo_df = create_departement_df(df) fwi_df = pd.merge(fwi_df, dep_geo_df, on=["latitude", "longitude"]) super().__init__(fwi_df)