MISO Pipeline using RTDIP and Databricks

This article provides a guide on how to deploy a MISO pipeline from a local file to a Databricks workflow using the RTDIP SDK. It was tested on an M2 MacBook Pro using VS Code in a Conda (Python 3.11) environment. RTDIP Pipeline Components provide Databricks with all the required Python packages and JARs to execute each component; these are set up automatically during workflow creation.

Prerequisites

This pipeline assumes you have a Databricks workspace and have followed the installation instructions in the Getting Started section. In particular, ensure you have installed the following:

RTDIP SDK Installation

Ensure you have installed the RTDIP SDK as follows:

pip install "rtdip-sdk[pipelines]"
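
To confirm the pipelines extra installed correctly, a quick import check can be run. This is a minimal sketch; it simply imports the components used later in this guide:

# Sanity check that the RTDIP pipeline components are importable
from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination

print("RTDIP pipeline components imported successfully")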

Components

Name                       Description
MISODailyLoadISOSource     Reads daily load data from the MISO API.
MISOToMDMTransformer       Converts raw MISO data into the Meters Data Model.
SparkDeltaDestination      Writes data to a Delta table.
DatabricksSDKDeploy        Deploys an RTDIP Pipeline to Databricks Workflows leveraging the Databricks SDK.
DeltaTableOptimizeUtility  Optimizes a Delta table.
DeltaTableVacuumUtility    Vacuums a Delta table.

Example

Below is an example of how to set up a pipeline job to read daily load data from the MISO API, transform it into the Meters Data Model, and write the results to Delta tables.

from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer
from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

def pipeline():
    # Create or get a Spark session
    spark = SparkSessionUtility(config={}).execute()

    # Read daily load data from the MISO API
    source_df = MISODailyLoadISOSource(
        spark=spark,
        options={
            "load_type": "actual",
            "date": "20230520",
        }
    ).read_batch()

    # Transform the raw MISO data into Meters Data Model usage records
    transform_value_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="usage"
    ).transform()

    # Transform the raw MISO data into Meters Data Model metadata records
    transform_meta_df = MISOToMDMTransformer(
        spark=spark,
        data=source_df,
        output_type="meta"
    ).transform()

    # Write the usage data to a Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_value_df,
        options={
            "partitionBy": "timestamp"
        },
        destination="miso_usage_data"
    ).write_batch()

    # Overwrite the metadata Delta table, partitioned by timestamp
    SparkDeltaDestination(
        data=transform_meta_df,
        options={
            "partitionBy": "timestamp"
        },
        destination="miso_meta_data",
        mode="overwrite"
    ).write_batch()

if __name__ == "__main__":
    pipeline()
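
Once the pipeline has run, the written Delta tables can be inspected with a quick read. Below is a minimal sketch, assuming a SparkSession named spark is available and the tables were registered under the names used above:

# Inspect a few rows of the usage table written by the pipeline
usage_df = spark.read.table("miso_usage_data")
usage_df.show(5)

# Inspect the metadata table
meta_df = spark.read.table("miso_meta_data")
meta_df.show(5)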

Maintenance

The RTDIP SDK can also be used to maintain Delta tables in Databricks. Below is an example of how to set up a maintenance job to optimize and vacuum the MISO tables written in the previous example.

from rtdip_sdk.pipelines.utilities import (
    SparkSessionUtility,
    DeltaTableOptimizeUtility,
    DeltaTableVacuumUtility
)

def maintenance():
    # Create or get a Spark session
    spark = SparkSessionUtility(config={}).execute()

    TABLE_NAMES = [
        "{path.to.table.miso_usage_data}",
        "{path.to.table.miso_meta_data}"
    ]

    for table in TABLE_NAMES:
        # Compact small files in the Delta table
        DeltaTableOptimizeUtility(
            spark=spark,
            table_name=table
        ).execute()

        # Remove data files no longer referenced by the Delta table
        DeltaTableVacuumUtility(
            spark=spark,
            table_name=table
        ).execute()

if __name__ == "__main__":
    maintenance()
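
For reference, the two utilities correspond to the Delta Lake OPTIMIZE and VACUUM commands. A minimal sketch of the equivalent Spark SQL, assuming the table names from the pipeline example and an active SparkSession named spark:

# Equivalent Delta Lake SQL commands, run through the active SparkSession
spark.sql("OPTIMIZE miso_usage_data")
spark.sql("VACUUM miso_usage_data")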

Deploy

Deployment to Databricks uses the Databricks SDK. Users can control the job's configuration, including the cluster specification and the schedule.

from rtdip_sdk.pipelines.deploy import (
    DatabricksSDKDeploy, CreateJob, JobCluster, ClusterSpec, Task, NotebookTask,
    AutoScale, RuntimeEngine, DataSecurityMode, CronSchedule, Continuous, PauseStatus
)
from rtdip_sdk.authentication.azure import DefaultAuth

def deploy():
    # Authenticate with Azure and request a token for the Azure Databricks resource
    credential = DefaultAuth().authenticate()
    access_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token

    DATABRICKS_WORKSPACE = "{databricks-workspace-url}"

    # Create clusters
    cluster_list = []
    cluster_list.append(JobCluster(
        job_cluster_key="pipeline-cluster",
        new_cluster=ClusterSpec(
            node_type_id="Standard_E4ds_v5",
            autoscale=AutoScale(min_workers=1, max_workers=8),
            spark_version="13.3.x-scala2.12",
            data_security_mode=DataSecurityMode.SINGLE_USER,
            runtime_engine=RuntimeEngine.STANDARD
        )
    ))

    # Create tasks
    task_list = []
    task_list.append(Task(
        task_key="pipeline",
        job_cluster_key="pipeline-cluster",
        notebook_task=NotebookTask(
            notebook_path="{path/to/pipeline.py}"
        )
    ))

    # Create a Databricks Job for the Task
    job = CreateJob(
        name="rtdip-miso-batch-pipeline-job",
        job_clusters=cluster_list,
        tasks=task_list,
        continuous=Continuous(pause_status=PauseStatus.UNPAUSED)
    )

    # Deploy the pipeline job to Databricks
    databricks_pipeline_job = DatabricksSDKDeploy(
        databricks_job=job,
        host=DATABRICKS_WORKSPACE,
        token=access_token,
        workspace_directory="{path/to/databricks/workspace/directory}"
    )
    databricks_pipeline_job.deploy()

    # Create the cluster for the maintenance job
    cluster_list = []
    cluster_list.append(JobCluster(
        job_cluster_key="maintenance-cluster",
        new_cluster=ClusterSpec(
            node_type_id="Standard_E4ds_v5",
            autoscale=AutoScale(min_workers=1, max_workers=3),
            spark_version="13.3.x-scala2.12",
            data_security_mode=DataSecurityMode.SINGLE_USER,
            runtime_engine=RuntimeEngine.PHOTON
        )
    ))

    # Create the maintenance task
    task_list = []
    task_list.append(Task(
        task_key="rtdip-miso-maintenance-task",
        job_cluster_key="maintenance-cluster",
        notebook_task=NotebookTask(
            notebook_path="{path/to/maintenance.py}"
        )
    ))

    # Create a Databricks Job for the Task
    job = CreateJob(
        name="rtdip-miso-maintenance-job",
        job_clusters=cluster_list,
        tasks=task_list,
        schedule=CronSchedule(
            quartz_cron_expression="4 * * * * ?",
            timezone_id="UTC",
            pause_status=PauseStatus.UNPAUSED
        )
    )

    # Deploy the maintenance job to Databricks
    databricks_maintenance_job = DatabricksSDKDeploy(
        databricks_job=job,
        host=DATABRICKS_WORKSPACE,
        token=access_token,
        workspace_directory="{path/to/databricks/workspace/directory}"
    )
    databricks_maintenance_job.deploy()

if __name__ == "__main__":
    deploy()
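
After deployment, both jobs should appear in the Databricks Workflows UI. As an optional check, the Databricks SDK can also list them by name. This is a minimal sketch, assuming the same DATABRICKS_WORKSPACE and access_token values used above:

from databricks.sdk import WorkspaceClient

# Connect to the workspace with the same host and token used for deployment
w = WorkspaceClient(host=DATABRICKS_WORKSPACE, token=access_token)

# List the deployed pipeline job by name and print its id
for job in w.jobs.list(name="rtdip-miso-batch-pipeline-job"):
    print(job.job_id, job.settings.name)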