Skip to content

Linear Regression

LinearRegression

Bases: MachineLearningInterface

This class uses pyspark.ml.LinearRegression to train a linear regression model on time-series data and then uses the model to predict the next values in the time series.

Parameters:

Name Type Description Default
features_col str

Name of the column containing the features (the input). Default is 'features'.

'features'
label_col str

Name of the column containing the label (the target the model learns to predict). Default is 'label'.

'label'
prediction_col str

Name of the column to which the prediction will be written. Default is 'prediction'.

'prediction'
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from rtdip_sdk.pipelines.forecasting.spark.linear_regression import LinearRegression

# Start a small local Spark session for the example.
spark = (
    SparkSession.builder.master("local[2]")
    .appName("LinearRegressionExample")
    .getOrCreate()
)

# Five rows of (id, feature1, label) where label is always feature1 + 1:
# (1, 2.0, 3.0), (2, 3.0, 4.0), ..., (5, 6.0, 7.0).
data = [(i, float(i + 1), float(i + 2)) for i in range(1, 6)]
columns = ["id", "feature1", "label"]
df = spark.createDataFrame(data, columns)

# pyspark.ml estimators read their inputs from a single vector column,
# so pack "feature1" into a "features" column.
assembler = VectorAssembler(inputCols=["feature1"], outputCol="features")
df = assembler.transform(df)

lr = LinearRegression(features_col="features", label_col="label", prediction_col="prediction")
train_df, test_df = lr.split_data(df, train_ratio=0.8)
lr.train(train_df)
predictions = lr.predict(test_df)
rmse, r2 = lr.evaluate(predictions)
print(f"RMSE: {rmse}, R²: {r2}")
Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class LinearRegression(MachineLearningInterface):
    """
    This class uses pyspark.ml.LinearRegression to train a linear regression model on time data
    and then uses the model to predict next values in the time series.

    Args:
        features_col (str): Name of the column containing the features (the input). Default is 'features'.
        label_col (str): Name of the column containing the label (the target the model learns to predict). Default is 'label'.
        prediction_col (str): Name of the column to which the prediction will be written. Default is 'prediction'.

    Example:
    --------
    ```python
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import VectorAssembler
    from rtdip_sdk.pipelines.forecasting.spark.linear_regression import LinearRegression

    spark = SparkSession.builder.master("local[2]").appName("LinearRegressionExample").getOrCreate()

    data = [
        (1, 2.0, 3.0),
        (2, 3.0, 4.0),
        (3, 4.0, 5.0),
        (4, 5.0, 6.0),
        (5, 6.0, 7.0),
    ]
    columns = ["id", "feature1", "label"]
    df = spark.createDataFrame(data, columns)

    assembler = VectorAssembler(inputCols=["feature1"], outputCol="features")
    df = assembler.transform(df)

    lr = LinearRegression(features_col="features", label_col="label", prediction_col="prediction")
    train_df, test_df = lr.split_data(df, train_ratio=0.8)
    lr.train(train_df)
    predictions = lr.predict(test_df)
    rmse, r2 = lr.evaluate(predictions)
    print(f"RMSE: {rmse}, R²: {r2}")
    ```

    """

    def __init__(
        self,
        features_col: str = "features",
        label_col: str = "label",
        prediction_col: str = "prediction",
    ) -> None:
        self.features_col = features_col
        self.label_col = label_col
        self.prediction_col = prediction_col

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: An empty Libraries object; this component declares no extra libraries.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dict; this component requires no additional settings.
        """
        return {}

    def split_data(
        self, df: DataFrame, train_ratio: float = 0.8
    ) -> tuple[DataFrame, DataFrame]:
        """
        Splits the dataset into training and testing sets.

        Args:
            df (DataFrame): The dataset to split.
            train_ratio (float): The ratio of the data to be used for training. Default is 0.8 (80% for training).

        Returns:
            tuple[DataFrame, DataFrame]: The training and testing datasets, in that order.
        """
        # A fixed seed keeps the random split reproducible for the same input data.
        train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=42)
        return train_df, test_df

    def train(self, train_df: DataFrame) -> "LinearRegression":
        """
        Trains a linear regression model on the provided data.

        Args:
            train_df (DataFrame): Training data containing the ``features_col`` and ``label_col`` columns.

        Returns:
            LinearRegression: This instance, with the fitted model stored in ``self.model``.
        """
        linear_regression = ml.regression.LinearRegression(
            featuresCol=self.features_col,
            labelCol=self.label_col,
            predictionCol=self.prediction_col,
        )

        self.model = linear_regression.fit(train_df)
        return self

    def predict(self, prediction_df: DataFrame):
        """
        Predicts the next values in the time series.

        ``train`` must have been called first, since it is the only method that sets ``self.model``.

        Args:
            prediction_df (DataFrame): Data containing the ``features_col`` column to predict on.

        Returns:
            DataFrame: The result of applying the fitted model to ``prediction_df``.
        """

        return self.model.transform(
            prediction_df,
        )

    def evaluate(self, test_df: DataFrame) -> Optional[tuple[float, float]]:
        """
        Evaluates the trained model using RMSE and R².

        Args:
            test_df (DataFrame): The testing dataset to evaluate the model. It must contain both
                ``label_col`` and ``prediction_col``, for example the output of ``predict``.

        Returns:
            Optional[tuple[float, float]]: A ``(rmse, r2)`` tuple with the Root Mean Squared Error (RMSE)
            and the coefficient of determination (R²) of the model, or None if the prediction column
            doesn't exist.
        """

        if self.prediction_col not in test_df.columns:
            print(
                f"Error: '{self.prediction_col}' column is missing in the test DataFrame."
            )
            return None

        # Evaluator for RMSE
        evaluator_rmse = RegressionEvaluator(
            labelCol=self.label_col,
            predictionCol=self.prediction_col,
            metricName="rmse",
        )
        rmse = evaluator_rmse.evaluate(test_df)

        # Evaluator for R²
        evaluator_r2 = RegressionEvaluator(
            labelCol=self.label_col, predictionCol=self.prediction_col, metricName="r2"
        )
        r2 = evaluator_r2.evaluate(test_df)

        return rmse, r2

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
74
75
76
77
78
79
80
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

split_data(df, train_ratio=0.8)

Splits the dataset into training and testing sets.

Parameters:

Name Type Description Default
train_ratio float

The ratio of the data to be used for training. Default is 0.8 (80% for training).

0.8

Returns:

Type Description
tuple[DataFrame, DataFrame]

tuple[DataFrame, DataFrame]: Returns the training and testing datasets.

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def split_data(
    self, df: DataFrame, train_ratio: float = 0.8
) -> tuple[DataFrame, DataFrame]:
    """
    Splits the dataset into training and testing sets.

    Args:
        df (DataFrame): The dataset to split.
        train_ratio (float): The ratio of the data to be used for training. Default is 0.8 (80% for training).

    Returns:
        tuple[DataFrame, DataFrame]: The training and testing datasets, in that order.
    """
    # A fixed seed keeps the random split reproducible for the same input data.
    train_df, test_df = df.randomSplit([train_ratio, 1 - train_ratio], seed=42)
    return train_df, test_df

train(train_df)

Trains a linear regression model on the provided data.

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
106
107
108
109
110
111
112
113
114
115
116
117
def train(self, train_df: DataFrame):
    """
    Fit a pyspark.ml linear regression estimator on ``train_df``.

    The estimator is configured from ``self.features_col``, ``self.label_col`` and
    ``self.prediction_col``; the fitted model is stored on ``self.model``.

    Args:
        train_df (DataFrame): The training dataset.

    Returns:
        The instance itself, so calls can be chained.
    """
    estimator = ml.regression.LinearRegression(
        predictionCol=self.prediction_col,
        labelCol=self.label_col,
        featuresCol=self.features_col,
    )
    self.model = estimator.fit(train_df)
    return self

predict(prediction_df)

Predicts the next values in the time series.

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
119
120
121
122
123
124
125
126
def predict(self, prediction_df: DataFrame):
    """
    Predict the next values in the time series by applying the fitted model.

    ``self.model`` must already be set (presumably by ``train``) before calling this.

    Args:
        prediction_df (DataFrame): Rows to produce predictions for.

    Returns:
        The result of ``self.model.transform(prediction_df)``.
    """
    fitted_model = self.model
    return fitted_model.transform(prediction_df)

evaluate(test_df)

Evaluates the trained model using RMSE and R².

Parameters:

Name Type Description Default
test_df DataFrame

The testing dataset to evaluate the model.

required

Returns:

Type Description
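Optional[tuple[float, float]]

Optional[tuple[float, float]]: A (rmse, r2) tuple containing the Root Mean Squared Error (RMSE) and the coefficient of determination (R²) of the model, or None if the prediction column doesn't exist.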

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/linear_regression.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def evaluate(self, test_df: DataFrame) -> Optional[tuple[float, float]]:
    """
    Evaluates the trained model using RMSE and R².

    Args:
        test_df (DataFrame): The testing dataset to evaluate the model. It must contain both
            ``label_col`` and ``prediction_col``, for example the output of ``predict``.

    Returns:
        Optional[tuple[float, float]]: A ``(rmse, r2)`` tuple with the Root Mean Squared Error (RMSE)
        and the coefficient of determination (R²) of the model, or None if the prediction column
        doesn't exist.
    """

    if self.prediction_col not in test_df.columns:
        print(
            f"Error: '{self.prediction_col}' column is missing in the test DataFrame."
        )
        return None

    # Evaluator for RMSE
    evaluator_rmse = RegressionEvaluator(
        labelCol=self.label_col,
        predictionCol=self.prediction_col,
        metricName="rmse",
    )
    rmse = evaluator_rmse.evaluate(test_df)

    # Evaluator for R²
    evaluator_r2 = RegressionEvaluator(
        labelCol=self.label_col, predictionCol=self.prediction_col, metricName="r2"
    )
    r2 = evaluator_r2.evaluate(test_df)

    return rmse, r2