MISO Pipeline using RTDIP

This article provides a guide on how to run a MISO pipeline using RTDIP. The pipeline was tested on an M2 MacBook Pro in VS Code, using a Conda environment with Python 3.11.
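To reproduce that setup, a minimal sketch of creating the Conda environment could look like the following (the environment name rtdip is a placeholder):

conda create -n rtdip python=3.11
conda activate rtdip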

Prerequisites

This pipeline assumes you have followed the installation instructions in the Getting Started section. In particular, ensure you have installed the following:

RTDIP SDK Installation

Ensure you have installed the RTDIP SDK as follows:

pip install "rtdip-sdk[pipelines,pyspark]"
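To confirm the SDK is available in your environment, a quick sanity check using the standard library's packaging metadata is:

import importlib.metadata

# Prints the installed rtdip-sdk version
print(importlib.metadata.version("rtdip-sdk"))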

Components

Name                     Description
MISODailyLoadISOSource   Reads daily load data from the MISO API.
MISOToMDMTransformer     Converts raw MISO data into the Meters Data Model.
SparkDeltaDestination    Writes data to a Delta table.

Example

Below is an example of how to set up a pipeline that reads daily load data from the MISO API, transforms it into the Meters Data Model, and writes it to a Delta table.

from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from pyspark.sql import SparkSession

def pipeline():
    # Create a local Spark session with Delta Lake support
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    # Read actual daily load data from the MISO API for the given date
    source_df = MISODailyLoadISOSource(
        spark=spark,
        options={
            "load_type": "actual",
            "date": "20230520",
        },
    ).read_batch()

    # Transform the raw MISO data into Meters Data Model usage records
    transform_value_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="usage",
    ).transform()

    # Transform the raw MISO data into Meters Data Model metadata records
    transform_meta_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="meta",
    ).transform()

    # Write the usage records to a Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_value_df,
        options={"partitionBy": "timestamp"},
        destination="miso_usage_data",
    ).write_batch()

    # Write the metadata records to a Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_meta_df,
        options={"partitionBy": "timestamp"},
        destination="miso_meta_data",
    ).write_batch()

if __name__ == "__main__":
    pipeline()
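Once the pipeline has run, you can sanity-check the output by reading the Delta tables back. A minimal sketch, assuming the tables were registered in the same local metastore used by the pipeline:

from pyspark.sql import SparkSession

# Reuse the same Delta-enabled Spark configuration as the pipeline
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Show a few rows of the usage and metadata tables written above
spark.read.table("miso_usage_data").show(5)
spark.read.table("miso_meta_data").show(5)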

Using environments

If you are using a virtual environment (such as Conda), include the following lines at the top of your script to ensure the Spark driver and workers use the same Python interpreter:

import os
import sys

# Point both the PySpark workers and the driver at the current interpreter
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable