Skip to content

Flatline Filter

FlatlineFilter

Bases: DataManipulationBaseInterface

Removes and logs rows with flatlining detected in specified columns of a PySpark DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The input DataFrame to process. | required |
| `watch_columns` | `list` | List of column names to monitor for flatlining (null or zero values). | required |
| `tolerance_timespan` | `int` | Maximum allowed consecutive flatlining period. Rows exceeding this period are removed. | required |
Example
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.flatline_filter import FlatlineFilter


spark = SparkSession.builder.master("local[1]").appName("FlatlineFilterExample").getOrCreate()

# Example DataFrame
data = [
    (1, "2024-01-02 03:49:45.000", 0.0),
    (1, "2024-01-02 03:50:45.000", 0.0),
    (1, "2024-01-02 03:51:45.000", 0.0),
    (2, "2024-01-02 03:49:45.000", 5.0),
]
columns = ["TagName", "EventTime", "Value"]
df = spark.createDataFrame(data, columns)

filter_flatlining_rows = FlatlineFilter(
    df=df,
    watch_columns=["Value"],
    tolerance_timespan=2,
)

result_df = filter_flatlining_rows.filter_data()
result_df.show()
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py
Lines 24–92:
class FlatlineFilter(DataManipulationBaseInterface):
    """
    Drops rows from a PySpark DataFrame in which flatlining was detected.

    Detection itself is delegated to ``FlatlineDetection``; this component
    simply removes the rows that the detector flags.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to process.
        watch_columns (list): Column names monitored for flatlining
            (null or zero values).
        tolerance_timespan (int): Maximum allowed consecutive flatlining
            period. Rows exceeding this period are removed.

    Example:
        ```python
        from pyspark.sql import SparkSession
        from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.flatline_filter import FlatlineFilter


        spark = SparkSession.builder.master("local[1]").appName("FlatlineFilterExample").getOrCreate()

        # Example DataFrame
        data = [
            (1, "2024-01-02 03:49:45.000", 0.0),
            (1, "2024-01-02 03:50:45.000", 0.0),
            (1, "2024-01-02 03:51:45.000", 0.0),
            (2, "2024-01-02 03:49:45.000", 5.0),
        ]
        columns = ["TagName", "EventTime", "Value"]
        df = spark.createDataFrame(data, columns)

        filter_flatlining_rows = FlatlineFilter(
            df=df,
            watch_columns=["Value"],
            tolerance_timespan=2,
        )

        result_df = filter_flatlining_rows.filter_data()
        result_df.show()
        ```
    """

    def __init__(
        self, df: PySparkDataFrame, watch_columns: list, tolerance_timespan: int
    ) -> None:
        # Keep the original frame so flagged rows can be subtracted from it later.
        self.df = df
        # Detector configured with the same frame; it owns the flatline logic.
        self.flatline_detection = FlatlineDetection(
            df=df, watch_columns=watch_columns, tolerance_timespan=tolerance_timespan
        )

    @staticmethod
    def system_type():
        # This component targets the PySpark runtime.
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No extra Maven/PyPI libraries are required beyond the defaults.
        return Libraries()

    @staticmethod
    def settings() -> dict:
        # No component-specific settings.
        return {}

    def filter_data(self) -> PySparkDataFrame:
        """
        Removes rows with flatlining detected.

        Returns:
            pyspark.sql.DataFrame: A DataFrame without rows with flatlining detected.
        """
        flagged = self.flatline_detection.check_for_flatlining()
        # Project the flagged rows down to the input schema so row-wise
        # subtraction can match them against the original frame.
        flagged = flagged.select(*self.df.columns)
        # NOTE(review): DataFrame.subtract is distinct-based, so duplicate rows
        # in self.df are also deduplicated in the result — confirm acceptable.
        return self.df.subtract(flagged)

filter_data()

Removes rows with flatlining detected.

Returns:

Type Description
DataFrame

pyspark.sql.DataFrame: A DataFrame without rows with flatlining detected.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/flatline_filter.py
Lines 83–92:
def filter_data(self) -> PySparkDataFrame:
    """
    Removes rows with flatlining detected.

    Returns:
        pyspark.sql.DataFrame: A DataFrame without rows with flatlining detected.
    """
    flagged = self.flatline_detection.check_for_flatlining()
    # Align the flagged rows with the input schema before subtracting.
    flagged = flagged.select(*self.df.columns)
    return self.df.subtract(flagged)