PJM Pipeline using RTDIP

This article provides a guide on how to execute a PJM pipeline using RTDIP. This pipeline was tested on an M2 MacBook Pro using VS Code in a Conda (3.11) environment.

Prerequisites

This pipeline assumes you have a valid API key from PJM and have followed the installation instructions in the Getting Started section. In particular, ensure you have installed the following:

RTDIP SDK Installation

Ensure you have installed the RTDIP SDK as follows:

pip install "rtdip-sdk[pipelines,pyspark]"
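To confirm that the installation succeeded before running the pipeline, a quick check along these lines may help. This is a minimal sketch using only the Python standard library; the helper name `installed_version` is hypothetical and not part of the RTDIP SDK, and the distribution names checked are assumed to match those installed by the command above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return version(distribution)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Assumed distribution names: the SDK itself and the PySpark extra.
    for name in ("rtdip-sdk", "pyspark"):
        found = installed_version(name)
        print(f"{name}: {found if found else 'NOT INSTALLED'}")
```

If either line reports NOT INSTALLED, re-run the pip command inside the same environment that VS Code uses to run the script.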

Components

Name Description
PJMDailyLoadISOSource Reads daily load data from the PJM API.
PJMToMDMTransformer Converts PJM Raw data into Meters Data Model.
SparkDeltaDestination Writes to a Delta table.

Example

Below is an example of how to set up a pipeline to read daily load data from the PJM API, transform it into the Meters Data Model and write it to a Delta table.

from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.transformers import PJMToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from pyspark.sql import SparkSession

def pipeline():
    # Create a Spark session with the Delta Lake package, SQL extension and catalog enabled
    spark = (
        SparkSession.builder
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

    source_df = PJMDailyLoadISOSource(
        spark = spark,
        options = {
            "api_key": "{api_key}",  # replace {api_key} with your PJM API key
            "load_type": "actual"
        }
    ).read_batch()

    transform_value_df = PJMToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type= "usage"
    ).transform()

    transform_meta_df = PJMToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type= "meta"
    ).transform()

    SparkDeltaDestination(
        data=transform_value_df,
        options={
            "partitionBy":"timestamp"
        },   
        destination="pjm_usage_data"
    ).write_batch()    

    SparkDeltaDestination(
        data=transform_meta_df,
        options={
            "partitionBy":"timestamp"
        },   
        destination="pjm_meta_data"
    ).write_batch() 

if __name__ == "__main__":
    pipeline()
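The example uses the placeholder "{api_key}" for the PJM API key. One way to avoid hard-coding the key in the script is to read it from an environment variable, as sketched below. The variable name `PJM_API_KEY` and the helpers `get_api_key` and `build_source_options` are hypothetical names chosen for illustration; they are not part of the RTDIP SDK.

```python
import os


def get_api_key(env_var: str = "PJM_API_KEY") -> str:
    """Read the API key from an environment variable and fail early if it is unset."""
    api_key = os.environ.get(env_var, "").strip()
    if not api_key:
        raise RuntimeError(
            f"Set the {env_var} environment variable to your PJM API key."
        )
    return api_key


def build_source_options(load_type: str = "actual") -> dict:
    """Build the options dictionary in the same shape as the example above."""
    return {"api_key": get_api_key(), "load_type": load_type}
```

The returned dictionary can then be passed as the options argument of PJMDailyLoadISOSource in place of the literal dictionary, so the key never appears in source control.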

Using environments

If you run the pipeline inside a virtual environment (for example Conda), include the following lines at the top of your script so that the Spark driver and workers use the same Python interpreter:

import sys, os

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
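To check that both variables point at the interpreter running the script before the Spark session is created, a small helper like the following can be used. This is an illustrative sketch; the function name `interpreter_mismatches` is hypothetical.

```python
import os
import sys

PYSPARK_ENV_VARS = ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON")


def interpreter_mismatches() -> list:
    """Return the PySpark variables that are unset or differ from sys.executable."""
    return [
        name
        for name in PYSPARK_ENV_VARS
        if os.environ.get(name) != sys.executable
    ]


if __name__ == "__main__":
    mismatched = interpreter_mismatches()
    if mismatched:
        print("Not pinned to the current interpreter:", ", ".join(mismatched))
    else:
        print("Driver and workers will use", sys.executable)
```

Run the check after setting the variables and before calling getOrCreate(), since the variables are read when Spark starts.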