Skip to content

Session

SparkSessionUtility

Bases: UtilitiesInterface

Creates or gets a Spark session, using the settings and libraries of the imported RTDIP components to populate the Spark configuration and JARs of that session.

Call this component after all imports of the RTDIP components to ensure that the spark session is configured correctly.

Example

from rtdip_sdk.pipelines.utilities import SparkSessionUtility

spark_session_utility = SparkSessionUtility(
    config={},
    module=None,
    remote=None
)

result = spark_session_utility.execute()

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | optional dict | Dictionary of spark configuration to be applied to the spark session | None |
| module | optional str | Provide the module to use for imports of rtdip-sdk components. If not populated, it will use the calling module to check for imports | None |
| remote | optional str | Specify the remote parameters if intending to use Spark Connect | None |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class SparkSessionUtility(UtilitiesInterface):
    """
    Creates or Gets a Spark Session and uses settings and libraries of the imported RTDIP components to populate the spark configuration and jars in the spark session.

    Call this component after all imports of the RTDIP components to ensure that the spark session is configured correctly.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    spark_session_utility = SparkSessionUtility(
        config={},
        module=None,
        remote=None
    )

    result = spark_session_utility.execute()
    ```

    Parameters:
        config (optional dict): Dictionary of spark configuration to be applied to the spark session
        module (optional str): Provide the module to use for imports of rtdip-sdk components. If not populated, it will use the calling module to check for imports
        remote (optional str): Specify the remote parameters if intending to use Spark Connect
    """

    spark: SparkSession
    config: dict
    module: str
    remote: str

    def __init__(
        self, config: dict = None, module: str = None, remote: str = None
    ) -> None:
        self.config = config
        if module is None:
            # Entry 0 of the stack is this constructor; entry 1 is whoever
            # called it. The lookup must stay inline here: moving it into a
            # helper would shift the stack and resolve the wrong frame.
            caller_frame = inspect.stack()[1][0]
            caller_module = inspect.getmodule(caller_frame)
            if caller_module is not None:
                self.module = caller_module.__name__
            else:
                # inspect.getmodule() returns None when it cannot map the
                # frame to a module; use the name recorded in the caller's
                # globals rather than failing with an AttributeError.
                self.module = caller_frame.f_globals.get("__name__", "__main__")
        else:
            self.module = module
        self.remote = remote

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """Return a freshly constructed `Libraries` object; nothing is added to it here."""
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """Return an empty dictionary; this utility declares no settings of its own."""
        return {}

    def execute(self) -> SparkSession:
        """
        Build the Spark session for the configured module and return it.

        The libraries and spark configuration are obtained from
        `PipelineComponentsGetUtility` using `self.module` and `self.config`,
        then passed, together with `self.remote`, to `SparkClient`. The
        resulting session is stored on `self.spark`.

        Returns:
            SparkSession: The session exposed by `SparkClient.spark_session`.

        Raises:
            Exception: Any exception raised while building the session is logged and re-raised unchanged.
        """
        try:
            (task_libraries, spark_configuration) = PipelineComponentsGetUtility(
                self.module, self.config
            ).execute()
            self.spark = SparkClient(
                spark_configuration=spark_configuration,
                spark_libraries=task_libraries,
                spark_remote=self.remote,
            ).spark_session
            return self.spark

        except Exception as e:
            logging.exception(str(e))
            # Bare raise re-raises the active exception as-is.
            raise

system_type() staticmethod

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
69
70
71
72
73
74
75
@staticmethod
def system_type():
    """
    Return the system type this utility requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

execute()

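Builds the Spark session from the libraries and spark configuration resolved for the configured module, stores it on the utility, and returns it. Any exception raised along the way is logged and re-raised.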

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/session.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def execute(self) -> SparkSession:
    """
    Build the Spark session for the configured module and return it.

    The libraries and spark configuration are obtained from
    `PipelineComponentsGetUtility` using `self.module` and `self.config`,
    then passed, together with `self.remote`, to `SparkClient`. The
    resulting session is stored on `self.spark`.

    Returns:
        SparkSession: The session exposed by `SparkClient.spark_session`.

    Raises:
        Exception: Any exception raised while building the session is logged and re-raised unchanged.
    """
    try:
        (task_libraries, spark_configuration) = PipelineComponentsGetUtility(
            self.module, self.config
        ).execute()
        self.spark = SparkClient(
            spark_configuration=spark_configuration,
            spark_libraries=task_libraries,
            spark_remote=self.remote,
        ).spark_session
        return self.spark

    except Exception as e:
        logging.exception(str(e))
        # Bare raise re-raises the active exception as-is.
        raise