Skip to content

Configuration

SparkConfigurationUtility

Bases: UtilitiesInterface

Sets configuration key value pairs to a Spark Session

Example

from rtdip_sdk.pipelines.utilities import SparkConfigurationUtility
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

configuration_utility = SparkConfigurationUtility(
    spark=spark,
    config={}
)

result = configuration_utility.execute()

Parameters:

Name Type Description Default
spark SparkSession

Spark Session required to read data from cloud storage

required
config dict

Dictionary of spark configuration to be applied to the spark session

required
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class SparkConfigurationUtility(UtilitiesInterface):
    """
    Sets configuration key value pairs to a Spark Session

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.utilities import SparkConfigurationUtility
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    configuration_utility = SparkConfigurationUtility(
        spark=spark,
        config={}
    )

    result = configuration_utility.execute()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        config (dict): Dictionary of spark configuration to be applied to the spark session
    """

    # NOTE(review): the previous attribute annotations (columns, partitioned_by,
    # location, properties, comment) were never assigned or read by this class
    # and appear to be copy-paste residue from a table utility; removed.
    spark: SparkSession  # session the configuration is applied to
    config: dict  # key/value pairs passed to spark.conf.set

    def __init__(self, spark: SparkSession, config: dict) -> None:
        self.spark = spark
        self.config = config

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """Returns the Maven/PyPi libraries required by this component (none)."""
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """Returns additional settings required by this component (none)."""
        return {}

    def execute(self) -> bool:
        """
        Executes configuration key value pairs to a Spark Session.

        Returns:
            bool: True if every configuration entry was applied.

        Raises:
            Py4JJavaError: If the underlying JVM call fails; logged then re-raised.
            Exception: Any other error while applying configuration; logged then re-raised.
        """
        try:
            # Unpack each (key, value) pair directly rather than indexing tuples.
            for key, value in self.config.items():
                self.spark.conf.set(key, value)
            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
63
64
65
66
67
68
69
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    # Identifies this component as requiring a PySpark runtime.
    return SystemType.PYSPARK

execute()

Executes configuration key value pairs to a Spark Session

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/configuration.py
80
81
82
83
84
85
86
87
88
89
90
91
92
def execute(self) -> bool:
    """Applies every configured key/value pair to the attached Spark session."""
    try:
        # Unpack each (key, value) entry and push it into the session config.
        for key, value in self.config.items():
            self.spark.conf.set(key, value)
        return True

    except Py4JJavaError as e:
        # JVM-side failure: log the Java error message before propagating.
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        # Any other failure: log it, then let the caller handle it.
        logging.exception(str(e))
        raise e