Skip to content

Convert Fledge Json to Process Control Data Model

FledgeOPCUAJsonToPCDMTransformer

Bases: TransformerInterface

Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing the column with Json Fledge data

required
source_column_name str

Spark Dataframe column containing the Fledge OPC UA Json data

required
status_null_value str

Value written to the Status column of every output row.

'Good'
timestamp_formats list[str]

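Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. The formats are tried in order and the first one that parses is used. For more information on formats, refer to the Spark datetime pattern documentation: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html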

["yyyy-MM-dd'T'HH:mm:ss.SSSX", "yyyy-MM-dd'T'HH:mm:ssX"]
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class FledgeOPCUAJsonToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model

    Args:
        data (DataFrame): Dataframe containing the column with Json Fledge data
        source_column_name (str): Spark Dataframe column containing the Fledge OPC UA Json data
        status_null_value (str): Value written to the Status column of every output row.
        change_type_value (str): Value written to the ChangeType column of every output row.
        timestamp_formats (list[str]): Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. The formats are tried in order and the first one that parses is used. For more information on formats, refer to this [documentation.](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html)
    """

    data: DataFrame
    source_column_name: str
    status_null_value: str
    change_type_value: str
    timestamp_formats: list

    def __init__(
        self,
        data: DataFrame,
        source_column_name: str,
        status_null_value: str = "Good",
        change_type_value: str = "insert",
        timestamp_formats: list = [
            "yyyy-MM-dd'T'HH:mm:ss.SSSX",
            "yyyy-MM-dd'T'HH:mm:ssX",
        ],
    ) -> None:  # NOSONAR
        self.data = data
        self.source_column_name = source_column_name
        self.status_null_value = status_null_value
        self.change_type_value = change_type_value
        # Copy the list so that mutating this instance's formats can never
        # alter the shared default list (or the caller's own list).
        self.timestamp_formats = list(timestamp_formats)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: An empty Libraries object; nothing is added to it here.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary; no settings are specified here.
        """
        return {}

    def pre_transform_validation(self):
        """
        Returns:
            bool: Always True; no validation is performed before the transform.
        """
        return True

    def post_transform_validation(self):
        """
        Returns:
            bool: Always True; no validation is performed after the transform.
        """
        return True

    def transform(self) -> DataFrame:
        """
        Parses the json in the source column with FLEDGE_SCHEMA, expands it with
        inline(), explodes `readings` into key/value rows and maps the result
        onto the PCDM columns.

        Returns:
            DataFrame: A dataframe with the specified column converted to PCDM
        """
        # Cast once and reuse. The column is referenced by its post-rename name
        # ("Value") so resolution also works when spark.sql.caseSensitive=true.
        value_as_float = col("Value").cast("float")

        df = (
            self.data.withColumn(
                self.source_column_name,
                from_json(self.source_column_name, FLEDGE_SCHEMA),
            )
            .selectExpr("inline({})".format(self.source_column_name))
            .select(explode("readings"), "timestamp")
            .withColumn(
                "EventTime",
                # First format that parses wins; unparseable strings yield null.
                coalesce(
                    *[to_timestamp(col("timestamp"), f) for f in self.timestamp_formats]
                ),
            )
            .withColumnRenamed("key", "TagName")
            .withColumnRenamed("value", "Value")
            .withColumn("Status", lit(self.status_null_value))
            .withColumn(
                "ValueType",
                # Anything that does not cast to float (including null) is "string".
                when(value_as_float.isNotNull(), "float").otherwise("string"),
            )
            .withColumn("ChangeType", lit(self.change_type_value))
        )

        return df.select(
            "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
65
66
67
68
69
70
71
@staticmethod
def system_type():
    """
    Reports the system type this transformer declares.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the specified column converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def transform(self) -> DataFrame:
    """
    Parses the json in the source column with FLEDGE_SCHEMA, expands it with
    inline(), explodes `readings` into key/value rows and maps the result
    onto the PCDM columns.

    Returns:
        DataFrame: A dataframe with the specified column converted to PCDM
    """
    # Cast once and reuse. The column is referenced by its post-rename name
    # ("Value") so resolution also works when spark.sql.caseSensitive=true.
    value_as_float = col("Value").cast("float")

    df = (
        self.data.withColumn(
            self.source_column_name,
            from_json(self.source_column_name, FLEDGE_SCHEMA),
        )
        .selectExpr("inline({})".format(self.source_column_name))
        .select(explode("readings"), "timestamp")
        .withColumn(
            "EventTime",
            # First format that parses wins; unparseable strings yield null.
            coalesce(
                *[to_timestamp(col("timestamp"), f) for f in self.timestamp_formats]
            ),
        )
        .withColumnRenamed("key", "TagName")
        .withColumnRenamed("value", "Value")
        .withColumn("Status", lit(self.status_null_value))
        .withColumn(
            "ValueType",
            # Anything that does not cast to float (including null) is "string".
            when(value_as_float.isNotNull(), "float").otherwise("string"),
        )
        .withColumn("ChangeType", lit(self.change_type_value))
    )

    return df.select(
        "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
    )