Read from Delta sharing

SparkDeltaSharingSource

Bases: SourceInterface

The Spark Delta Sharing Source is used to read data from a Delta table where Delta Sharing is configured.

Example

#Delta Sharing Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "maxFilesPerTrigger": 1000,
        "ignoreChanges: True,
        "startingVersion": 0
    },
    table_name="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_stream()
#Delta Sharing Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "versionAsOf": 0,
        "timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
    },
    table_name="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session required to read data from a Delta table. Required.
    options (dict): Options that can be specified for a Delta Table read operation (see the Attributes table below). Further information on the options is available at https://docs.databricks.com/data-sharing/read-data-open.html#apache-spark-read-shared-data. Required.
    table_path (str): Path to the credentials file and Delta table to query. Required.
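
For reference, a Delta Sharing table path typically combines the location of the share profile file with the share, schema and table names, separated by a #. The profile file location and the share, schema and table names below are illustrative placeholders, not values defined by this SDK:

# Hypothetical Delta Sharing table path: "<profile-file>#<share>.<schema>.<table>"
delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={},
    table_path="/dbfs/FileStore/config.share#my_share.my_schema.my_table"
)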

Attributes:

    ignoreDeletes (bool str): Ignore transactions that delete data at partition boundaries. (Streaming)
    ignoreChanges (bool str): Re-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming)
    startingVersion (int str): The Delta Lake version to start from. (Streaming)
    startingTimestamp (datetime str): The timestamp to start from. (Streaming)
    maxFilesPerTrigger (int): The maximum number of new files to be considered in every micro-batch. The default is 1000. (Streaming)
    maxBytesPerTrigger (int): The maximum amount of data to be processed in each micro-batch. (Streaming)
    readChangeFeed (bool str): Stream read the change data feed of the shared table. (Batch & Streaming)
    timestampAsOf (datetime str): Query the Delta Table as of a specific point in time. (Batch)
    versionAsOf (int str): Query the Delta Table as of a specific version. (Batch)
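
As a sketch of how the change data feed options combine, the example below streams the change feed of a shared table from a given version. The table path and version number are placeholders, and the shared table must have change data feed enabled by the data provider:

# Sketch: stream the change data feed of a shared table from version 5
cdf_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "readChangeFeed": "true",
        "startingVersion": 5
    },
    table_path="{YOUR-DELTA-TABLE-PATH}"
)

changes_df = cdf_source.read_stream()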

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
class SparkDeltaSharingSource(SourceInterface):
    """
    The Spark Delta Sharing Source is used to read data from a Delta table where Delta sharing is configured

    Example
    --------
    ```python
    #Delta Sharing Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    delta_sharing_source = SparkDeltaSharingSource(
        spark=spark,
        options={
            "maxFilesPerTrigger": 1000,
            "ignoreChanges: True,
            "startingVersion": 0
        },
        table_name="{YOUR-DELTA-TABLE-PATH}"
    )

    delta_sharing_source.read_stream()
    ```
    ```python
    #Delta Sharing Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    delta_sharing_source = SparkDeltaSharingSource(
        spark=spark,
        options={
            "versionAsOf": 0,
            "timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
        },
        table_name="{YOUR-DELTA-TABLE-PATH}"
    )

    delta_sharing_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from a Delta table
        options (dict): Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/data-sharing/read-data-open.html#apache-spark-read-shared-data){ target="_blank" }
        table_path (str): Path to credentials file and Delta table to query

    Attributes:
        ignoreDeletes (bool str): Ignore transactions that delete data at partition boundaries. (Streaming)
        ignoreChanges (bool str): Pre-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming)
        startingVersion (int str): The Delta Lake version to start from. (Streaming)
        startingTimestamp (datetime str): The timestamp to start from. (Streaming)
        maxFilesPerTrigger (int): How many new files to be considered in every micro-batch. The default is 1000. (Streaming)
        maxBytesPerTrigger (int): How much data gets processed in each micro-batch. (Streaming)
        readChangeFeed (bool str): Stream read the change data feed of the shared table. (Batch & Streaming)
        timestampAsOf (datetime str): Query the Delta Table from a specific point in time. (Batch)
        versionAsOf (int str): Query the Delta Table from a specific version. (Batch)
    """

    spark: SparkSession
    options: dict
    table_path: str

    def __init__(self, spark: SparkSession, options: dict, table_path: str) -> None:
        self.spark = spark
        self.options = options
        self.table_path = table_path

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_sharing"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self):
        return True

    def read_batch(self):
        """
        Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
        """
        try:
            return (
                self.spark.read.format("deltaSharing")
                .options(**self.options)
                .table(self.table_path)
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
        """
        try:
            return (
                self.spark.readStream.format("deltaSharing")
                .options(**self.options)
                .load(self.table_path)
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
def read_batch(self):
    """
    Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
    """
    try:
        return (
            self.spark.read.format("deltaSharing")
            .options(**self.options)
            .table(self.table_path)
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
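
Since read_batch() returns a standard Spark DataFrame, the result can be used directly with the DataFrame API. A minimal sketch, assuming the delta_sharing_source instance from the batch example above:

df = delta_sharing_source.read_batch()
print(df.count())  # row count of the shared table at the queried version/timestamp
df.show(10)        # preview the first 10 rows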

read_stream()

Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
    """
    try:
        return (
            self.spark.readStream.format("deltaSharing")
            .options(**self.options)
            .load(self.table_path)
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
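
read_stream() returns a streaming DataFrame, so a sink and checkpoint still need to be attached before the query runs. A minimal sketch using the built-in console sink; the checkpoint location is an illustrative placeholder:

df = delta_sharing_source.read_stream()

(
    df.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/delta-sharing-checkpoint")
    .start()
    .awaitTermination()
)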