Skip to content

Databricks SQL Connector

DatabricksSQLConnection

Bases: ConnectionInterface

The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses.

The connection class represents a connection to a database and uses the Databricks SQL Connector APIs for Python to interact with clusters and jobs. To find the server_hostname and http_path for a SQL warehouse, refer to the SQL Warehouse tab in the documentation.

Parameters:

Name Type Description Default
server_hostname str

Server hostname for the cluster or SQL Warehouse

required
http_path str

HTTP path for the cluster or SQL Warehouse

required
access_token str

Azure AD token or Databricks personal access token (PAT)

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class DatabricksSQLConnection(ConnectionInterface):
    """
    The Databricks SQL Connector for Python is a Python library that allows you to use Python code to run SQL commands on Databricks clusters and Databricks SQL warehouses.

    The connection class represents a connection to a database and uses the Databricks SQL Connector APIs for Python to interact with clusters and jobs.
    To find the server_hostname and http_path for a SQL warehouse, refer to the SQL Warehouse tab in the documentation.

    A connection is opened as soon as the object is constructed; `cursor()` opens a
    new one if that connection is no longer open.

    Args:
        server_hostname: Server hostname for the cluster or SQL Warehouse
        http_path: HTTP path for the cluster or SQL Warehouse
        access_token: Azure AD token or Databricks personal access token (PAT)
        return_type: Format in which cursors created by this connection return query results (defaults to `ConnectionReturnType.Pandas`)
    """

    def __init__(
        self,
        server_hostname: str,
        http_path: str,
        access_token: str,
        return_type=ConnectionReturnType.Pandas,
    ) -> None:
        self.server_hostname = server_hostname
        self.http_path = http_path
        self.access_token = access_token
        self.return_type = return_type
        # Connect eagerly so a bad endpoint or token fails at construction time.
        self.connection = self._connect()

    def _connect(self):
        """
        Opens a new connection to the configured endpoint.

        Returns:
            The connection object returned by `sql.connect`.

        Raises:
            Exception: Any error raised while connecting is logged and re-raised.
        """
        try:
            return sql.connect(
                server_hostname=self.server_hostname,
                http_path=self.http_path,
                access_token=self.access_token,
                _user_agent_entry="RTDIP",
            )
        except Exception:
            logging.exception("error while connecting to the endpoint")
            raise

    def close(self) -> None:
        """
        Closes connection to database.

        Raises:
            Exception: Any error raised while closing is logged and re-raised.
        """
        try:
            self.connection.close()
        except Exception:
            logging.exception("error while closing connection")
            raise

    def cursor(self) -> object:
        """
        Initiates the cursor and returns it.

        If the underlying connection is no longer open, a new connection is
        established first.

        Returns:
          DatabricksSQLCursor: Wrapper around a cursor of the underlying connection that returns results in this connection's `return_type`.

        Raises:
            Exception: Any error raised while (re)connecting or creating the cursor is logged and re-raised.
        """
        try:
            if not self.connection.open:
                self.connection = self._connect()
            return DatabricksSQLCursor(self.connection.cursor(), self.return_type)
        except Exception:
            logging.exception("error with cursor object")
            raise

close()

Closes connection to database.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
64
65
66
67
68
69
70
def close(self) -> None:
    """
    Close the wrapped database connection.

    Any error raised while closing is logged together with its traceback and
    then propagated to the caller unchanged.
    """
    try:
        connection = self.connection
        connection.close()
    except Exception as close_error:
        logging.exception("error while closing connection")
        raise close_error

cursor()

Initiates the cursor and returns it.

Returns:

Name Type Description
DatabricksSQLCursor object

Cursor wrapper for this connection, with methods to execute queries and fetch their results.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def cursor(self) -> object:
    """
    Initiates the cursor and returns it.

    If the underlying connection is no longer open, a new connection is
    established first.

    Returns:
      DatabricksSQLCursor: Wrapper around a cursor of the underlying connection that returns results in this connection's `return_type`.

    Raises:
        Exception: Any error raised while (re)connecting or creating the cursor is logged and re-raised.
    """
    try:
        if not self.connection.open:
            self.connection = self._connect()
        return DatabricksSQLCursor(self.connection.cursor(), self.return_type)
    except Exception:
        logging.exception("error with cursor object")
        raise

DatabricksSQLCursor

Bases: CursorInterface

Wraps a Databricks SQL cursor, with methods to execute queries, fetch their results and close the cursor.

Parameters:

Name Type Description Default
cursor object

controls execution of commands on cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class DatabricksSQLCursor(CursorInterface):
    """
    Wraps a Databricks SQL cursor with methods to execute queries, fetch their results and close the cursor.

    Args:
        cursor: controls execution of commands on cluster or SQL Warehouse
        return_type: Format returned by `fetch_all` (defaults to `ConnectionReturnType.Pandas`)
    """

    def __init__(self, cursor: object, return_type=ConnectionReturnType.Pandas) -> None:
        self.cursor = cursor
        self.return_type = return_type

    def execute(self, query: str) -> None:
        """
        Prepares and runs a database query.

        Args:
            query: SQL query to execute on the cluster or SQL Warehouse

        Raises:
            Exception: Any error raised by the underlying cursor is logged and re-raised.
        """
        try:
            self.cursor.execute(query)
        except Exception:
            logging.exception("error while executing the query")
            raise

    @staticmethod
    def _batch_to_rows(batch) -> list:
        """Transposes the column-major data of an Arrow batch into a list of row tuples."""
        return list(zip(*(column.to_pylist() for column in batch.columns)))

    def fetch_all(self, fetch_size=5_000_000) -> Union[list, dict]:
        """
        Gets all rows of a query.

        Rows are requested from the underlying cursor in Arrow batches of at most
        `fetch_size` rows until a batch shorter than `fetch_size` comes back.

        Args:
            fetch_size: Maximum number of rows requested per `fetchmany_arrow` call. Must be positive.

        Returns:
            Depends on `self.return_type`:

            - `ConnectionReturnType.Pandas`: a pandas DataFrame
            - `ConnectionReturnType.Pyarrow`: a pyarrow Table
            - `ConnectionReturnType.List`: a list of row tuples
            - `ConnectionReturnType.String`: a dict with `"data"` (the first column of every row, converted with `str` and comma-joined), `"sample_row"` (first row of the last non-empty batch, or `None`) and `"count"` (total number of rows fetched)
            - any other value: `None`

        Raises:
            ValueError: If `fetch_size` is not positive; such a size could never end the fetch loop.
            Exception: Any error from the underlying cursor or the conversion is logged and re-raised.
        """
        try:
            if fetch_size < 1:
                raise ValueError("fetch_size must be a positive integer")
            # Holds row tuples (List), per-batch joined strings (String),
            # or Arrow batches (every other return type).
            results = []
            count = 0
            sample_row = None
            while True:
                result = self.cursor.fetchmany_arrow(fetch_size)
                count += result.num_rows
                if self.return_type == ConnectionReturnType.List:
                    results.extend(self._batch_to_rows(result))
                elif self.return_type == ConnectionReturnType.String:
                    rows = [str(row[0]) for row in self._batch_to_rows(result)]
                    if rows:
                        sample_row = rows[0]
                        # Empty batches are skipped: joining their "" would leave
                        # a stray comma in the output.
                        results.append(",".join(rows))
                else:
                    results.append(result)
                # A short batch means the result set is exhausted.
                if result.num_rows < fetch_size:
                    break

            if self.return_type == ConnectionReturnType.Pandas:
                return pa.concat_tables(results).to_pandas()
            if self.return_type == ConnectionReturnType.Pyarrow:
                return pa.concat_tables(results)
            if self.return_type == ConnectionReturnType.List:
                return results
            if self.return_type == ConnectionReturnType.String:
                # Single join at the end keeps string building linear.
                return {
                    "data": ",".join(results),
                    "sample_row": sample_row,
                    "count": count,
                }
            return None
        except Exception:
            logging.exception("error while fetching the rows of a query")
            raise

    def close(self) -> None:
        """
        Closes the cursor.

        Raises:
            Exception: Any error raised while closing is logged and re-raised.
        """
        try:
            self.cursor.close()
        except Exception:
            logging.exception("error while closing the cursor")
            raise

execute(query)

Prepares and runs a database query.

Parameters:

Name Type Description Default
query str

SQL query to execute on the cluster or SQL Warehouse

required
Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
100
101
102
103
104
105
106
107
108
109
110
111
def execute(self, query: str) -> None:
    """
    Run a SQL statement through the wrapped cursor.

    Args:
        query: SQL text passed unchanged to the underlying cursor's `execute`.

    Raises:
        Exception: Whatever the underlying cursor raises; it is logged first.
    """
    try:
        run_statement = self.cursor.execute
        run_statement(query)
    except Exception as exec_error:
        logging.exception("error while executing the query")
        raise exec_error

fetch_all(fetch_size=5000000)

Gets all rows of a query.

Returns:

Name Type Description
list Union[list, dict]

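Query results in the cursor's return type: a pandas DataFrame, a pyarrow Table, a list of row tuples, or (for the String return type) a dict with data, sample_row and count keys.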

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def fetch_all(self, fetch_size=5_000_000) -> Union[list, dict]:
    """
    Gets all rows of a query.

    Rows are requested from the underlying cursor in Arrow batches of at most
    `fetch_size` rows until a batch shorter than `fetch_size` comes back.

    Args:
        fetch_size: Maximum number of rows requested per `fetchmany_arrow` call. Must be positive.

    Returns:
        Depends on `self.return_type`:

        - `ConnectionReturnType.Pandas`: a pandas DataFrame
        - `ConnectionReturnType.Pyarrow`: a pyarrow Table
        - `ConnectionReturnType.List`: a list of row tuples
        - `ConnectionReturnType.String`: a dict with `"data"` (the first column of every row, converted with `str` and comma-joined), `"sample_row"` (first row of the last non-empty batch, or `None`) and `"count"` (total number of rows fetched)
        - any other value: `None`

    Raises:
        ValueError: If `fetch_size` is not positive; such a size could never end the fetch loop.
        Exception: Any error from the underlying cursor or the conversion is logged and re-raised.
    """
    try:
        if fetch_size < 1:
            raise ValueError("fetch_size must be a positive integer")
        # Holds row tuples (List), per-batch joined strings (String),
        # or Arrow batches (every other return type).
        results = []
        count = 0
        sample_row = None
        while True:
            result = self.cursor.fetchmany_arrow(fetch_size)
            count += result.num_rows
            if self.return_type == ConnectionReturnType.List:
                # Arrow data is column-major; transpose it into row tuples.
                results.extend(zip(*(column.to_pylist() for column in result.columns)))
            elif self.return_type == ConnectionReturnType.String:
                rows = [
                    str(row[0])
                    for row in zip(*(column.to_pylist() for column in result.columns))
                ]
                if rows:
                    sample_row = rows[0]
                    # Empty batches are skipped: joining their "" would leave
                    # a stray comma in the output.
                    results.append(",".join(rows))
            else:
                results.append(result)
            # A short batch means the result set is exhausted.
            if result.num_rows < fetch_size:
                break

        if self.return_type == ConnectionReturnType.Pandas:
            return pa.concat_tables(results).to_pandas()
        if self.return_type == ConnectionReturnType.Pyarrow:
            return pa.concat_tables(results)
        if self.return_type == ConnectionReturnType.List:
            return results
        if self.return_type == ConnectionReturnType.String:
            # Single join at the end keeps string building linear.
            return {
                "data": ",".join(results),
                "sample_row": sample_row,
                "count": count,
            }
        return None
    except Exception:
        logging.exception("error while fetching the rows of a query")
        raise

close()

Closes the cursor.

Source code in src/sdk/python/rtdip_sdk/connectors/odbc/db_sql_connector.py
168
169
170
171
172
173
174
def close(self) -> None:
    """
    Close the wrapped cursor.

    A failure while closing is logged with its traceback and then handed back
    to the caller as the very same exception.
    """
    try:
        wrapped_cursor = self.cursor
        wrapped_cursor.close()
    except Exception as close_error:
        logging.exception("error while closing the cursor")
        raise close_error