Skip to content

Read from Delta

PythonDeltaSource

Bases: SourceInterface

The Python Delta Source is used to read data from a Delta table without using Apache Spark, returning a Polars LazyFrame.

Example

from rtdip_sdk.pipelines.sources import PythonDeltaSource

# Azure Data Lake Storage Gen2 location of the Delta table
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"

python_delta_source = PythonDeltaSource(
    path=path,
    version=None,
    storage_options={
        "azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
        "azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
    },
    pyarrow_options=None,
    without_files=False
)

python_delta_source.read_batch()
from rtdip_sdk.pipelines.sources import PythonDeltaSource

# S3 location of the Delta table
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"

# Credentials for the bucket, handed to the source as storage_options
aws_storage_options = {
    "aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
    "aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}",
}

python_delta_source = PythonDeltaSource(
    path=path,
    version=None,
    storage_options=aws_storage_options,
    pyarrow_options=None,
    without_files=False,
)

python_delta_source.read_batch()

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `str` | Path to the Delta table. Can be local or in S3/Azure storage. | *required* |
| `version` | optional `int` | Specify the Delta table version to read from. Defaults to the latest version. | `None` |
| `storage_options` | optional `dict` | Used to read from AWS/Azure storage. For AWS use format `{"aws_access_key_id": "<>", "aws_secret_access_key": "<>"}`. For Azure use format `{"azure_storage_account_name": "<>", "azure_storage_account_key": "<>"}`. | `None` |
| `pyarrow_options` | optional `dict` | Data access and efficiency options when reading from Delta. See `to_pyarrow_dataset`. | `None` |
| `without_files` | optional `bool` | If `True`, loads the table without tracking files. | `False` |
Source code in `src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py` (lines 22–141)
class PythonDeltaSource(SourceInterface):
    """
    The Python Delta Source is used to read data from a Delta table without using Apache Spark, returning a Polars LazyFrame.

    Example
    --------
    === "Azure"

        ```python
        from rtdip_sdk.pipelines.sources import PythonDeltaSource

        path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"

        python_delta_source = PythonDeltaSource(
            path=path,
            version=None,
            storage_options={
                "azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
                "azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
            },
            pyarrow_options=None,
            without_files=False
        )

        python_delta_source.read_batch()
        ```
    === "AWS"

        ```python
        from rtdip_sdk.pipelines.sources import PythonDeltaSource

        path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"

        python_delta_source = PythonDeltaSource(
            path=path,
            version=None,
            storage_options={
                "aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
                "aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}"
            },
            pyarrow_options=None,
            without_files=False
        )

        python_delta_source.read_batch()
        ```

    Parameters:
        path (str): Path to the Delta table. Can be local or in S3/Azure storage
        version (optional int): Specify the Delta table version to read from. Defaults to the latest version
        storage_options (optional dict): Used to read from AWS/Azure storage. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key":"<>"}. For Azure use format {"azure_storage_account_name": "<>", "azure_storage_account_key": "<>"}.
        pyarrow_options (optional dict): Data Access and Efficiency options when reading from Delta. See [to_pyarrow_dataset](https://delta-io.github.io/delta-rs/python/api_reference.html#deltalake.table.DeltaTable.to_pyarrow_dataset){ target="_blank" }.
        without_files (optional bool): If True loads the table without tracking files
    """

    path: str
    # None means "read the latest version" (passed straight to scan_delta).
    version: int
    storage_options: dict
    pyarrow_options: dict
    without_files: bool

    def __init__(
        self,
        path: str,
        version: int = None,
        storage_options: dict = None,
        pyarrow_options: dict = None,
        without_files: bool = False,
    ):
        # Configuration is only stored here; nothing is read until read_batch().
        self.path = path
        self.version = version
        self.storage_options = storage_options
        self.pyarrow_options = pyarrow_options
        self.without_files = without_files

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYTHON
        """
        return SystemType.PYTHON

    @staticmethod
    def libraries():
        # No extra libraries are registered for this component.
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        # This component has no additional settings.
        return {}

    def pre_read_validation(self):
        # No pre-read checks are performed; always reports success.
        return True

    def post_read_validation(self):
        # No post-read checks are performed; always reports success.
        return True

    def read_batch(self) -> LazyFrame:
        """
        Reads data from a Delta table into a Polars LazyFrame

        Returns:
            LazyFrame: The result of `pl.scan_delta` over `self.path` at
                `self.version`, configured with this source's storage and
                pyarrow options.
        """
        # `without_files` is not a direct scan_delta argument; it is forwarded
        # to the underlying Delta table through `delta_table_options`.
        without_files_dict = {"without_files": self.without_files}
        lf = pl.scan_delta(
            source=self.path,
            version=self.version,
            storage_options=self.storage_options,
            delta_table_options=without_files_dict,
            pyarrow_options=self.pyarrow_options,
        )
        return lf

    def read_stream(self):
        """
        Raises:
            NotImplementedError: Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component.
        """
        raise NotImplementedError(
            "Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component"
        )

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `SystemType` | `Environment` | Requires PYTHON |

Source code in `src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py` (lines 97–103)
@staticmethod
def system_type():
    """
    Report which execution environment this component targets.

    Attributes:
        SystemType (Environment): Requires PYTHON
    """
    environment = SystemType.PYTHON
    return environment

read_batch()

Reads data from a Delta table into a Polars LazyFrame

Source code in `src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py` (lines 120–132)
def read_batch(self) -> LazyFrame:
    """
    Scan the configured Delta table with Polars and hand back the LazyFrame.

    Returns:
        LazyFrame: Result of `pl.scan_delta` for this source's path, version,
            storage options and pyarrow options.
    """
    # `without_files` travels to the Delta table via `delta_table_options`.
    scan_kwargs = dict(
        source=self.path,
        version=self.version,
        storage_options=self.storage_options,
        delta_table_options={"without_files": self.without_files},
        pyarrow_options=self.pyarrow_options,
    )
    return pl.scan_delta(**scan_kwargs)

read_stream()

Raises:

| Type | Description |
|------|-------------|
| `NotImplementedError` | Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component. |

Source code in `src/sdk/python/rtdip_sdk/pipelines/sources/python/delta.py` (lines 134–141)
def read_stream(self):
    """
    Streaming reads are not supported by this Python (non-Spark) source.

    Raises:
        NotImplementedError: Reading from a Delta table using Python is only possible for batch reads. To perform a streaming read, use the read_stream method of the SparkDeltaSource component.
    """
    message = (
        "Reading from a Delta table using Python is only possible for batch reads. "
        "To perform a streaming read, use the read_stream method of the SparkDeltaSource component"
    )
    raise NotImplementedError(message)