Spark Connector

SparkConnection

Bases: ConnectionInterface

The Spark Connector enables running Spark SQL queries via a Spark Session.

Additionally, this connector supports Spark Connect, which was introduced in PySpark 3.4.0 and allows Spark Sessions to connect to remote Spark Clusters. This enables Spark SQL to be constructed locally but executed remotely. To find out more about Spark Connect and the connection string to be provided to the spark_remote parameter of the Spark Session, see the Spark Connect overview: https://spark.apache.org/docs/latest/spark-connect-overview.html#specify-spark-connect-when-creating-spark-session
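
The connection string supplied to spark_remote uses the sc:// scheme described in that overview. A minimal illustration, with a hypothetical host name and the default Spark Connect port:

# Hypothetical Spark Connect connection string for the spark_remote parameter.
# Format: sc://<host>:<port>, optionally followed by /;param=value pairs
# (for example an authentication token); the default Spark Connect port is 15002.
spark_remote = "sc://my-spark-connect-server:15002"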

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession, optional | Provide an existing Spark Session if one exists. A new Spark Session will be created if not provided. | None |
| spark_configuration | dict, optional | Spark configuration to be provided to the Spark Session. | None |
| spark_libraries | Libraries, optional | Additional JARs to be included in the Spark Session. | None |
| spark_remote | str, optional | Remote connection string of the Spark Server and any authentication details. Spark Connect, introduced in PySpark 3.4.0, allows Spark Sessions to connect to remote Spark Clusters, so Spark SQL can be constructed locally but executed remotely. | None |
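
A minimal usage sketch, assuming SparkConnection is importable from rtdip_sdk.connectors (as the source path below suggests) and that pyspark is installed; the remote host name is illustrative:

from rtdip_sdk.connectors import SparkConnection

# Create a connection that builds a new local Spark Session;
# all parameters are optional and default to None.
connection = SparkConnection()

# Alternatively, reuse an existing Spark Session:
# connection = SparkConnection(spark=existing_spark_session)

# Or connect to a remote cluster via Spark Connect (requires pyspark >= 3.4.0);
# the connection string below is illustrative.
remote_connection = SparkConnection(
    spark_remote="sc://my-spark-connect-server:15002"
)
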
Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
class SparkConnection(ConnectionInterface):
    """
    The Spark Connector enables running Spark SQL queries via a Spark Session.

    Additionally, this connector supports Spark Connect, which was introduced in PySpark 3.4.0 and allows Spark Sessions to connect to remote Spark Clusters. This enables Spark SQL to be constructed locally but executed remotely.
    To find out more about Spark Connect and the connection string to be provided to the `spark_remote` parameter of the Spark Session, please see [here.](https://spark.apache.org/docs/latest/spark-connect-overview.html#specify-spark-connect-when-creating-spark-session)

    Args:
        spark (optional, SparkSession): Provide an existing Spark Session if one exists. A new Spark Session will be created if not provided.
        spark_configuration (optional, dict): Spark configuration to be provided to the Spark Session.
        spark_libraries (optional, Libraries): Additional JARs to be included in the Spark Session.
        spark_remote (optional, str): Remote connection string of the Spark Server and any authentication details. Spark Connect, introduced in PySpark 3.4.0, allows Spark Sessions to connect to remote Spark Clusters. This enables Spark SQL to be constructed locally but executed remotely.
    """

    def __init__(
        self,
        spark: SparkSession = None,
        spark_configuration: dict = None,
        spark_libraries: Libraries = None,
        spark_remote: str = None,
    ) -> None:
        if spark_remote is not None:
            _package_version_meets_minimum("pyspark", "3.4.0")

        if spark is None:
            self.connection = SparkClient(
                spark_configuration=(
                    {} if spark_configuration is None else spark_configuration
                ),
                spark_libraries=(
                    Libraries() if spark_libraries is None else spark_libraries
                ),
                spark_remote=spark_remote,
            ).spark_session
        else:
            self.connection = spark

    def close(self) -> None:
        """Not relevant for spark sessions"""
        pass

    def cursor(self) -> object:
        """
        Initiates the cursor and returns it.

        Returns:
          SparkCursor: Object to represent a Spark session with methods to interact with clusters/jobs.
        """
        try:
            return SparkCursor(self.connection)
        except Exception as e:
            logging.exception("error with cursor object")
            raise e

close()

Not relevant for spark sessions

Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
def close(self) -> None:
    """Not relevant for spark sessions"""
    pass

cursor()

Initiates the cursor and returns it.

Returns:

| Name | Type | Description |
|------|------|-------------|
| SparkCursor | object | Object to represent a Spark session with methods to interact with clusters/jobs. |

Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
def cursor(self) -> object:
    """
    Initiates the cursor and returns it.

    Returns:
      SparkCursor: Object to represent a Spark session with methods to interact with clusters/jobs.
    """
    try:
        return SparkCursor(self.connection)
    except Exception as e:
        logging.exception("error with cursor object")
        raise e

SparkCursor

Bases: CursorInterface

Object to represent a Spark session with methods to interact with clusters/jobs using the remote connection information.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| cursor | object | Controls execution of commands on the Spark Cluster. | required |
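
A SparkCursor is normally obtained from SparkConnection.cursor() rather than constructed directly. A minimal sketch, continuing the connection created earlier; the query and table name are illustrative:

# Hypothetical usage; the table name is illustrative.
cursor = connection.cursor()
cursor.execute("SELECT * FROM my_catalog.my_schema.my_table LIMIT 10")
df = cursor.fetch_all()  # Spark DataFrame of results
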
Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
class SparkCursor(CursorInterface):
    """
    Object to represent a spark session with methods to interact with clusters/jobs using the remote connection information.

    Args:
        cursor: controls execution of commands on Spark Cluster
    """

    execute_result: DataFrame

    def __init__(self, cursor: object) -> None:
        self.cursor = cursor

    def execute(self, query: str) -> None:
        """
        Prepares and runs a database query.

        Args:
            query: sql query to execute on the cluster or SQL Warehouse
        """
        try:
            self.execute_result = self.cursor.sql(query)
        except Exception as e:
            logging.exception("error while executing the query")
            raise e

    def fetch_all(self) -> DataFrame:
        """
        Gets all rows of a query.

        Returns:
          DataFrame: Spark DataFrame of results
        """
        try:
            df = self.execute_result
            return df
        except Exception as e:
            logging.exception("error while fetching the rows of a query")
            raise e

    def close(self) -> None:
        """Not relevant for dataframes"""
        pass

execute(query)

Prepares and runs a database query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| query | str | SQL query to execute on the cluster or SQL Warehouse. | required |
Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
def execute(self, query: str) -> None:
    """
    Prepares and runs a database query.

    Args:
        query: sql query to execute on the cluster or SQL Warehouse
    """
    try:
        self.execute_result = self.cursor.sql(query)
    except Exception as e:
        logging.exception("error while executing the query")
        raise e

fetch_all()

Gets all rows of a query.

Returns:

| Name | Type | Description |
|------|------|-------------|
| DataFrame | DataFrame | Spark DataFrame of results. |

Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
def fetch_all(self) -> DataFrame:
    """
    Gets all rows of a query.

    Returns:
      DataFrame: Spark DataFrame of results
    """
    try:
        df = self.execute_result
        return df
    except Exception as e:
        logging.exception("error while fetching the rows of a query")
        raise e
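
The object returned by fetch_all is the Spark DataFrame produced by execute. Spark evaluates it lazily, so a DataFrame action is still needed to materialise the results. A short sketch, continuing the hypothetical cursor example above:

# Hypothetical continuation; column and table names are illustrative.
cursor.execute("SELECT EventTime, Value FROM my_catalog.my_schema.my_table")
df = cursor.fetch_all()

# The result is a standard, lazily evaluated Spark DataFrame, so any
# DataFrame action can be used to trigger execution and view the data.
df.show(10)
pandas_df = df.toPandas()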

close()

Not relevant for dataframes

Source code in src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
def close(self) -> None:
    """Not relevant for dataframes"""
    pass