Skip to content

Convert Mirico Json to Process Control Data Model

MiricoJsonToPCDMTransformer

Bases: TransformerInterface

Converts a Spark Dataframe column containing a json string created from Mirico to the Process Control Data Model.

Example

from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer

# `df` is an existing Spark DataFrame; its "body" column holds the Mirico json string.
mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
    data=df,
    source_column_name="body",
    status_null_value="Good",
    change_type_value="insert",
    tagname_field="test",
)

result = mirico_json_to_pcdm_transformer.transform()

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing the column with Mirico Json data

required
source_column_name str

Spark Dataframe column containing the Mirico Json string

required
status_null_value optional str

If populated, will replace 'Good' in the Status column with the specified value.

'Good'
change_type_value optional str

If populated, will replace 'insert' in the ChangeType column with the specified value.

'insert'
tagname_field optional str

If populated, will add the specified field to the TagName column.

None
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class MiricoJsonToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark Dataframe column containing a json string created from Mirico to the Process Control Data Model.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer

    mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
        data=df,
        source_column_name="body",
        status_null_value="Good",
        change_type_value="insert",
        tagname_field="test"
    )

    result = mirico_json_to_pcdm_transformer.transform()
    ```

    Parameters:
        data (DataFrame): Dataframe containing the column with Mirico Json data
        source_column_name (str): Spark Dataframe column containing the Mirico Json string
        status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value.
        change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
        tagname_field (optional str): If populated, the upper-cased value is prepended to the TagName column, separated by ':'.
    """

    data: DataFrame
    source_column_name: str
    status_null_value: str
    change_type_value: str
    tagname_field: str

    def __init__(
        self,
        data: DataFrame,
        source_column_name: str,
        status_null_value: str = "Good",
        change_type_value: str = "insert",
        tagname_field: str = None,
    ) -> None:
        _package_version_meets_minimum("pyspark", "3.4.0")
        self.data = data
        self.source_column_name = source_column_name
        self.status_null_value = status_null_value
        self.change_type_value = change_type_value
        self.tagname_field = tagname_field

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No entries are added to the Libraries container for this transformer.
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        # No settings are supplied by this transformer.
        return {}

    def pre_transform_validation(self):
        # No checks are performed before the transform; always reports success.
        return True

    def post_transform_validation(self):
        # No checks are performed after the transform; always reports success.
        return True

    def transform(self) -> DataFrame:
        """
        Returns:
            DataFrame: A dataframe with the specified column converted to PCDM
        """

        mapping = mirico_field_mappings.MIRICO_FIELD_MAPPINGS

        # Keys that are dropped from the output rows; several of them
        # (timeStamp, siteName, gasType, retroName) are instead projected
        # onto every row before the map is exploded.
        excluded_keys = [
            "timeStamp",
            "gasType",
            "retroLongitude",
            "retroLatitude",
            "retroAltitude",
            "sensorLongitude",
            "sensorLatitude",
            "sensorAltitude",
            "siteName",
            "siteKey",
            "retroName",
        ]

        # Last segment of the tag: the upper-cased key, with the gas type
        # appended for GASPPM readings.
        metric_name = when(
            upper(col("key")) == "GASPPM",
            concat_ws("_", upper(col("key")), upper(col("gasType"))),
        ).otherwise(upper(col("key")))

        # SITE_RETRO_METRIC, built once and shared by both naming variants.
        tag_name = concat_ws(
            "_", upper(col("siteName")), upper(col("retroName")), metric_name
        )
        if self.tagname_field is not None:
            # Optional prefix, separated from the rest of the tag by ':'.
            tag_name = concat_ws(":", upper(lit(self.tagname_field)), tag_name)

        df = (
            self.data.withColumn(
                self.source_column_name,
                from_json(self.source_column_name, "map<string,string>"),
            )
            # Use the configured source column rather than a hard-coded "body".
            .withColumn("TagName", map_keys(self.source_column_name))
            .withColumn("Value", map_values(self.source_column_name))
            .select(
                map_from_arrays("TagName", "Value").alias("x"),
                # These expressions reference the alias "x" defined in the same select.
                to_timestamp(col("x.timeStamp")).alias("EventTime"),
                col("x.siteName").alias("siteName"),
                col("x.gasType").alias("gasType"),
                col("x.retroName").alias("retroName"),
            )
            # posexplode on a map yields one row per entry: pos, key, value.
            .select("EventTime", "siteName", "gasType", "retroName", posexplode("x"))
            .withColumn(
                "ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos"))
            )
            # Honour the constructor arguments instead of fixed literals.
            .withColumn("Status", lit(self.status_null_value))
            .withColumn("ChangeType", lit(self.change_type_value))
            .withColumn("TagName", tag_name)
            .filter(~col("key").isin(*excluded_keys))
        )
        return df.select(
            "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
86
87
88
89
90
91
92
@staticmethod
def system_type():
    """
    Report the system type this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the specified column converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def transform(self) -> DataFrame:
    """
    Returns:
        DataFrame: A dataframe with the specified column converted to PCDM
    """

    mapping = mirico_field_mappings.MIRICO_FIELD_MAPPINGS

    # Keys that are dropped from the output rows; several of them
    # (timeStamp, siteName, gasType, retroName) are instead projected
    # onto every row before the map is exploded.
    excluded_keys = [
        "timeStamp",
        "gasType",
        "retroLongitude",
        "retroLatitude",
        "retroAltitude",
        "sensorLongitude",
        "sensorLatitude",
        "sensorAltitude",
        "siteName",
        "siteKey",
        "retroName",
    ]

    # Last segment of the tag: the upper-cased key, with the gas type
    # appended for GASPPM readings.
    metric_name = when(
        upper(col("key")) == "GASPPM",
        concat_ws("_", upper(col("key")), upper(col("gasType"))),
    ).otherwise(upper(col("key")))

    # SITE_RETRO_METRIC, built once and shared by both naming variants.
    tag_name = concat_ws(
        "_", upper(col("siteName")), upper(col("retroName")), metric_name
    )
    if self.tagname_field is not None:
        # Optional prefix, separated from the rest of the tag by ':'.
        tag_name = concat_ws(":", upper(lit(self.tagname_field)), tag_name)

    df = (
        self.data.withColumn(
            self.source_column_name,
            from_json(self.source_column_name, "map<string,string>"),
        )
        # Use the configured source column rather than a hard-coded "body".
        .withColumn("TagName", map_keys(self.source_column_name))
        .withColumn("Value", map_values(self.source_column_name))
        .select(
            map_from_arrays("TagName", "Value").alias("x"),
            # These expressions reference the alias "x" defined in the same select.
            to_timestamp(col("x.timeStamp")).alias("EventTime"),
            col("x.siteName").alias("siteName"),
            col("x.gasType").alias("gasType"),
            col("x.retroName").alias("retroName"),
        )
        # posexplode on a map yields one row per entry: pos, key, value.
        .select("EventTime", "siteName", "gasType", "retroName", posexplode("x"))
        .withColumn(
            "ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos"))
        )
        # Honour the constructor arguments instead of fixed literals.
        .withColumn("Status", lit(self.status_null_value))
        .withColumn("ChangeType", lit(self.change_type_value))
        .withColumn("TagName", tag_name)
        .filter(~col("key").isin(*excluded_keys))
    )
    return df.select(
        "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
    )