Skip to content

Out of Range Value Filter

OutOfRangeValueFilter

Bases: DataManipulationBaseInterface

Filters data in a DataFrame by checking the 'Value' column against expected ranges for specified TagNames. Logs events when 'Value' exceeds the defined ranges for any TagName and deletes the rows.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The DataFrame to monitor. | required |
| `tag_ranges` | `dict` | A dictionary where keys are TagNames and values are dictionaries specifying 'min' and/or 'max', and optionally 'inclusive_bounds' values. Example: `{'A2PS64V0J.:ZUX09R': {'min': 0, 'max': 100, 'inclusive_bounds': True}, 'B3TS64V0K.:ZUX09R': {'min': 10, 'max': 200, 'inclusive_bounds': False}}` | required |
Example
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.out_of_range_value_filter import OutOfRangeValueFilter


spark = SparkSession.builder.master("local[1]").appName("DeleteOutOfRangeValuesExample").getOrCreate()

data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
    ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
]

columns = ["TagName", "EventTime", "Status", "Value"]

df = spark.createDataFrame(data, columns)

tag_ranges = {
    "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
    "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
}

out_of_range_value_filter = OutOfRangeValueFilter(
    df=df,
    tag_ranges=tag_ranges,
)

result_df = out_of_range_value_filter.filter_data()
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/out_of_range_value_filter.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class OutOfRangeValueFilter(DataManipulationBaseInterface):
    """
    Filters data in a DataFrame by checking the 'Value' column against expected ranges for specified TagNames.
    Logs events when 'Value' exceeds the defined ranges for any TagName and deletes the rows.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to monitor.
        tag_ranges (dict): A dictionary where keys are TagNames and values are dictionaries specifying 'min' and/or
            'max', and optionally 'inclusive_bounds' values.
            Example:
                {
                    'A2PS64V0J.:ZUX09R': {'min': 0, 'max': 100, 'inclusive_bounds': True},
                    'B3TS64V0K.:ZUX09R': {'min': 10, 'max': 200, 'inclusive_bounds': False},
                }

    Example:
        ```python
        from pyspark.sql import SparkSession
        from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.out_of_range_value_filter import OutOfRangeValueFilter


        spark = SparkSession.builder.master("local[1]").appName("DeleteOutOfRangeValuesExample").getOrCreate()

        data = [
            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
            ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
        ]

        columns = ["TagName", "EventTime", "Status", "Value"]

        df = spark.createDataFrame(data, columns)

        tag_ranges = {
            "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
            "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
        }

        out_of_range_value_filter = OutOfRangeValueFilter(
            df=df,
            tag_ranges=tag_ranges,
        )

        result_df = out_of_range_value_filter.filter_data()
        ```
    """

    # DataFrame under inspection; rows whose 'Value' falls outside their tag's
    # configured range are removed by filter_data().
    df: PySparkDataFrame

    def __init__(
        self,
        df: PySparkDataFrame,
        tag_ranges: dict,
    ) -> None:
        self.df = df
        # Range detection and event logging are delegated to the shared
        # CheckValueRanges monitor; this class only removes the flagged rows.
        self.check_value_ranges = CheckValueRanges(df=df, tag_ranges=tag_ranges)

        # Configure logging. The handler is attached only once per logger name
        # so repeated instantiation does not produce duplicate log lines.
        self.logger = logging.getLogger(self.__class__.__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No extra Maven/PyPI libraries are required beyond the pipeline defaults.
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        # No component-specific settings.
        return {}

    def filter_data(self) -> PySparkDataFrame:
        """
        Executes the value range checking logic for the specified TagNames. Identifies, logs and deletes any rows
        where 'Value' exceeds the defined ranges for each TagName.

        Returns:
            pyspark.sql.DataFrame:
                Returns a PySpark DataFrame without the rows that were out of range.
        """
        out_of_range_df = self.check_value_ranges.check_for_out_of_range()

        # count() triggers a Spark action; it is used here only to decide
        # whether the detailed per-row logging pass is worth running.
        if out_of_range_df.count() > 0:
            self.check_value_ranges.log_out_of_range_values(out_of_range_df)
        else:
            # Plain string literal: nothing to interpolate (was a needless
            # f-string, ruff F541).
            self.logger.info("No out of range values found in 'Value' column.")
        return self.df.subtract(out_of_range_df)

system_type() staticmethod

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `SystemType` | `Environment` | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/out_of_range_value_filter.py
 95
 96
 97
 98
 99
100
101
@staticmethod
def system_type():
    """Report the execution environment this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

filter_data()

Executes the value range checking logic for the specified TagNames. Identifies, logs and deletes any rows where 'Value' exceeds the defined ranges for each TagName.

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | pyspark.sql.DataFrame: Returns a PySpark DataFrame without the rows that were out of range. |

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/out_of_range_value_filter.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def filter_data(self) -> PySparkDataFrame:
    """
    Executes the value range checking logic for the specified TagNames. Identifies, logs and deletes any rows
    where 'Value' exceeds the defined ranges for each TagName.

    Returns:
        pyspark.sql.DataFrame:
            Returns a PySpark DataFrame without the rows that were out of range.
    """
    out_of_range_df = self.check_value_ranges.check_for_out_of_range()

    if out_of_range_df.count() > 0:
        self.check_value_ranges.log_out_of_range_values(out_of_range_df)
    else:
        # Plain string literal: nothing to interpolate (was a needless f-string, ruff F541).
        self.logger.info("No out of range values found in 'Value' column.")
    return self.df.subtract(out_of_range_df)