Skip to content

Convert Edge Xpert Json to Process Control Data Model

EdgeXOPCUAJsonToPCDMTransformer

Bases: TransformerInterface

Converts a Spark DataFrame column containing a JSON string created by EdgeX to the Process Control Data Model (PCDM).

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing the column with EdgeX data

required
source_column_name str

Name of the Spark DataFrame column containing the EdgeX JSON OPC UA data

required
status_null_value optional str

Value written to the Status column of every output row.

'Good'
change_type_value optional str

Value written to the ChangeType column of every output row.

'insert'
tagname_field optional str

Field of each EdgeX reading whose value is used as the TagName.

'resourceName'
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class EdgeXOPCUAJsonToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark Dataframe column containing a JSON string created by EdgeX to the Process Control Data Model

    Args:
        data (DataFrame): Dataframe containing the column with EdgeX data
        source_column_name (str): Spark Dataframe column containing the EdgeX JSON OPC UA data
        status_null_value (optional str): Value written to the Status column of every output row. Defaults to 'Good'.
        change_type_value (optional str): Value written to the ChangeType column of every output row. Defaults to 'insert'.
        tagname_field (optional str): Field of each EdgeX reading used as the TagName. Defaults to 'resourceName'.
    """

    data: DataFrame
    source_column_name: str
    status_null_value: str
    change_type_value: str
    tagname_field: str

    def __init__(
        self,
        data: DataFrame,
        source_column_name: str,
        status_null_value: str = "Good",
        change_type_value: str = "insert",
        tagname_field="resourceName",
    ) -> None:
        self.data = data
        self.source_column_name = source_column_name
        self.status_null_value = status_null_value
        self.change_type_value = change_type_value
        self.tagname_field = tagname_field

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """Returns an empty `Libraries` collection (no extra libraries are declared)."""
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """Returns an empty settings dictionary."""
        return {}

    def pre_transform_validation(self):
        """No validation is performed; always returns True."""
        return True

    def post_transform_validation(self):
        """No validation is performed; always returns True."""
        return True

    def transform(self) -> DataFrame:
        """
        Parses the EdgeX JSON in `source_column_name` and emits one PCDM row per reading.

        Returns:
            DataFrame: A dataframe with the specified column converted to PCDM, containing the
                columns TagName, EventTime, Status, Value, ValueType and ChangeType
        """
        # EdgeX value types grouped by the PCDM ValueType they map to. Anything
        # else (including null) falls through to "string".
        integer_types = (
            "Int8",
            "Int16",
            "Int32",
            "Int64",
            "Uint8",
            "Uint16",
            "Uint32",
            "Uint64",
        )
        float_types = ("Float32", "Float64")
        value_type = col("ValueType")

        df = (
            self.data.withColumn(
                self.source_column_name,
                from_json(self.source_column_name, EDGEX_SCHEMA),
            )
            # One row per element of the readings array, exposed as column `col`.
            .select("*", explode(f"{self.source_column_name}.readings"))
            .selectExpr(
                # Take the tag name from the same reading as the value. Exploding
                # `<source>.readings.<tagname_field>` again here would cross-join
                # every tag name of the event with every reading.
                f"col.{self.tagname_field} as TagName",
                # `origin` is divided by 1e9 (treated as nanoseconds since the epoch),
                # then shifted from the session time zone to UTC.
                "to_utc_timestamp(to_timestamp((col.origin / 1000000000)), current_timezone()) as EventTime",
                "col.value as Value",
                "col.valueType as ValueType",
            )
            .withColumn("Status", lit(self.status_null_value))
            .withColumn("ChangeType", lit(self.change_type_value))
            .withColumn(
                "ValueType",
                when(value_type.isin(*integer_types), "integer")
                .when(value_type.isin(*float_types), "float")
                .when(value_type == "Bool", "bool")
                .otherwise("string"),
            )
        )

        return df.select(
            "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
54
55
56
57
58
59
60
@staticmethod
def system_type():
    """
    Report the execution environment this transformer runs in.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the specified column converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def transform(self) -> DataFrame:
    """
    Parses the EdgeX JSON in `source_column_name` and emits one PCDM row per reading.

    Returns:
        DataFrame: A dataframe with the specified column converted to PCDM, containing the
            columns TagName, EventTime, Status, Value, ValueType and ChangeType
    """
    # EdgeX value types grouped by the PCDM ValueType they map to. Anything
    # else (including null) falls through to "string".
    integer_types = (
        "Int8",
        "Int16",
        "Int32",
        "Int64",
        "Uint8",
        "Uint16",
        "Uint32",
        "Uint64",
    )
    float_types = ("Float32", "Float64")
    value_type = col("ValueType")

    df = (
        self.data.withColumn(
            self.source_column_name,
            from_json(self.source_column_name, EDGEX_SCHEMA),
        )
        # One row per element of the readings array, exposed as column `col`.
        .select("*", explode(f"{self.source_column_name}.readings"))
        .selectExpr(
            # Take the tag name from the same reading as the value. Exploding
            # `<source>.readings.<tagname_field>` again here would cross-join
            # every tag name of the event with every reading.
            f"col.{self.tagname_field} as TagName",
            # `origin` is divided by 1e9 (treated as nanoseconds since the epoch),
            # then shifted from the session time zone to UTC.
            "to_utc_timestamp(to_timestamp((col.origin / 1000000000)), current_timezone()) as EventTime",
            "col.value as Value",
            "col.valueType as ValueType",
        )
        .withColumn("Status", lit(self.status_null_value))
        .withColumn("ChangeType", lit(self.change_type_value))
        .withColumn(
            "ValueType",
            when(value_type.isin(*integer_types), "integer")
            .when(value_type.isin(*float_types), "float")
            .when(value_type == "Bool", "bool")
            .otherwise("string"),
        )
    )

    return df.select(
        "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
    )