Skip to content

Convert Honeywell APM Json to Process Control Data Model

HoneywellAPMJsonToPCDMTransformer

Bases: TransformerInterface

Converts a Spark Dataframe column containing a json string created by Honeywell APM to the Process Control Data Model.

Example

from rtdip_sdk.pipelines.transformers import HoneywellAPMJsonToPCDMTransformer

honeywell_apm_json_to_pcdm_transformer = HoneywellAPMJsonToPCDMTransformer(
    data=df,
    source_column_name="body",
    status_null_value="Good",
    change_type_value="insert"
)

result = honeywell_apm_json_to_pcdm_transformer.transform()

Parameters:

Name Type Description Default
data DataFrame

Dataframe containing the column with Honeywell APM data

required
source_column_name str

Spark Dataframe column containing the Honeywell APM Json string

required
status_null_value optional str

Value written to the Status column of every output row. Defaults to 'Good'.

'Good'
change_type_value optional str

Value written to the ChangeType column of every output row. Defaults to 'insert'.

'insert'
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
class HoneywellAPMJsonToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark Dataframe column containing a json string created by Honeywell APM to the Process Control Data Model.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.transformers import HoneywellAPMJsonToPCDMTransformer

    honeywell_apm_json_to_pcdm_transformer = HoneywellAPMJsonToPCDMTransformer(
        data=df,
        source_column_name="body",
        status_null_value="Good",
        change_type_value="insert"
    )

    result = honeywell_apm_json_to_pcdm_transformer.transform()
    ```

    Parameters:
        data (DataFrame): Dataframe containing the column with Honeywell APM data
        source_column_name (str): Spark Dataframe column containing the Honeywell APM Json string
        status_null_value (optional str): Value written to the Status column of every output row. Defaults to 'Good'.
        change_type_value (optional str): Value written to the ChangeType column of every output row. Defaults to 'insert'.
    """

    data: DataFrame
    source_column_name: str
    status_null_value: str
    change_type_value: str

    def __init__(
        self,
        data: DataFrame,
        source_column_name: str,
        status_null_value: str = "Good",
        change_type_value: str = "insert",
    ) -> None:
        self.data = data
        self.source_column_name = source_column_name
        self.status_null_value = status_null_value
        self.change_type_value = change_type_value

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_transform_validation(self):
        return True

    def post_transform_validation(self):
        return True

    def transform(self) -> DataFrame:
        """
        Parses the Json in `source_column_name` with `APM_SCHEMA`, explodes
        `SystemTimeSeries.Samples` into one row per sample and maps each sample
        to the Process Control Data Model.

        Returns:
            DataFrame: A dataframe with the specified column converted to PCDM, with
                columns TagName, EventTime, Status, Value, ValueType and ChangeType
        """
        df = (
            self.data.withColumn("body", from_json(self.source_column_name, APM_SCHEMA))
            # explode() without an alias names the per-sample struct column "col"
            .select(explode("body.SystemTimeSeries.Samples"))
            .selectExpr("*", "to_timestamp(col.Time) as EventTime")
            .withColumn("TagName", col("col.Itemname"))
            .withColumn("Status", lit(self.status_null_value))
            .withColumn("Value", col("col.Value"))
            # Reference "Value" with its exact casing so the column still resolves
            # when spark.sql.caseSensitive is enabled. Anything that cannot be cast
            # to float (including nulls) is typed as "string".
            .withColumn(
                "ValueType",
                when(col("Value").cast("float").isNotNull(), "float").otherwise(
                    "string"
                ),
            )
            .withColumn("ChangeType", lit(self.change_type_value))
        )

        return df.select(
            "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
66
67
68
69
70
71
72
@staticmethod
def system_type():
    """
    Reports the execution environment this transformer needs.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the specified column converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/honeywell_apm_to_pcdm.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def transform(self) -> DataFrame:
    """
    Parses the Json in `source_column_name` with `APM_SCHEMA`, explodes
    `SystemTimeSeries.Samples` into one row per sample and maps each sample
    to the Process Control Data Model.

    Returns:
        DataFrame: A dataframe with the specified column converted to PCDM, with
            columns TagName, EventTime, Status, Value, ValueType and ChangeType
    """
    df = (
        self.data.withColumn("body", from_json(self.source_column_name, APM_SCHEMA))
        # explode() without an alias names the per-sample struct column "col"
        .select(explode("body.SystemTimeSeries.Samples"))
        .selectExpr("*", "to_timestamp(col.Time) as EventTime")
        .withColumn("TagName", col("col.Itemname"))
        .withColumn("Status", lit(self.status_null_value))
        .withColumn("Value", col("col.Value"))
        # Reference "Value" with its exact casing so the column still resolves
        # when spark.sql.caseSensitive is enabled. Anything that cannot be cast
        # to float (including nulls) is typed as "string".
        .withColumn(
            "ValueType",
            when(col("Value").cast("float").isNotNull(), "float").otherwise(
                "string"
            ),
        )
        .withColumn("ChangeType", lit(self.change_type_value))
    )

    return df.select(
        "TagName", "EventTime", "Status", "Value", "ValueType", "ChangeType"
    )