Skip to content

Convert OPC Publisher Json to Process Control Data Model

OPCPublisherOPCUAJsonToPCDMTransformer

Bases: TransformerInterface

Converts a Spark Dataframe column containing a json string created by OPC Publisher to the Process Control Data Model

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing the column with Json OPC UA data

required
source_column_name str

Spark Dataframe column containing the OPC Publisher Json OPC UA data

required
multiple_rows_per_message optional bool

When True, each Dataframe row is expected to contain an array of (multiple) OPC UA messages. The list of Json will be exploded into one row per message in the Dataframe.

True
status_null_value optional str

If populated, will replace null values in the Status column with the specified value.

None
timestamp_formats optional list[str]

Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to the Spark datetime pattern documentation (https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html).

["yyyy-MM-dd'T'HH:mm:ss.SSSX", "yyyy-MM-dd'T'HH:mm:ssX"]
filter optional str

Enables providing a filter to the data which can be required in certain scenarios. For example, it would be possible to filter on IoT Hub Device Id and Module by providing a filter in SQL format such as systemProperties.iothub-connection-device-id = "<Device Id>" AND systemProperties.iothub-connection-module-id = "<Module>"

None
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class OPCPublisherOPCUAJsonToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark Dataframe column containing a json string created by OPC Publisher to the Process Control Data Model

    Args:
        data (DataFrame): Dataframe containing the column with Json OPC UA data
        source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
        multiple_rows_per_message (optional bool): Each Dataframe Row contains an array of/multiple OPC UA messages. The list of Json will be exploded into rows in the Dataframe.
        tagname_field (optional str): Name of the field in the parsed OPC UA message that is copied into the TagName column. Defaults to `DisplayName`.
        status_null_value (optional str): If populated, will replace null values in the Status column with the specified value.
        change_type_value (optional str): Literal value written to the ChangeType column of every output row. Defaults to `insert`.
        timestamp_formats (optional list[str]): Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to this [documentation.](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html)
        filter (optional str): Enables providing a filter to the data which can be required in certain scenarios. For example, it would be possible to filter on IoT Hub Device Id and Module by providing a filter in SQL format such as `systemProperties.iothub-connection-device-id = "<Device Id>" AND systemProperties.iothub-connection-module-id = "<Module>"`
    """

    data: DataFrame
    source_column_name: str
    multiple_rows_per_message: bool
    tagname_field: str
    status_null_value: str
    change_type_value: str
    timestamp_formats: list
    filter: str

    def __init__(
        self,
        data: DataFrame,
        source_column_name: str,
        multiple_rows_per_message: bool = True,
        tagname_field: str = "DisplayName",
        status_null_value: str = None,
        change_type_value: str = "insert",
        timestamp_formats: list = [
            "yyyy-MM-dd'T'HH:mm:ss.SSSX",
            "yyyy-MM-dd'T'HH:mm:ssX",
        ],
        filter: str = None,
    ) -> None:  # NOSONAR
        self.data = data
        self.source_column_name = source_column_name
        self.multiple_rows_per_message = multiple_rows_per_message
        self.tagname_field = tagname_field
        self.status_null_value = status_null_value
        self.change_type_value = change_type_value
        # Copy the formats: the default list object is created once at function
        # definition time, so storing it by reference would let a mutation on
        # one instance leak into every other instance built with the default.
        self.timestamp_formats = list(timestamp_formats)
        self.filter = filter

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A Libraries instance with nothing added to it
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary
        """
        return {}

    def pre_transform_validation(self):
        """
        Returns:
            bool: Always True, no validation is performed before the transform
        """
        return True

    def post_transform_validation(self):
        """
        Returns:
            bool: Always True, no validation is performed after the transform
        """
        return True

    def transform(self) -> DataFrame:
        """
        Returns:
            DataFrame: A dataframe with the specified column converted to PCDM
        """
        if self.multiple_rows_per_message:
            # Parse the payload as an array of Json strings, then emit one row
            # per array element.
            df = self.data.withColumn(
                self.source_column_name,
                from_json(col(self.source_column_name), ArrayType(StringType())),
            ).withColumn(self.source_column_name, explode(self.source_column_name))
        else:
            df = self.data.withColumn(
                self.source_column_name,
                from_json(col(self.source_column_name), StringType()),
            )

        # The optional filter is applied before the OPC UA payload is parsed,
        # so it can reference any column of the incoming dataframe.
        if self.filter is not None:
            df = df.where(self.filter)

        df = (
            df.withColumn(
                "OPCUA", from_json(col(self.source_column_name), OPC_PUBLISHER_SCHEMA)
            )
            .withColumn("TagName", col(f"OPCUA.{self.tagname_field}"))
            .withColumn(
                "EventTime",
                # Try each format in order; the first one that parses wins.
                coalesce(
                    *[
                        to_timestamp(col("OPCUA.Value.SourceTimestamp"), f)
                        for f in self.timestamp_formats
                    ]
                ),
            )
            .withColumn("Value", col("OPCUA.Value.Value"))
            .withColumn(
                "ValueType",
                # A value that casts to float is "float", everything else
                # (including null) is "string". isNotNull() is never null
                # itself, so no third outcome exists.
                when(col("Value").cast("float").isNotNull(), "float").otherwise(
                    "string"
                ),
            )
            .withColumn("ChangeType", lit(self.change_type_value))
        )

        status_col_name = "OPCUA.Value.StatusCode.Symbol"
        if self.status_null_value is not None:
            # Substitute the configured value wherever the status symbol is null.
            df = df.withColumn(
                "Status",
                coalesce(col(status_col_name), lit(self.status_null_value)),
            )
        else:
            df = df.withColumn("Status", col(status_col_name))

        return df.select(
            "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
77
78
79
80
81
82
83
@staticmethod
def system_type():
    """
    Reports the system type required by this component.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system_type = SystemType.PYSPARK
    return required_system_type

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the specified column converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def transform(self) -> DataFrame:
    """
    Returns:
        DataFrame: A dataframe with the specified column converted to PCDM
    """
    if self.multiple_rows_per_message:
        # Parse the payload as an array of Json strings, then emit one row
        # per array element.
        df = self.data.withColumn(
            self.source_column_name,
            from_json(col(self.source_column_name), ArrayType(StringType())),
        ).withColumn(self.source_column_name, explode(self.source_column_name))
    else:
        df = self.data.withColumn(
            self.source_column_name,
            from_json(col(self.source_column_name), StringType()),
        )

    # The optional filter is applied before the OPC UA payload is parsed,
    # so it can reference any column of the incoming dataframe.
    if self.filter is not None:
        df = df.where(self.filter)

    df = (
        df.withColumn(
            "OPCUA", from_json(col(self.source_column_name), OPC_PUBLISHER_SCHEMA)
        )
        .withColumn("TagName", col(f"OPCUA.{self.tagname_field}"))
        .withColumn(
            "EventTime",
            # Try each format in order; the first one that parses wins.
            coalesce(
                *[
                    to_timestamp(col("OPCUA.Value.SourceTimestamp"), f)
                    for f in self.timestamp_formats
                ]
            ),
        )
        .withColumn("Value", col("OPCUA.Value.Value"))
        .withColumn(
            "ValueType",
            # A value that casts to float is "float", everything else
            # (including null) is "string". isNotNull() is never null
            # itself, so no third outcome exists.
            when(col("Value").cast("float").isNotNull(), "float").otherwise(
                "string"
            ),
        )
        .withColumn("ChangeType", lit(self.change_type_value))
    )

    status_col_name = "OPCUA.Value.StatusCode.Symbol"
    if self.status_null_value is not None:
        # Substitute the configured value wherever the status symbol is null.
        df = df.withColumn(
            "Status",
            coalesce(col(status_col_name), lit(self.status_null_value)),
        )
    else:
        df = df.withColumn("Status", col(status_col_name))

    return df.select(
        "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
    )