Skip to content

Convert SSIP PI Binary File data to the Process Control Data Model

SSIPPIBinaryFileToPCDMTransformer

Bases: TransformerInterface

Converts a Spark DataFrame containing binaryFile Parquet data to the Process Control Data Model (PCDM).

This DataFrame should contain the file path and the binary file content. Typically, it is produced by the Autoloader source component with "binaryFile" specified as the format.

For more information about the SSIP PI Batch Connector, please see [here](https://bakerhughesc3.ai/oai-solution/shell-sensor-intelligence-platform/).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `DataFrame` | DataFrame containing the path and binaryFile data | *required* |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class SSIPPIBinaryFileToPCDMTransformer(TransformerInterface):
    """
    Converts a Spark DataFrame containing binaryFile parquet data to the Process Control Data Model.

    The DataFrame should contain the file path and the binary file content. Typically it is produced by the Autoloader source component with "binaryFile" specified as the format.

    For more information about the SSIP PI Batch Connector, please see [here.](https://bakerhughesc3.ai/oai-solution/shell-sensor-intelligence-platform/)

    Args:
        data (DataFrame): DataFrame containing the path and binaryFile data. Rows are grouped by the `path` column, and the parquet bytes are read from the fourth column by position. This matches the `content` column in Spark's binaryFile layout (path, modificationTime, length, content).
    """

    data: DataFrame

    def __init__(self, data: DataFrame) -> None:
        self.data = data

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """Returns the PyPI libraries (pyarrow and pandas) used to decode the binary data."""
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("pyarrow"))
        libraries.add_pypi_library(get_default_package("pandas"))
        return libraries

    @staticmethod
    def settings() -> dict:
        """No transformer-specific Spark settings are required."""
        return {}

    def pre_transform_validation(self):
        """No validation is performed before the transform; always returns True."""
        return True

    def post_transform_validation(self):
        """No validation is performed after the transform; always returns True."""
        return True

    @staticmethod
    def _convert_binary_to_pandas(pdf):
        """
        Decodes the parquet bytes of one `path` group into PCDM rows.

        `transform` passes this to `applyInPandas`, so it is called once per distinct `path`.

        Args:
            pdf (pd.DataFrame): The input rows sharing one `path`. Only the first row is decoded, and its parquet bytes are taken from the fourth column (index 3).

        Returns:
            pd.DataFrame: Columns EventDate, TagName, EventTime, Status, Value, ValueType and ChangeType, in that order. If the bytes cannot be located or read as parquet, an empty DataFrame with those columns is returned instead of raising.
        """
        try:
            # Read the single cell directly rather than converting the whole
            # group to nested Python lists first.
            binary_data = pdf.iloc[0, 3]
            table = pq.read_table(pa.py_buffer(binary_data))
        except Exception:
            # Best effort: one unreadable file must not fail the whole Spark job,
            # but record the failure so the skipped file is visible in the logs.
            import logging

            path = (
                pdf["path"].iloc[0]
                if "path" in pdf.columns and len(pdf.index) > 0
                else None
            )
            logging.getLogger(__name__).warning(
                "Unable to read parquet content for path %s; no rows produced.",
                path,
                exc_info=True,
            )
            return pd.DataFrame(
                {
                    "EventDate": pd.Series([], dtype="datetime64[ns]"),
                    "TagName": pd.Series([], dtype="str"),
                    "EventTime": pd.Series([], dtype="datetime64[ns]"),
                    "Status": pd.Series([], dtype="str"),
                    "Value": pd.Series([], dtype="str"),
                    "ValueType": pd.Series([], dtype="str"),
                    "ChangeType": pd.Series([], dtype="str"),
                }
            )

        value_type = str(table.schema.field("Value").type)
        # The 16- and 32-bit Arrow integer type names are reported as "integer".
        # Any other Arrow type name is passed through unchanged.
        # NOTE(review): "int64"/"int8" therefore stay unmapped; confirm this is intended.
        if value_type in ("int16", "int32"):
            value_type = "integer"

        output_pdf = table.to_pandas()

        output_pdf["EventDate"] = output_pdf["EventTime"].dt.date
        # NOTE(review): astype(str) renders missing values as the strings
        # "None"/"nan" rather than nulls; confirm downstream consumers expect this.
        output_pdf["Value"] = output_pdf["Value"].astype(str)
        output_pdf["ChangeType"] = "insert"
        output_pdf["ValueType"] = value_type
        # Column order must match the schema passed to applyInPandas in transform().
        return output_pdf[
            [
                "EventDate",
                "TagName",
                "EventTime",
                "Status",
                "Value",
                "ValueType",
                "ChangeType",
            ]
        ]

    def transform(self) -> DataFrame:
        """
        Converts the binary file data to the Process Control Data Model.

        Returns:
            DataFrame: A dataframe with the provided binary data converted to PCDM
        """
        return self.data.groupBy("path").applyInPandas(
            SSIPPIBinaryFileToPCDMTransformer._convert_binary_to_pandas,
            schema="EventDate DATE, TagName STRING, EventTime TIMESTAMP, Status STRING, Value STRING, ValueType STRING, ChangeType STRING",
        )

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `SystemType` | `Environment` | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
42
43
44
45
46
47
48
@staticmethod
def system_type():
    """
    Reports the execution environment required by this transformer.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

transform()

Returns:

| Name | Type | Description |
|------|------|-------------|
| `DataFrame` | `DataFrame` | A DataFrame with the provided binary data converted to PCDM |

Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/ssip_pi_binary_file_to_pcdm.py
110
111
112
113
114
115
116
117
118
def transform(self) -> DataFrame:
    """
    Converts the binary file data to the Process Control Data Model.

    Returns:
        DataFrame: A dataframe with the provided binary data converted to PCDM
    """
    output_schema = "EventDate DATE, TagName STRING, EventTime TIMESTAMP, Status STRING, Value STRING, ValueType STRING, ChangeType STRING"
    per_path_groups = self.data.groupBy("path")
    return per_path_groups.applyInPandas(
        SSIPPIBinaryFileToPCDMTransformer._convert_binary_to_pandas,
        schema=output_schema,
    )