Flatline Filter
FlatlineFilter
Bases: DataManipulationBaseInterface
Removes and logs rows with flatlining detected in specified columns of a PySpark DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | The input DataFrame to process. | required |
| `watch_columns` | `list` | List of column names to monitor for flatlining (null or zero values). | required |
| `tolerance_timespan` | `int` | Maximum allowed consecutive flatlining period. Rows exceeding this period are removed. | required |
Example

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.flatline_filter import FlatlineFilter

spark = SparkSession.builder.master("local[1]").appName("FlatlineFilterExample").getOrCreate()

# Example DataFrame
data = [
    (1, "2024-01-02 03:49:45.000", 0.0),
    (1, "2024-01-02 03:50:45.000", 0.0),
    (1, "2024-01-02 03:51:45.000", 0.0),
    (2, "2024-01-02 03:49:45.000", 5.0),
]
columns = ["TagName", "EventTime", "Value"]
df = spark.createDataFrame(data, columns)

filter_flatlining_rows = FlatlineFilter(
    df=df,
    watch_columns=["Value"],
    tolerance_timespan=2,
)

result_df = filter_flatlining_rows.filter_data()
result_df.show()
```
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py
filter_data()
Removes rows with flatlining detected.
Returns:

| Type | Description |
|---|---|
| `DataFrame` | `pyspark.sql.DataFrame`: A DataFrame without the rows in which flatlining was detected. |
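The removal criterion can be illustrated with a small pure-Python sketch (an illustration of the idea only, not the library's actual implementation, which operates on PySpark DataFrames): a value flatlines when it is null or zero, and any run of consecutive flatlined rows longer than `tolerance_timespan` is dropped while shorter runs are kept.

```python
def drop_flatline_runs(values, tolerance_timespan):
    """Return the values that survive flatline filtering.

    A value "flatlines" when it is None or 0. Any run of consecutive
    flatlined values longer than tolerance_timespan is removed; shorter
    runs are kept. Illustrative sketch only -- FlatlineFilter itself
    works on PySpark DataFrames, per tag and in event-time order.
    """
    kept, run = [], []
    for v in values:
        if v is None or v == 0:
            run.append(v)          # extend the current flatline run
        else:
            if len(run) <= tolerance_timespan:
                kept.extend(run)   # short run: within tolerance, keep it
            run = []
            kept.append(v)
    if len(run) <= tolerance_timespan:
        kept.extend(run)           # handle a trailing run
    return kept


# Three consecutive zeros exceed a tolerance of 2, so they are dropped:
drop_flatline_runs([0.0, 0.0, 0.0, 5.0], 2)  # -> [5.0]
# A run of two zeros is within tolerance and survives:
drop_flatline_runs([1.0, 0.0, 0.0, 2.0], 2)  # -> [1.0, 0.0, 0.0, 2.0]
```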
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py