Out of Range Value Filter
OutOfRangeValueFilter
Bases: DataManipulationBaseInterface
Filters data in a DataFrame by checking the 'Value' column against the expected range for each specified TagName. Logs an event whenever a 'Value' falls outside the defined range for its TagName, and removes those rows.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | The DataFrame to monitor. | required |
| tag_ranges | dict | A dictionary where keys are TagNames and values are dictionaries specifying 'min' and/or 'max', and optionally 'inclusive_bounds'. Example: `{'A2PS64V0J.:ZUX09R': {'min': 0, 'max': 100, 'inclusive_bounds': True}, 'B3TS64V0K.:ZUX09R': {'min': 10, 'max': 200, 'inclusive_bounds': False}}` | required |
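To make the `tag_ranges` semantics concrete, here is a minimal pure-Python sketch of how a single entry could be interpreted. It assumes that an omitted 'min' or 'max' leaves that side unbounded and that 'inclusive_bounds' defaults to True when absent; neither default is stated above. The helper name `is_within_range` is hypothetical and not part of the RTDIP API.

```python
from typing import Any, Dict


def is_within_range(value: float, spec: Dict[str, Any]) -> bool:
    """Hypothetical helper: True if value satisfies one tag_ranges entry.

    Assumptions (not confirmed by the RTDIP docs): a missing 'min' or
    'max' means that side is unbounded, and 'inclusive_bounds' defaults
    to True when omitted.
    """
    inclusive = spec.get("inclusive_bounds", True)
    lo = spec.get("min")
    hi = spec.get("max")
    if lo is not None:
        # Below min, or exactly on min when bounds are exclusive.
        if value < lo or (not inclusive and value == lo):
            return False
    if hi is not None:
        # Above max, or exactly on max when bounds are exclusive.
        if value > hi or (not inclusive and value == hi):
            return False
    return True


# Ranges taken from the tag_ranges example in the table above.
tag_ranges = {
    "A2PS64V0J.:ZUX09R": {"min": 0, "max": 100, "inclusive_bounds": True},
    "B3TS64V0K.:ZUX09R": {"min": 10, "max": 200, "inclusive_bounds": False},
}

print(is_within_range(100.0, tag_ranges["A2PS64V0J.:ZUX09R"]))  # True: max is inclusive
print(is_within_range(10.0, tag_ranges["B3TS64V0K.:ZUX09R"]))   # False: min is exclusive
print(is_within_range(-1.0, tag_ranges["A2PS64V0J.:ZUX09R"]))   # False: below min
```

With 'inclusive_bounds' set to False, a value equal to either bound counts as out of range, which is why 10.0 is rejected for the second tag.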
Example
```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.out_of_range_value_filter import OutOfRangeValueFilter

spark = SparkSession.builder.master("local[1]").appName("DeleteOutOfRangeValuesExample").getOrCreate()

data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
    ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
]

columns = ["TagName", "EventTime", "Status", "Value"]
df = spark.createDataFrame(data, columns)

tag_ranges = {
    "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
    "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
}

out_of_range_value_filter = OutOfRangeValueFilter(
    df=df,
    tag_ranges=tag_ranges,
)

# The rows with Value -5.0 and 100.0 fall outside [0, 50] for A2PS64V0J.:ZUX09R
# and are removed from the returned DataFrame.
result_df = out_of_range_value_filter.filter_data()
```
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/out_of_range_value_filter.py
system_type() (staticmethod)
Attributes:
| Name | Type | Description |
|---|---|---|
| SystemType | Environment | Requires PYSPARK |
filter_data()
Executes the value range check for the specified TagNames. Identifies and logs every row where 'Value' falls outside the defined range for its TagName, and removes those rows from the result.
Returns:
| Type | Description |
|---|---|
| DataFrame | pyspark.sql.DataFrame: A PySpark DataFrame without the rows that were out of range. |
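The identify, log and remove steps of `filter_data()` can be illustrated without Spark. The sketch below is a hypothetical pure-Python analogue working on a list of `(TagName, EventTime, Status, Value)` tuples; it is not the RTDIP implementation. It assumes that rows whose TagName has no entry in `tag_ranges` are kept and that 'inclusive_bounds' defaults to True. The function name `filter_out_of_range` is illustrative only. Like the real method, it returns a new collection rather than modifying its input.

```python
import logging
from typing import Any, Dict, List, Tuple

Row = Tuple[str, str, str, float]  # (TagName, EventTime, Status, Value)

logger = logging.getLogger("out_of_range_sketch")


def filter_out_of_range(rows: List[Row], tag_ranges: Dict[str, Dict[str, Any]]) -> List[Row]:
    """Hypothetical analogue of filter_data(): log and drop out-of-range rows.

    Assumptions: rows whose TagName is not in tag_ranges are kept, and
    'inclusive_bounds' defaults to True when omitted.
    """
    kept: List[Row] = []
    for row in rows:
        tag, event_time, _status, value = row
        spec = tag_ranges.get(tag)
        if spec is None:
            kept.append(row)  # no range configured for this tag
            continue
        inclusive = spec.get("inclusive_bounds", True)
        lo, hi = spec.get("min"), spec.get("max")
        # Each bound is only checked if it is present in the spec.
        below = lo is not None and (value < lo if inclusive else value <= lo)
        above = hi is not None and (value > hi if inclusive else value >= hi)
        if below or above:
            # Identify and log the offending row, then leave it out of the result.
            logger.info("Out of range: TagName=%s EventTime=%s Value=%s", tag, event_time, value)
            continue
        kept.append(row)
    return kept


# Same data and ranges as the Example section above.
data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
    ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
]
tag_ranges = {
    "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
    "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
}

logging.basicConfig(level=logging.INFO)
result = filter_out_of_range(data, tag_ranges)
print([r[3] for r in result])  # [25.0, 50.0, 80.0]
```

The value 50.0 survives because the first tag's bounds are inclusive, while 80.0 lies strictly inside the second tag's exclusive range (50, 100).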