
Fledge Pipeline using Dagster and Databricks Connect

This article explains how to deploy a pipeline in Dagster using the RTDIP SDK and Databricks Connect. The pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.10 environment.


Note: Reading from Event Hubs is currently not supported on Databricks Connect.


Deployment using Databricks Connect requires:

  • a Databricks workspace

  • a cluster in the same workspace

  • a personal access token

Further information on Databricks requirements can be found in the Databricks Connect documentation.

This pipeline job requires the packages:

Dagster Installation

For Mac users with an M1 or M2 chip, Dagster should be installed as follows:

pip install dagster dagster-webserver --find-links=


Components

Name                               Description
SparkDeltaSource                   Reads data from a Delta table.
BinaryToStringTransformer          Converts a Spark DataFrame column from binary to string.
FledgeOPCUAJsonToPCDMTransformer   Converts a Spark DataFrame column containing a JSON string to the Process Control Data Model (PCDM).
SparkDeltaDestination              Writes data to a Delta table.
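To make the data flow through these components more concrete, the following pure-Python sketch imitates the two transformation steps on a single record: decoding a binary payload to a UTF-8 string, then flattening a Fledge-style JSON message into one row per reading. It does not use the RTDIP SDK. The payload shape (`asset`, `readings`, `timestamp`), the output field names and the helper functions are assumptions chosen for illustration; the real transformers operate on Spark DataFrames and their exact output schema is defined by the RTDIP SDK.

```python
import json

# Hypothetical raw record: the payload column arrives as bytes.
# The JSON shape (asset / readings / timestamp) is an assumption for this sketch.
raw_body = json.dumps(
    [
        {
            "asset": "plant1/line1",
            "readings": {"temperature": 21.5, "pressure": 101.3},
            "timestamp": "2023-06-01T12:00:00Z",
        }
    ]
).encode("utf-8")


def binary_to_string(value: bytes) -> str:
    """Hypothetical stand-in for BinaryToStringTransformer: decode bytes as UTF-8."""
    return value.decode("utf-8")


def fledge_json_to_rows(body: str) -> list[dict]:
    """Simplified, hypothetical stand-in for FledgeOPCUAJsonToPCDMTransformer.

    Emits one long-format row per reading, using a small assumed subset of
    PCDM-style field names. Values are kept as strings in this sketch.
    """
    rows = []
    for message in json.loads(body):
        for tag_name, value in message["readings"].items():
            rows.append(
                {
                    "TagName": tag_name,
                    "EventTime": message["timestamp"],
                    "Value": str(value),
                }
            )
    return rows


rows = fledge_json_to_rows(binary_to_string(raw_body))
for row in rows:
    print(row)
```

The wide message with several readings becomes several narrow rows, which is the general idea behind mapping device payloads onto a tag/time/value model.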


For Databricks authentication, add the following fields to a configuration profile in your .databrickscfg file. Each profile is a bracketed section; the example below uses the default profile, [DEFAULT]:

[DEFAULT]
host = https://{workspace_instance}
token = dapi...
cluster_id = {cluster_id}

The values in this profile should match those passed to DatabricksSession in the example below, because the Databricks extension for VS Code uses this profile to authenticate with your Databricks cluster.
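One way to keep the profile and the session settings in sync is to read the values from the file instead of copying them by hand. The sketch below uses Python's standard `configparser` to load `host`, `token` and `cluster_id` from a .databrickscfg-style file. The helper name `load_databricks_profile` is hypothetical, the profile is assumed to be `[DEFAULT]`, and the values written to the temporary file are placeholders.

```python
import configparser
import os
import tempfile

REQUIRED_KEYS = ("host", "token", "cluster_id")


def load_databricks_profile(path: str, profile: str = "DEFAULT") -> dict:
    """Hypothetical helper: read host, token and cluster_id from an INI-style profile."""
    # interpolation=None so characters such as '%' in a token are read literally
    config = configparser.ConfigParser(interpolation=None)
    if not config.read(path):
        raise FileNotFoundError(path)
    if profile not in config:
        raise KeyError(f"profile {profile!r} not found in {path}")
    section = config[profile]
    missing = [key for key in REQUIRED_KEYS if key not in section]
    if missing:
        raise ValueError(f"profile {profile!r} is missing: {', '.join(missing)}")
    return {key: section[key] for key in REQUIRED_KEYS}


# Usage with a throwaway file containing placeholder values
with tempfile.TemporaryDirectory() as tmp:
    cfg_path = os.path.join(tmp, ".databrickscfg")
    with open(cfg_path, "w") as f:
        f.write(
            "[DEFAULT]\n"
            "host = https://example.cloud.databricks.com\n"
            "token = dapi-placeholder\n"
            "cluster_id = 0000-000000-placeholder\n"
        )
    settings = load_databricks_profile(cfg_path)

print(settings["host"])
```

With a real file, the path would typically be `os.path.expanduser("~/.databrickscfg")`, and the returned dict could be unpacked into `DatabricksSession.builder.remote(**settings).getOrCreate()` so that the session and the VS Code extension read the same values.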


Below is an example of how to set up a pipeline that reads Fledge data from a Delta table, transforms it to RTDIP's Process Control Data Model (PCDM) and writes the result to a Delta table.

from dagster import Definitions, ResourceDefinition, graph, op
from databricks.connect import DatabricksSession
from import SparkDeltaSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from import SparkDeltaDestination

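# Databricks cluster configuration: a Spark session created with Databricks Connect
databricks_resource = ResourceDefinition.hardcoded_resource(
    DatabricksSession.builder.remote(
        host       = "https://{workspace_instance_name}",
        token      = "{token}",
        cluster_id = "{cluster_id}"
    ).getOrCreate()
)

# Pipeline: an op that receives the Spark session through the "databricks" resource
@op(required_resource_keys={"databricks"})
def pipeline(context):
    spark = context.resources.databricks
    # Read the raw Fledge data from the source Delta table
    source = SparkDeltaSource(spark, {}, "{path_to_source_table}").read_batch()
    # Decode the binary payload column into a string column
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    # Parse the JSON string column produced above into the Process Control Data Model
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{target_column_name}").transform()
    # Write the PCDM data to the destination Delta table
    SparkDeltaDestination(transformer, {}, "{path_to_destination_table}").write_batch()

# Graph wrapping the single op
@graph
def fledge_pipeline():
    pipeline()

# Job that binds the "databricks" resource key to the Databricks session
fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "databricks": databricks_resource
    }
)

defs = Definitions(jobs=[fledge_pipeline_job])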
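In the example above, Dagster hands the object registered under the "databricks" key to the op as `context.resources.databricks`. The stdlib-only sketch below imitates that wiring so the read, transform and write order can be checked without a cluster or a Dagster installation. `read_batch`, `binary_to_string`, `write_batch` and the `SimpleNamespace` context are hypothetical stand-ins, not Dagster or RTDIP APIs, and the Fledge-to-PCDM step is omitted for brevity.

```python
from types import SimpleNamespace

# Records each step so the execution order can be inspected afterwards
calls = []


def read_batch(spark, table):
    # Hypothetical stand-in for SparkDeltaSource(spark, {}, table).read_batch()
    calls.append(("read", spark, table))
    return [{"body": b'{"value": 1}'}]


def binary_to_string(rows, source_col, target_col):
    # Hypothetical stand-in for BinaryToStringTransformer(...).transform()
    calls.append(("binary_to_string", source_col, target_col))
    return [{**row, target_col: row[source_col].decode("utf-8")} for row in rows]


def write_batch(rows, table):
    # Hypothetical stand-in for SparkDeltaDestination(rows, {}, table).write_batch()
    calls.append(("write", table, len(rows)))


def pipeline(context):
    # Mirrors the op: the session is taken from the injected "databricks" resource
    spark = context.resources.databricks
    rows = read_batch(spark, "source_table")
    rows = binary_to_string(rows, "body", "body_str")
    write_batch(rows, "destination_table")
    return rows


# Dagster would build this context; a SimpleNamespace plays that role here
fake_context = SimpleNamespace(resources=SimpleNamespace(databricks="fake-spark-session"))
result = pipeline(fake_context)
print(calls)
```

For tests against the real op, Dagster also provides `build_op_context`, which accepts a `resources` mapping; the mock above only avoids the dependency on Dagster and Databricks.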


To deploy the pipeline to Dagster, run the following command: dagster dev -f <path/to/>

Open the link printed by the command above, click Launchpad and then run the job to execute the pipeline.