ECMWF to Delta Pipeline

This article is a guide to running a pipeline that requests ECMWF MARS data through the API and saves it as a .nc file, transforms the .nc file into a dataframe for a given grid range, and writes the result to a local Delta Table using the RTDIP SDK.

This pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.11 environment.

Prerequisites

This pipeline job requires the packages:

Components

Name | Description
SparkECMWFWeatherForecastSource | Pulls data from the ECMWF MARS API and stores it as a .nc file.
ECMWFExtractGridToWeatherDataModel | Transforms the ECMWF .nc file into a dataframe of grid data.
SparkDeltaDestination | Writes to Delta.
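
To make the grid extraction step more concrete, here is a minimal sketch of how a latitude/longitude range and a grid step define a set of grid points, and what a "nearest" lookup means on such a grid. It is an illustration only and not the code used by ECMWFExtractGridToWeatherDataModel; the helper names `grid_axis` and `nearest` are hypothetical, and including both end points of each range is an assumption.

```python
def grid_axis(start: float, stop: float, step: float) -> list[float]:
    """Evenly spaced points from start to stop (both included), spaced by step."""
    # Count whole steps first so floating-point drift cannot add or drop a point.
    count = round((stop - start) / step)
    return [round(start + i * step, 6) for i in range(count + 1)]


def nearest(value: float, axis: list[float]) -> float:
    """Return the axis point closest to value (the idea behind method="nearest")."""
    return min(axis, key=lambda point: abs(point - value))


# Same ranges and step as the example in this article.
lats = grid_axis(25, 50, 0.1)
lons = grid_axis(-75, -65, 0.1)

print(len(lats), len(lons), len(lats) * len(lons))  # 251 101 25351
print(nearest(40.73, lats), nearest(-73.99, lons))  # 40.7 -74.0
```

With a grid step of 0.1, the ranges used in this article give 251 latitudes and 101 longitudes, so each forecast time step yields 25,351 grid points per variable. This is worth keeping in mind when choosing the range and step.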

Common Errors

Error | Solution
[com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/ErrorClassesJsonReader] | The Delta version in the Spark Session must be compatible with your local PySpark version. See the Delta Lake releases documentation for version compatibility.

Example

Below is an example of how to pull ECMWF MARS data, extract a grid range from it and write the result to a Delta Table locally.

from rtdip_sdk.pipelines.sources.spark.ecmwf.weather_forecast import SparkECMWFWeatherForecastSource
from rtdip_sdk.pipelines.transformers.spark.ecmwf.nc_extractgrid_to_weather_data_model import ECMWFExtractGridToWeatherDataModel
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility


ecmwf_api_key = "xxxxx"
ecmwf_api_email = "xxxxx"

date_start = "2020-10-01 00:00:00"
date_end = "2020-10-02 00:00:00"

ecmwf_class = "od"
stream = "oper"
expver = "1"
leveltype = "sfc"
run_interval = "12"
run_frequency = "H"
grid_step = 0.1
ec_vars = [
    "cbh", "dsrp", "sp", "tcwv", "tcc"
]

tag_prefix = "US:[55, -130, 20, -60]:"
method = "nearest"
path = "/dbfs/forecast/nc/US/"  # Directory where the .nc files are saved; change as needed
forecast_area = [55, -130, 20, -60]  # N/W/S/E
lat_max = 50
lat_min = 25
lon_max = -65
lon_min = -75


def pipeline():
    # Start the Spark session used by the pipeline
    spark = SparkSessionUtility(config={}).execute()

    weather_source = SparkECMWFWeatherForecastSource(
        spark=spark,
        date_start=date_start,
        date_end=date_end,
        save_path=path,
        ecmwf_class=ecmwf_class,
        stream=stream,
        expver=expver,
        leveltype=leveltype,
        ec_vars=ec_vars,
        forecast_area=forecast_area,
        ecmwf_api_key=ecmwf_api_key,
        ecmwf_api_email=ecmwf_api_email,
    )

    # Request the data from the MARS API and save it as .nc files under `path`
    weather_source.read_batch()

    extract = ECMWFExtractGridToWeatherDataModel(
        lat_min=lat_min,
        lat_max=lat_max,
        lon_min=lon_min,
        lon_max=lon_max,
        grid_step=grid_step,
        load_path=path,
        date_start=date_start,
        date_end=date_end,
        run_interval=run_interval,
        run_frequency=run_frequency,
    )

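    # Extract the requested variables for the grid range into a dataframe
    df = extract.transform(tag_prefix, ec_vars, method)

    # Convert to a Spark DataFrame so it can be written to Delta
    sparkdf = spark.createDataFrame(df)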

    SparkDeltaDestination(
        data=sparkdf, options={}, destination="{path/to/table}"
    ).write_batch()


if __name__ == "__main__":
    pipeline()
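
The example sets `run_interval = "12"` and `run_frequency = "H"` between `date_start` and `date_end`. As a rough illustration of what those settings describe, the standard-library sketch below lists one run time every 12 hours across the date range. It is not the SDK's implementation: the helper `forecast_runs` is a hypothetical name, and treating both end dates as inclusive is an assumption.

```python
from datetime import datetime, timedelta


def forecast_runs(date_start: str, date_end: str, interval_hours: int) -> list[datetime]:
    """List run times from date_start to date_end (both included), every interval_hours."""
    fmt = "%Y-%m-%d %H:%M:%S"
    current = datetime.strptime(date_start, fmt)
    end = datetime.strptime(date_end, fmt)
    runs = []
    while current <= end:
        runs.append(current)
        current += timedelta(hours=interval_hours)
    return runs


# Same dates and interval as the example in this article.
runs = forecast_runs("2020-10-01 00:00:00", "2020-10-02 00:00:00", 12)
for run in runs:
    print(run.isoformat(sep=" "))
# 2020-10-01 00:00:00
# 2020-10-01 12:00:00
# 2020-10-02 00:00:00
```

Under these assumptions the one-day range in the example covers three forecast runs.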