Source code for pyro_risks.datasets.ERA5

# Copyright (C) 2021, Pyronear contributors.

# This program is licensed under the GNU Affero General Public License version 3.
# See LICENSE or go to <https://www.gnu.org/licenses/agpl-3.0.txt> for full license details.

import logging
from typing import Optional

import os
import geopandas as gpd
import pandas as pd
import numpy as np
import requests
import xarray as xr
import tempfile

from pyro_risks import config as cfg
from .masks import get_french_geom
from pyro_risks.datasets.queries_api import call_era5land, call_era5t


__all__ = ["ERA5Land", "ERA5T"]


def get_data_era5land_for_predict(date: str) -> pd.DataFrame:
    """
        Get ERA5Land dataframe for given date using call to cdsapi
        and appropriate class.

        Args:
            date: str
        Date with the following format: "YEAR-MONTH-DAY" eg. "2020-05-12"

        Returns: pd.DataFrame
            Dataframe containing ERA5 Land data for the requested day.
    """
    with tempfile.TemporaryDirectory() as tmp:
        year, month, day = date.split("-")
        call_era5land(tmp, year, month, day)
        # TODO: make sure that the directory works when on server
        data = ERA5Land(
            source_path=os.path.join(tmp, f"era5land_{year}_{month}_{day}.nc")
        )

        # Lag J-1
        lag = np.datetime64(date) - np.timedelta64(1, "D")
        year, month, day = str(lag).split("-")
        call_era5land(tmp, year, month, day)
        dataJ1 = ERA5Land(
            source_path=os.path.join(tmp, f"era5land_{year}_{month}_{day}.nc")
        )

        # Lag J-3
        lag = np.datetime64(date) - np.timedelta64(3, "D")
        year, month, day = str(lag).split("-")
        call_era5land(tmp, year, month, day)
        dataJ3 = ERA5Land(
            source_path=os.path.join(tmp, f"era5land_{year}_{month}_{day}.nc")
        )

        # Lag J-7
        lag = np.datetime64(date) - np.timedelta64(7, "D")
        year, month, day = str(lag).split("-")
        call_era5land(tmp, year, month, day)
        dataJ7 = ERA5Land(
            source_path=os.path.join(tmp, f"era5land_{year}_{month}_{day}.nc")
        )

        merged_data = pd.concat([data, dataJ1, dataJ3, dataJ7], ignore_index=True)
        return merged_data


def get_data_era5t_for_predict(date: str) -> pd.DataFrame:
    """
        Get ERA5T dataframe for given date using call to cdsapi
        and appropriate class.

        Args:
            date: str
        Date with the following format: "YEAR-MONTH-DAY" eg. "2020-05-12"

        Returns: pd.DataFrame
            Dataframe containing ERA5T data for the requested day.
    """
    with tempfile.TemporaryDirectory() as tmp:
        year, month, day = date.split("-")
        call_era5t(tmp, year, month, day)
        # TODO: make sure that the directory works when on server
        data = ERA5T(source_path=os.path.join(tmp, f"era5t_{year}_{month}_{day}.nc"))
        # Lag J-1
        lag = np.datetime64(f"{year}-{month}-{day}") - np.timedelta64(1, "D")
        year, month, day = str(lag).split("-")
        call_era5t(tmp, year, month, day)
        dataJ1 = ERA5T(source_path=os.path.join(tmp, f"era5t_{year}_{month}_{day}.nc"))
        # Lag J-3
        lag = np.datetime64(f"{year}-{month}-{day}") - np.timedelta64(3, "D")
        year, month, day = str(lag).split("-")
        call_era5t(tmp, year, month, day)
        dataJ3 = ERA5T(source_path=os.path.join(tmp, f"era5t_{year}_{month}_{day}.nc"))
        # Lag J-7
        lag = np.datetime64(f"{year}-{month}-{day}") - np.timedelta64(7, "D")
        year, month, day = str(lag).split("-")
        call_era5t(tmp, year, month, day)
        dataJ7 = ERA5T(source_path=os.path.join(tmp, f"era5t_{year}_{month}_{day}.nc"))
        merged_data = pd.concat([data, dataJ1, dataJ3, dataJ7], ignore_index=True)
        return merged_data


[docs]class ERA5Land(pd.DataFrame): """Provides ERA5-Land clean dataset as a pandas dataframe. ERA5-Land is a reanalysis dataset providing a consistent view of the evolution of land variables over several decades at an enhanced resolution compared to ERA5. ERA5-Land uses as input to control the simulated land fields ERA5 atmospheric variables, such as air temperature and air humidity. Using cdaspi https://pypi.org/project/cdsapi/ with access key, the user can get the dataset at https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview The provided dataset has to be in netCDF4 format here. Args: source_path: str Path or URL to your version of the source data """ def __init__(self, source_path: Optional[str] = None) -> None: """ Args: source_path: Optional[str] Path or URL to your version of the source data """ if not isinstance(source_path, str): # Download in cache logging.warning( f"No data source specified for {self.__class__.__name__}, trying fallback." ) source_path = cfg.FR_ERA5LAND_FALLBACK if source_path.startswith("http"): with requests.get(source_path) as resp: ds = xr.open_dataset(resp.content) data = ds.to_dataframe() else: ds = xr.open_dataset(source_path) data = ds.to_dataframe() # Drop NaNs which correspond to no land data = data.dropna() data = data.reset_index() data["time"] = pd.to_datetime( data["time"], format="%Y-%m-%d %H:%M:%S", errors="coerce" ) data["time"] = data["time"].dt.normalize() # Transform into geopandas dataframe geo_data = gpd.GeoDataFrame( data, geometry=gpd.points_from_xy(data["longitude"], data["latitude"]), crs="EPSG:4326", ) # Match the polygons using the ones of each predefined country area geo_masks = get_french_geom() geo_df = gpd.sjoin(geo_masks, geo_data, how="inner") super().__init__(geo_df.drop(["index_right", "geometry"], axis=1))
[docs]class ERA5T(pd.DataFrame): """Provides ERA5T clean dataset as a pandas dataframe. The provided dataset has to be in netCDF4 format here. Args: source_path: str Path or URL to your version of the source data """ def __init__(self, source_path: Optional[str] = None) -> None: """ Args: source_path: Optional[str] Path or URL to your version of the source data """ if not isinstance(source_path, str): # Download in cache logging.warning( f"No data source specified for {self.__class__.__name__}, trying fallback." ) source_path = cfg.FR_ERA5T_FALLBACK if source_path.startswith("http"): with requests.get(source_path) as resp: ds = xr.open_dataset(resp.content) data = ds.to_dataframe() else: ds = xr.open_dataset(source_path) data = ds.to_dataframe() # Drop columns with NaNs data = data.dropna(axis=1) data = data.reset_index() data["time"] = pd.to_datetime( data["time"], format="%Y-%m-%d %H:%M:%S", errors="coerce" ) data["time"] = data["time"].dt.normalize() # Transform into geopandas dataframe geo_data = gpd.GeoDataFrame( data, geometry=gpd.points_from_xy(data["longitude"], data["latitude"]), crs="EPSG:4326", ) # Match the polygons using the ones of each predefined country area geo_masks = get_french_geom() geo_df = gpd.sjoin(geo_masks, geo_data, how="inner") super().__init__(geo_df.drop(["index_right", "geometry"], axis=1))