
Fledge Pipeline using Dagster and Databricks Connect

This article provides a guide on deploying a pipeline in Dagster using the RTDIP SDK and Databricks Connect. The pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.10 environment.

Note

Reading from Azure Event Hubs is currently not supported on Databricks Connect.

Prerequisites

Deployment using Databricks Connect requires:

  • a Databricks workspace

  • a cluster in the same workspace

  • a personal access token

Further information on Databricks requirements can be found in the Databricks Connect documentation.
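
Once these are in place, you can optionally verify that Databricks Connect can reach your cluster. The snippet below is a minimal sketch using the same placeholders as the example later in this article; replace the host, token and cluster ID with your own values.

from databricks.connect import DatabricksSession

# Build a remote Spark session against your Databricks cluster
spark = DatabricksSession.builder.remote(
    host="https://{workspace_instance}",
    token="{token}",
    cluster_id="{cluster_id}",
).getOrCreate()

# A trivial query; this should return five rows computed on the remote cluster
print(spark.range(5).collect())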

This pipeline job requires the dagster, dagster-webserver, databricks-connect and rtdip-sdk packages.
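
On most platforms these can be installed with pip. This is a sketch based on the package names above; pin versions as required for your environment:

pip install dagster dagster-webserver databricks-connect rtdip-sdk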

Dagster Installation

For Mac users with an M1 or M2 chip, dagster and dagster-webserver should instead be installed as follows:

pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels

Components

Name                             | Description
-------------------------------- | -----------------------------------------------------------------------------------------------------
SparkDeltaSource                 | Reads data from a Delta table.
BinaryToStringTransformer        | Converts a Spark DataFrame column from binary to string.
FledgeOPCUAJsonToPCDMTransformer | Converts a Spark DataFrame column containing a JSON string to the Process Control Data Model (PCDM).
SparkDeltaDestination            | Writes data to a Delta table.

Authentication

For Databricks authentication, the following fields should be added to a configuration profile in your .databrickscfg file:

[PROFILE]
host = https://{workspace_instance}
token = dapi...
cluster_id = {cluster_id}

This profile should match the configuration of the DatabricksSession in the example below, as it will be used by the Databricks extension in VS Code to authenticate to your Databricks cluster.
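
As an alternative to hardcoding credentials in the example below, a session can be built directly from that profile. The snippet below is a minimal sketch, assuming a databricks-connect version that supports profile-based configuration and a profile named PROFILE:

from databricks.connect import DatabricksSession

# Build a Spark session from the [PROFILE] entry in ~/.databrickscfg;
# the cluster_id field in the profile selects the cluster to connect to
spark = DatabricksSession.builder.profile("PROFILE").getOrCreate()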

Example

Below is an example of how to set up a pipeline that reads Fledge data from a Delta table, transforms it to RTDIP's PCDM model and writes it back to a Delta table.

from dagster import Definitions, ResourceDefinition, graph, op
from databricks.connect import DatabricksSession
from rtdip_sdk.pipelines.sources.spark.delta import SparkDeltaSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

# Databricks cluster configuration, wrapped as a Dagster resource
databricks_resource = ResourceDefinition.hardcoded_resource(
    DatabricksSession.builder.remote(
        host="https://{workspace_instance_name}",
        token="{token}",
        cluster_id="{cluster_id}",
    ).getOrCreate()
)

# Pipeline
@op(required_resource_keys={"databricks"})
def pipeline(context):
    spark = context.resources.databricks
    # Read the raw Fledge data from a Delta table
    source = SparkDeltaSource(spark, {}, "{path_to_table}").read_batch()
    # Decode the binary payload column to a string
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    # Convert the Fledge OPC UA JSON string to the Process Control Data Model
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    # Write the PCDM data to the destination Delta table
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()

@graph
def fledge_pipeline():
    pipeline()

fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={"databricks": databricks_resource}
)

defs = Definitions(jobs=[fledge_pipeline_job])

Deploy

The following command deploys the pipeline to Dagster:

dagster dev -f <path/to/file.py>

Open the link printed by the command above, click on Launchpad and select Run to launch the pipeline.
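
For a quick local test without the web UI, a Dagster job can also be executed in-process. A minimal sketch, assuming the example above is saved as my_fledge_pipeline.py (a hypothetical module name) on your import path:

# my_fledge_pipeline is the hypothetical module containing the example above
from my_fledge_pipeline import fledge_pipeline_job

# Run the job directly, without the Dagster webserver
result = fledge_pipeline_job.execute_in_process()
assert result.success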