Skip to content

Databricks SQL Connector

DatabricksSQLConnection

Bases: ConnectionInterface

The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses.

The connection class represents a connection to a database and uses the Databricks SQL Connector APIs for Python to interact with clusters/jobs. To find the server_hostname and http_path values for a SQL Warehouse, see the SQL Warehouse tab in the documentation.

Parameters:

Name Type Description Default
server_hostname str

Server hostname for the cluster or SQL Warehouse

required
http_path str

Http path for the cluster or SQL Warehouse

required
access_token str

Azure AD or Databricks PAT token

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class DatabricksSQLConnection(ConnectionInterface):
    """
    The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses.

    The connection class represents a connection to a database and uses the Databricks SQL Connector APIs for Python to interact with clusters/jobs.
    To find the server_hostname and http_path values for a SQL Warehouse, see the SQL Warehouse tab in the documentation.

    The connection is opened immediately when the object is constructed.

    Args:
        server_hostname: Server hostname for the cluster or SQL Warehouse
        http_path: Http path for the cluster or SQL Warehouse
        access_token: Azure AD or Databricks PAT token
    """

    def __init__(self, server_hostname: str, http_path: str, access_token: str) -> None:
        self.server_hostname = server_hostname
        self.http_path = http_path
        self.access_token = access_token
        # Connect eagerly; the stored settings are reused by cursor() to reconnect.
        self.connection = self._connect()

    def _connect(self):
        """
        Opens a connection to the endpoint using the stored settings.

        Returns:
            The connection object returned by ``sql.connect``.

        Raises:
            Exception: any error raised while connecting is logged and re-raised.
        """
        try:
            return sql.connect(
                server_hostname=self.server_hostname,
                http_path=self.http_path,
                access_token=self.access_token,
                _user_agent_entry="RTDIP",
            )
        except Exception:
            logging.exception("error while connecting to the endpoint")
            raise

    def close(self) -> None:
        """
        Closes connection to database.

        Raises:
            Exception: any error raised while closing is logged and re-raised.
        """
        try:
            self.connection.close()
        except Exception:
            logging.exception("error while closing connection")
            raise

    def cursor(self) -> object:
        """
        Initiates the cursor and returns it.

        If the current connection reports that it is no longer open, a new
        connection is established first.

        Returns:
          DatabricksSQLCursor: Object to represent a databricks workspace with methods to interact with clusters/jobs.

        Raises:
            Exception: any error raised while reconnecting or creating the cursor is logged and re-raised.
        """
        try:
            # Reconnect transparently when the underlying connection was closed.
            if not self.connection.open:
                self.connection = self._connect()
            return DatabricksSQLCursor(self.connection.cursor())
        except Exception:
            logging.exception("error with cursor object")
            raise

close()

Closes connection to database.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
55
56
57
58
59
60
61
def close(self) -> None:
    """
    Closes connection to database.

    Any exception raised while closing is logged and then propagated to the caller.
    """
    try:
        active_connection = self.connection
        active_connection.close()
    except Exception:
        logging.exception("error while closing connection")
        raise

cursor()

Initiates the cursor and returns it.

Returns:

Name Type Description
DatabricksSQLCursor object

Object to represent a databricks workspace with methods to interact with clusters/jobs.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def cursor(self) -> object:
    """
    Initiates the cursor and returns it.

    If the current connection reports that it is no longer open, a new
    connection is established first.

    Returns:
      DatabricksSQLCursor: Object to represent a databricks workspace with methods to interact with clusters/jobs.

    Raises:
        Exception: any error raised while reconnecting or creating the cursor is logged and re-raised.
    """
    try:
        # Reconnect transparently when the underlying connection was closed.
        if not self.connection.open:
            self.connection = self._connect()
        return DatabricksSQLCursor(self.connection.cursor())
    except Exception:
        logging.exception("error with cursor object")
        raise

DatabricksSQLCursor

Bases: CursorInterface

Object to represent a databricks workspace with methods to interact with clusters/jobs.

Parameters:

Name Type Description Default
cursor object

controls execution of commands on cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class DatabricksSQLCursor(CursorInterface):
    """
    Object to represent a databricks workspace with methods to interact with clusters/jobs.

    Args:
        cursor: controls execution of commands on cluster or SQL Warehouse
    """

    def __init__(self, cursor: object) -> None:
        self.cursor = cursor

    def execute(self, query: str) -> None:
        """
        Prepares and runs a database query.

        Args:
            query: sql query to execute on the cluster or SQL Warehouse

        Raises:
            Exception: any error raised by the underlying cursor is logged and re-raised.
        """
        try:
            self.cursor.execute(query)
        except Exception:
            logging.exception("error while executing the query")
            raise

    def fetch_all(self, fetch_size=5_000_000) -> list:
        """
        Gets all rows of a query.

        Rows are fetched as Arrow batches of at most ``fetch_size`` rows, the
        batches are concatenated, and the result is converted with ``to_pandas()``.

        Args:
            fetch_size: maximum number of rows requested per batch; must be greater than zero

        Returns:
            list: all rows of the query result. Note that the value returned is a pandas DataFrame, not a Python list.

        Raises:
            ValueError: if ``fetch_size`` is not greater than zero.
            Exception: any error raised while fetching or converting is logged and re-raised.
        """
        # A batch size of zero can never produce a "short" batch, so the loop
        # below would never terminate; reject it up front.
        if fetch_size <= 0:
            raise ValueError("fetch_size must be greater than zero")
        try:
            batches = []
            while True:
                batch = self.cursor.fetchmany_arrow(fetch_size)
                batches.append(batch)
                # A batch smaller than requested means the result set is exhausted.
                if batch.num_rows < fetch_size:
                    break

            return pa.concat_tables(batches).to_pandas()
        except Exception:
            logging.exception("error while fetching the rows of a query")
            raise

    def close(self) -> None:
        """
        Closes the cursor.

        Raises:
            Exception: any error raised while closing is logged and re-raised.
        """
        try:
            self.cursor.close()
        except Exception:
            logging.exception("error while closing the cursor")
            raise

execute(query)

Prepares and runs a database query.

Parameters:

Name Type Description Default
query str

sql query to execute on the cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def execute(self, query: str) -> None:
    """
    Runs a SQL statement through the wrapped cursor.

    Any exception raised by the cursor is logged and then propagated to the caller.

    Args:
        query: sql query to execute on the cluster or SQL Warehouse
    """
    try:
        wrapped_cursor = self.cursor
        wrapped_cursor.execute(query)
    except Exception:
        logging.exception("error while executing the query")
        raise

fetch_all(fetch_size=5000000)

Gets all rows of a query.

Returns:

Name Type Description
list list

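All rows of the query result, returned as a pandas DataFrame (the signature is annotated `list`, but the fetched Arrow batches are concatenated and converted with `to_pandas()`).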

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def fetch_all(self, fetch_size=5_000_000) -> list:
    """
    Gets all rows of a query.

    Rows are fetched as Arrow batches of at most ``fetch_size`` rows, the
    batches are concatenated, and the result is converted with ``to_pandas()``.

    Args:
        fetch_size: maximum number of rows requested per batch; must be greater than zero

    Returns:
        list: all rows of the query result. Note that the value returned is a pandas DataFrame, not a Python list.

    Raises:
        ValueError: if ``fetch_size`` is not greater than zero.
        Exception: any error raised while fetching or converting is logged and re-raised.
    """
    # A batch size of zero can never produce a "short" batch, so the loop
    # below would never terminate; reject it up front.
    if fetch_size <= 0:
        raise ValueError("fetch_size must be greater than zero")
    try:
        batches = []
        while True:
            batch = self.cursor.fetchmany_arrow(fetch_size)
            batches.append(batch)
            # A batch smaller than requested means the result set is exhausted.
            if batch.num_rows < fetch_size:
                break

        return pa.concat_tables(batches).to_pandas()
    except Exception:
        logging.exception("error while fetching the rows of a query")
        raise

close()

Closes the cursor.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
126
127
128
129
130
131
132
def close(self) -> None:
    """
    Closes the cursor.

    Any exception raised while closing is logged and then propagated to the caller.
    """
    try:
        wrapped_cursor = self.cursor
        wrapped_cursor.close()
    except Exception:
        logging.exception("error while closing the cursor")
        raise