Skip to content

Convert SSIP PI Binary File data to the Process Control Data Model

SSIPPIBinaryFileToPCDMTransformer

Bases: TransformerInterface

Converts a Spark DataFrame column containing binaryFile parquet data to the Process Control Data Model.

This DataFrame should contain a path and the binary data. Typically this can be done by using the Autoloader source component and specifying "binaryFile" as the format.

For more information about the SSIP PI Batch Connector, please see https://bakerhughesc3.ai/oai-solution/shell-sensor-intelligence-platform/

Example

from rtdip_sdk.pipelines.transformers import SSIPPIBinaryFileToPCDMTransformer

ssip_pi_binary_file_to_pcdm_transformer = SSIPPIBinaryFileToPCDMTransformer(
    data=df
)

result = ssip_pi_binary_file_to_pcdm_transformer.transform()

Parameters:

Name Type Description Default
data DataFrame

DataFrame containing the path and binaryFile data

required
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class SSIPPIBinaryFileToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark DataFrame column containing binaryFile parquet data to the Process Control Data Model.

    This DataFrame should contain a path and the binary data. Typically this can be done by using the Autoloader source component and specifying "binaryFile" as the format.

    For more information about the SSIP PI Batch Connector, please see [here.](https://bakerhughesc3.ai/oai-solution/shell-sensor-intelligence-platform/)

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.transformers import SSIPPIBinaryFileToPCDMTransformer

    ssip_pi_binary_file_to_pcdm_transformer = SSIPPIBinaryFileToPCDMTransformer(
        data=df
    )

    result = ssip_pi_binary_file_to_pcdm_transformer.transform()
    ```

    Parameters:
        data (DataFrame): DataFrame containing the path and binaryFile data
    """

    data: DataFrame

    def __init__(self, data: DataFrame) -> None:
        self.data = data

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A Libraries object with the pyarrow and pandas PyPI packages added
        """
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("pyarrow"))
        libraries.add_pypi_library(get_default_package("pandas"))
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary
        """
        return {}

    def pre_transform_validation(self):
        """
        Returns:
            bool: Always True; no validation is performed before the transform
        """
        return True

    def post_transform_validation(self):
        """
        Returns:
            bool: Always True; no validation is performed after the transform
        """
        return True

    @staticmethod
    def _convert_binary_to_pandas(pdf):
        """
        Reads the parquet bytes held in the first row of a pandas DataFrame and reshapes them to the PCDM columns.

        The bytes are taken from the "content" column when it exists. Otherwise the fourth column
        is used, which is where the original positional lookup expected them.

        If the bytes cannot be located or parsed as parquet, a warning is logged and an empty
        DataFrame with the PCDM columns is returned so that one unreadable file does not fail the
        whole job.

        Parameters:
            pdf (pandas.DataFrame): The rows of one group passed in by applyInPandas

        Returns:
            pandas.DataFrame: Columns EventDate, TagName, EventTime, Status, Value, ValueType and ChangeType
        """
        # Imported locally so this function does not depend on a module-level import.
        import logging

        try:
            # Look the bytes up by name so a frame that has been narrowed or
            # reordered (for example select("path", "content")) still works.
            if "content" in pdf.columns:
                binary_data = pdf["content"].iloc[0]
            else:
                binary_data = pdf.iloc[0, 3]
            buf = pa.py_buffer(binary_data)
            table = pq.read_table(buf)
        except Exception:
            # Best effort: keep returning an empty result, but leave a trace so
            # unreadable files can be found instead of vanishing silently.
            try:
                source = pdf["path"].iloc[0]
            except (AttributeError, IndexError, KeyError, TypeError):
                source = "<unknown>"
            logging.getLogger(__name__).warning(
                "Unable to read parquet data from binary file %s; returning no rows",
                source,
                exc_info=True,
            )
            return pd.DataFrame(
                {
                    "EventDate": pd.Series([], dtype="datetime64[ns]"),
                    "TagName": pd.Series([], dtype="str"),
                    "EventTime": pd.Series([], dtype="datetime64[ns]"),
                    "Status": pd.Series([], dtype="str"),
                    "Value": pd.Series([], dtype="str"),
                    "ValueType": pd.Series([], dtype="str"),
                    "ChangeType": pd.Series([], dtype="str"),
                }
            )

        output_pdf = table.to_pandas()

        if "ValueType" not in output_pdf.columns:
            # Derive the type from the parquet schema of the Value column;
            # int16 and int32 are both reported as "integer".
            value_type = str(table.schema.field("Value").type)
            if value_type in ("int16", "int32"):
                value_type = "integer"
            output_pdf["ValueType"] = value_type

        if "ChangeType" not in output_pdf.columns:
            output_pdf["ChangeType"] = "insert"

        output_pdf["EventDate"] = output_pdf["EventTime"].dt.date
        # Value is declared as STRING in the schema passed to applyInPandas.
        output_pdf["Value"] = output_pdf["Value"].astype(str)
        output_pdf = output_pdf[
            [
                "EventDate",
                "TagName",
                "EventTime",
                "Status",
                "Value",
                "ValueType",
                "ChangeType",
            ]
        ]
        return output_pdf

    def transform(self) -> DataFrame:
        """
        Returns:
            DataFrame: A dataframe with the provided binary data converted to PCDM
        """
        return self.data.groupBy("path").applyInPandas(
            SSIPPIBinaryFileToPCDMTransformer._convert_binary_to_pandas,
            schema="EventDate DATE, TagName STRING, EventTime TIMESTAMP, Status STRING, Value STRING, ValueType STRING, ChangeType STRING",
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
54
55
56
57
58
59
60
@staticmethod
def system_type():
    """
    Reports the system type this transformer runs on.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system_type = SystemType.PYSPARK
    return required_system_type

transform()

Returns:

Name Type Description
DataFrame DataFrame

A dataframe with the provided Binary data converted to PCDM

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
125
126
127
128
129
130
131
132
133
def transform(self) -> DataFrame:
    """
    Groups the input by "path" and converts each group with _convert_binary_to_pandas.

    Returns:
        DataFrame: A dataframe with the provided binary data converted to PCDM
    """
    # Column names and Spark types of the frames returned by the converter.
    pcdm_schema = "EventDate DATE, TagName STRING, EventTime TIMESTAMP, Status STRING, Value STRING, ValueType STRING, ChangeType STRING"
    grouped_by_path = self.data.groupBy("path")
    return grouped_by_path.applyInPandas(
        SSIPPIBinaryFileToPCDMTransformer._convert_binary_to_pandas,
        schema=pcdm_schema,
    )