Fledge Pipeline using Dagster

This article is a guide to deploying a pipeline in Dagster using the RTDIP SDK. The pipeline was tested on an M2 MacBook Pro using VS Code in a Python 3.10 environment.


This pipeline job requires the packages:

Dagster Installation

For Mac users with an M1 or M2 chip, install Dagster as follows:

pip install dagster dagster-webserver --find-links=


Name                              Description
SparkEventhubSource               Reads data from an Eventhub.
BinaryToStringTransformer         Converts a Spark DataFrame column from binary to string.
FledgeOPCUAJsonToPCDMTransformer  Converts a Spark DataFrame column containing a JSON string to the Process Control Data Model.
SparkDeltaDestination             Writes to a Delta table.
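
To make the two transformation steps more concrete, here is a minimal plain-Python sketch of the idea, without Spark or the RTDIP SDK. It assumes a Fledge-style payload that is a JSON array of objects with "asset", "readings" and "timestamp" keys, and it uses illustrative output column names; the helper names `binary_to_string` and `fledge_json_to_rows` are hypothetical, and the real transformers may use a different schema.

```python
import json
from typing import Any


def binary_to_string(body: bytes) -> str:
    """Decode a message body from bytes to text (UTF-8 is an assumption)."""
    return body.decode("utf-8")


def fledge_json_to_rows(payload: str) -> list[dict[str, Any]]:
    """Flatten an assumed Fledge-style JSON array into one row per reading."""
    rows = []
    for message in json.loads(payload):
        for tag_name, value in message["readings"].items():
            rows.append(
                {
                    "TagName": tag_name,                # illustrative column name
                    "EventTime": message["timestamp"],  # illustrative column name
                    "Value": str(value),                # values kept as strings
                }
            )
    return rows


# Hypothetical sample message, as it might arrive in binary form.
sample_body = json.dumps(
    [
        {
            "asset": "pump-1",
            "readings": {"temperature": 21.5, "pressure": 3.2},
            "timestamp": "2024-01-01T00:00:00.000000Z",
        }
    ]
).encode("utf-8")

rows = fledge_json_to_rows(binary_to_string(sample_body))
for row in rows:
    print(row)
```

Each reading in the message becomes its own row, which is the general shape of moving from a nested device payload to a flat, tag-oriented table.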


Below is an example of how to set up a pipeline to read Fledge data from an Eventhub, transform it to RTDIP's PCDM model and write it to a Delta table on your machine.

import json
from datetime import datetime as dt
from dagster import Definitions, graph, op
from dagster_pyspark.resources import pyspark_resource
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from import SparkDeltaDestination

# PySpark cluster configuration
packages = ","
my_pyspark_resource = pyspark_resource.configured(
    {"spark_conf": {"spark.default.parallelism": 1,
                    "spark.jars.packages": packages,
                    "spark.sql.extensions": "",
                    "spark.sql.catalog.spark_catalog": ""}}
)

# EventHub configuration
eventhub_connection_string = "{eventhub_connection_string}"
eventhub_consumer_group = "{eventhub_consumer_group}"

startOffset = "-1"
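# Assumed reconstruction: use the current time as the end bound of the batch read
endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")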

startingEventPosition = {
  "offset": startOffset,
  "seqNo": -1,
  "enqueuedTime": None,
  "isInclusive": True
}

endingEventPosition = {
  "offset": None,
  "seqNo": -1,
  "enqueuedTime": endTime,
  "isInclusive": True
}

ehConf = {
    'eventhubs.connectionString': eventhub_connection_string,
    'eventhubs.consumerGroup': eventhub_consumer_group,
    'eventhubs.startingPosition': json.dumps(startingEventPosition),
    'eventhubs.endingPosition': json.dumps(endingEventPosition),
    'maxEventsPerTrigger': 1000
}

# Pipeline
@op(required_resource_keys={"spark"})
def pipeline(context):
    # The PySpark resource is registered under the "spark" key in to_job below
    spark = context.resources.spark.spark_session
    source = SparkEventhubSource(spark, ehConf).read_batch()
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()

# Wrap the op in a graph so it can be turned into a job
@graph
def fledge_pipeline():
    pipeline()

fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "spark": my_pyspark_resource
    }
)

defs = Definitions(jobs=[fledge_pipeline_job])
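
The starting and ending event positions in the listing above are plain dictionaries serialized with json.dumps. As a rough sketch, they could be built by a small helper that mirrors those two dictionaries: one position identified by offset, the other by enqueued time. The function name `event_position` and its parameters are hypothetical and not part of the RTDIP SDK or the Event Hubs connector; the rule that exactly one of the two fields is set is an assumption taken from the shape of the dictionaries, not a documented connector requirement.

```python
import json
from datetime import datetime, timezone
from typing import Optional

TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # same format string as in the listing


def event_position(
    offset: Optional[str] = None,
    enqueued_time: Optional[datetime] = None,
    inclusive: bool = True,
) -> str:
    """Return a JSON event position with the same keys as the listing above."""
    # Assumption: a position is given either by offset or by enqueued time.
    if (offset is None) == (enqueued_time is None):
        raise ValueError("set exactly one of offset or enqueued_time")
    position = {
        "offset": offset,
        "seqNo": -1,
        "enqueuedTime": None
        if enqueued_time is None
        else enqueued_time.strftime(TIME_FORMAT),
        "isInclusive": inclusive,
    }
    return json.dumps(position)


# Start from the beginning of the stream, stop at a fixed point in time.
start = event_position(offset="-1")
end = event_position(
    enqueued_time=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
)
print(start)
print(end)
```

The returned strings can be assigned to 'eventhubs.startingPosition' and 'eventhubs.endingPosition' in the same way as the json.dumps calls in the listing.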


The following command deploys the pipeline to Dagster:

dagster dev -f <path/to/>

Open the link printed by the command above, click Launchpad, and launch the run to execute the pipeline.