
Pandas to PySpark DataFrame Conversion

PandasToPySparkTransformer

Bases: TransformerInterface

Converts a Pandas DataFrame to a PySpark DataFrame.

Example

from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pandas_to_pyspark = PandasToPySparkTransformer(
    spark=spark,
    df=df,
)

result = pandas_to_pyspark.transform()
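
In the example above, `df` is assumed to be an existing Pandas DataFrame. A minimal sketch of constructing one for illustration (the column names below are not part of the API, just sample data):

```python
import pandas as pd

# Hypothetical sample data; any Pandas DataFrame can be passed to the transformer
df = pd.DataFrame(
    {
        "TagName": ["Sensor_A", "Sensor_B"],
        "Value": [1.5, 2.5],
    }
)
```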

Parameters:

| Name  | Type         | Description                                 | Default  |
|-------|--------------|---------------------------------------------|----------|
| spark | SparkSession | Spark Session required to convert DataFrame | required |
| df    | DataFrame    | Pandas DataFrame to be converted            | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
class PandasToPySparkTransformer(TransformerInterface):
    """
    Converts a Pandas DataFrame to a PySpark DataFrame.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pandas_to_pyspark = PandasToPySparkTransformer(
        spark=spark,
        df=df,
    )

    result = pandas_to_pyspark.transform()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to convert DataFrame
        df (DataFrame): Pandas DataFrame to be converted
    """

    spark: SparkSession
    df: PandasDataFrame

    def __init__(self, spark: SparkSession, df: PandasDataFrame) -> None:
        self.spark = spark
        self.df = df

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_transform_validation(self):
        return True

    def post_transform_validation(self):
        return True

    def transform(self) -> PySparkDataFrame:
        """
        Returns:
            DataFrame: A PySpark dataframe converted from a Pandas DataFrame.
        """

        self.df = _prepare_pandas_to_convert_to_spark(self.df)
        df = self.spark.createDataFrame(self.df)

        return df

system_type() staticmethod

Attributes:

| Name       | Type        | Description      |
|------------|-------------|------------------|
| SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK
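
Because system_type() is a static method, it can be queried without instantiating the transformer, for example:

```python
from rtdip_sdk.pipelines.transformers import PandasToPySparkTransformer

# Returns SystemType.PYSPARK, indicating the component requires a PySpark environment
print(PandasToPySparkTransformer.system_type())
```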

transform()

Returns:

| Name      | Type      | Description                                             |
|-----------|-----------|---------------------------------------------------------|
| DataFrame | DataFrame | A PySpark dataframe converted from a Pandas DataFrame.  |
Source code in src/sdk/python/rtdip_sdk/pipelines/transformers/spark/pandas_to_pyspark.py
def transform(self) -> PySparkDataFrame:
    """
    Returns:
        DataFrame: A PySpark dataframe converted from a Pandas DataFrame.
    """

    self.df = _prepare_pandas_to_convert_to_spark(self.df)
    df = self.spark.createDataFrame(self.df)

    return df
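
The returned object is a regular PySpark DataFrame, so the standard DataFrame API applies to the result. For example, continuing the example above:

```python
# Inspect the converted DataFrame using standard PySpark operations
result = pandas_to_pyspark.transform()
result.printSchema()
result.show()
```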