Skip to content

Normalization

NormalizationBaseClass

Bases: DataManipulationBaseInterface, InputValidator

A base class for applying normalization techniques to multiple columns in a PySpark DataFrame. This class serves as a framework to support various normalization methods (e.g., Z-Score, Min-Max, and Mean), with specific implementations in separate subclasses for each normalization type.

Subclasses should implement specific normalization and denormalization methods by inheriting from this base class.

Example

from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.normalization.normalization import NormalizationZScore

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame

normalization = NormalizationZScore(df, column_names=["value_column_1", "value_column_2"], in_place=False)
normalized_df = normalization.filter_data()

Parameters:

Name Type Description Default
df DataFrame

PySpark DataFrame to be normalized.

required
column_names List[str]

List of columns in the DataFrame to be normalized.

required
in_place bool

If True, the normalized values overwrite the original columns instead of being written to new columns.

False

Attributes: NORMALIZATION_NAME_POSTFIX (str) — suffix appended to the column name when a new column is created for the normalized values.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/normalization/normalization.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class NormalizationBaseClass(DataManipulationBaseInterface, InputValidator):
    """
    Framework for normalizing several columns of a PySpark DataFrame.

    Concrete strategies (e.g. Z-Score, Min-Max and Mean) are implemented in
    subclasses. Each subclass supplies the per-column normalization and
    denormalization logic, plus the ``NORMALIZED_COLUMN_NAME`` identifier.
    This base class iterates over the configured columns and decides the
    names of any derived columns.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.normalization.normalization import NormalizationZScore

    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame

    normalization = NormalizationZScore(df, column_names=["value_column_1", "value_column_2"], in_place=False)
    normalized_df = normalization.filter_data()
    ```

    Parameters:
        df (DataFrame): PySpark DataFrame to be normalized.
        column_names (List[str]): Columns of ``df`` to normalize. On
            construction they are checked (via ``self.validate``) against a
            schema that declares each of them as ``DoubleType``.
        in_place (bool): If True, the normalized values are stored in the
            same column. Otherwise a separate column named
            ``<column>_<NORMALIZED_COLUMN_NAME>_<NORMALIZATION_NAME_POSTFIX>``
            is used. Defaults to False.

    Attributes:
        NORMALIZATION_NAME_POSTFIX (str): Suffix added to the column name
            when a new column is created for normalized values.
    """

    df: PySparkDataFrame
    column_names: List[str]
    in_place: bool

    reversal_value: List[float]

    # Last component of the derived column name used when in_place is False.
    NORMALIZATION_NAME_POSTFIX: str = "normalization"

    def __init__(
        self, df: PySparkDataFrame, column_names: List[str], in_place: bool = False
    ) -> None:
        self.df = df
        self.column_names = column_names
        self.in_place = in_place

        # Every target column must be declared as a double in the expected schema.
        double_fields = [StructField(name, DoubleType()) for name in column_names]
        self.validate(StructType(double_fields))

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        # No extra libraries are declared for this component.
        return Libraries()

    @staticmethod
    def settings() -> dict:
        return {}

    def filter_data(self):
        """Run the component; simply delegates to ``normalize``."""
        return self.normalize()

    def normalize(self) -> PySparkDataFrame:
        """
        Apply ``_normalize_column`` to every configured column, in order.

        Each step receives the DataFrame produced by the previous step,
        starting from ``self.df``.

        Returns:
            DataFrame: A PySpark DataFrame with the normalized values.
        """
        current = self.df
        for name in self.column_names:
            current = self._normalize_column(current, name)
        return current

    def denormalize(self, input_df) -> PySparkDataFrame:
        """
        Undo the normalization on ``input_df``.

        Intended to be used by the denormalization component.

        Parameters:
            input_df (DataFrame): DataFrame containing the current data.

        Returns:
            DataFrame: With ``in_place`` set, ``input_df`` with every
            configured column passed through ``_denormalize_column``.
            Otherwise, ``input_df`` with the derived columns named by
            ``_get_norm_column_name`` dropped.
        """
        current = input_df
        if self.in_place:
            # The source columns were overwritten, so each must be transformed back.
            for name in self.column_names:
                current = self._denormalize_column(current, name)
            return current

        # Normalized values presumably live in separate derived columns; drop them.
        for name in self.column_names:
            current = current.drop(self._get_norm_column_name(name))
        return current

    @property
    @abstractmethod
    def NORMALIZED_COLUMN_NAME(self): ...

    @abstractmethod
    def _normalize_column(self, df: PySparkDataFrame, column: str) -> PySparkDataFrame:
        pass

    @abstractmethod
    def _denormalize_column(
        self, df: PySparkDataFrame, column: str
    ) -> PySparkDataFrame:
        pass

    def _get_norm_column_name(self, column_name: str) -> str:
        # In-place normalization reuses the source column's name.
        if self.in_place:
            return column_name
        return f"{column_name}_{self.NORMALIZED_COLUMN_NAME}_{self.NORMALIZATION_NAME_POSTFIX}"

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/normalization/normalization.py
81
82
83
84
85
86
87
@staticmethod
def system_type():
    """
    Report the execution environment this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment

normalize()

Applies the specified normalization to each column in column_names.

Returns:

Name Type Description
DataFrame DataFrame

A PySpark DataFrame with the normalized values.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/normalization/normalization.py
101
102
103
104
105
106
107
108
109
110
111
def normalize(self) -> PySparkDataFrame:
    """
    Apply ``_normalize_column`` to every configured column, in order.

    Each column in ``self.column_names`` is handed to ``_normalize_column``
    together with the DataFrame returned by the previous step. The first
    step starts from ``self.df``.

    Returns:
        DataFrame: A PySpark DataFrame with the normalized values. This is
        ``self.df`` itself when ``column_names`` is empty.
    """
    current = self.df
    for name in self.column_names:
        current = self._normalize_column(current, name)
    return current

denormalize(input_df)

Denormalizes the input DataFrame. Intended to be used by the denormalization component.

Parameters:

Name Type Description Default
input_df DataFrame

DataFrame containing the current data.

required
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/normalization/normalization.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def denormalize(self, input_df) -> PySparkDataFrame:
    """
    Undo the normalization on ``input_df``.

    Intended to be used by the denormalization component.

    Parameters:
        input_df (DataFrame): DataFrame containing the current data.

    Returns:
        DataFrame: With ``in_place`` set, ``input_df`` with every configured
        column passed through ``_denormalize_column``. Otherwise,
        ``input_df`` with the derived columns named by
        ``_get_norm_column_name`` dropped.
    """
    current = input_df
    if self.in_place:
        # The source columns were overwritten, so each must be transformed back.
        for name in self.column_names:
            current = self._denormalize_column(current, name)
        return current

    # Normalized values presumably live in separate derived columns; drop them.
    for name in self.column_names:
        current = current.drop(self._get_norm_column_name(name))
    return current