Skip to content

Write to Delta

PythonDeltaDestination

Bases: DestinationInterface

The Python Delta Destination is used to write data to a Delta table from a Polars LazyFrame.

Example

from rtdip_sdk.pipelines.destinations import PythonDeltaDestination

path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"

python_delta_destination = PythonDeltaDestination(
    data=LazyFrame,
    path=path,
    options={
        "azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
        "azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
    },
    mode="error",
    overwrite_schema=False,
    delta_write_options=None
)

python_delta_destination.write_batch()
from rtdip_sdk.pipelines.destinations import PythonDeltaDestination

path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"

python_delta_destination = PythonDeltaDestination(
    data=LazyFrame,
    path=path,
    options={
        "aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
        "aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}"
    },
    mode="error",
    overwrite_schema=False,
    delta_write_options=None
)

python_delta_destination.write_batch()

Parameters:

Name Type Description Default
data LazyFrame

Polars LazyFrame to be written to Delta

required
path str

Path to the Delta table to be written to; either local or remote. Locally, if the table doesn't exist one will be created, but to write to AWS or Azure you must have an existing Delta table.

required
options Optional dict

Used if writing to a remote location. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key": "<>"}. For Azure use format {"azure_storage_account_name": "storageaccountname", "azure_storage_access_key": "<>"}

None
mode Literal['error', 'append', 'overwrite', 'ignore']

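Write mode. 'error' (the default) raises an error if the table already exists, 'append' adds the new data to the table, 'overwrite' replaces the table with the new data, and 'ignore' writes nothing if the table already exists.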

'error'
overwrite_schema bool

If True will allow for the table schema to be overwritten

False
delta_write_options dict

Options when writing to a Delta table. See here for all options

None
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class PythonDeltaDestination(DestinationInterface):
    """
    The Python Delta Destination is used to write data to a Delta table from a Polars LazyFrame.

    Example
    --------
    === "Azure"

        ```python
        from rtdip_sdk.pipelines.destinations import PythonDeltaDestination

        path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"

        python_delta_destination = PythonDeltaDestination(
            data=LazyFrame,
            path=path,
            options={
                "azure_storage_account_name": "{AZURE-STORAGE-ACCOUNT-NAME}",
                "azure_storage_account_key": "{AZURE-STORAGE-ACCOUNT-KEY}"
            },
            mode="error",
            overwrite_schema=False,
            delta_write_options=None
        )

        python_delta_destination.write_batch()
        ```
    === "AWS"

        ```python
        from rtdip_sdk.pipelines.destinations import PythonDeltaDestination

        path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"

        python_delta_destination = PythonDeltaDestination(
            data=LazyFrame,
            path=path,
            options={
                "aws_access_key_id": "{AWS-ACCESS-KEY-ID}",
                "aws_secret_access_key": "{AWS-SECRET-ACCESS-KEY}"
            },
            mode="error",
            overwrite_schema=False,
            delta_write_options=None
        )

        python_delta_destination.write_batch()
        ```

    Parameters:
        data (LazyFrame): Polars LazyFrame to be written to Delta
        path (str): Path to the Delta table to be written to; either local or [remote](https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table){ target="_blank" }. **Locally**, if the table doesn't exist one will be created, but to write to AWS or Azure you must have an existing Delta table
        options (Optional dict): Used if writing to a remote location; passed to Polars as `storage_options`. For AWS use format {"aws_access_key_id": "<>", "aws_secret_access_key": "<>"}. For Azure use format {"azure_storage_account_name": "storageaccountname", "azure_storage_access_key": "<>"}
        mode (Literal['error', 'append', 'overwrite', 'ignore']): Write mode. 'error' (the default) raises an error if the table already exists, 'append' adds the new data, 'overwrite' replaces the table with the new data, and 'ignore' writes nothing if the table already exists
        overwrite_schema (bool): If True will allow for the table schema to be overwritten
        delta_write_options (dict): Options when writing to a Delta table. See [here](https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables){ target="_blank" } for all options
    """

    data: LazyFrame
    path: str
    options: dict
    mode: Literal["error", "append", "overwrite", "ignore"]
    overwrite_schema: bool
    delta_write_options: dict

    def __init__(
        self,
        data: LazyFrame,
        path: str,
        options: dict = None,
        mode: Literal["error", "append", "overwrite", "ignore"] = "error",
        overwrite_schema: bool = False,
        delta_write_options: dict = None,
    ) -> None:
        # Arguments are stored as-is; type checking of `data` happens in write_batch.
        self.data = data
        self.path = path
        self.options = options
        self.mode = mode
        self.overwrite_schema = overwrite_schema
        self.delta_write_options = delta_write_options

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYTHON
        """
        return SystemType.PYTHON

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A Libraries instance with no entries added.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dict; this component defines no settings.
        """
        return {}

    def pre_write_validation(self):
        """
        Returns:
            bool: Always True; no pre-write checks are performed.
        """
        return True

    def post_write_validation(self):
        """
        Returns:
            bool: Always True; no post-write checks are performed.
        """
        return True

    def write_batch(self):
        """
        Writes batch data to Delta without using Spark.

        The LazyFrame in `self.data` is materialised with `collect()` and the
        resulting DataFrame is written with Polars' `write_delta`, passing
        `self.options` through as `storage_options`.

        Raises:
            ValueError: If `self.data` is not a Polars LazyFrame.
        """
        if isinstance(self.data, pl.LazyFrame):
            df = self.data.collect()
            df.write_delta(
                self.path,
                mode=self.mode,
                overwrite_schema=self.overwrite_schema,
                storage_options=self.options,
                delta_write_options=self.delta_write_options,
            )
        else:
            raise ValueError(
                "Data must be a Polars LazyFrame. See https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html"
            )

    def write_stream(self):
        """
        Raises:
            NotImplementedError: Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component.
        """
        raise NotImplementedError(
            "Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component"
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYTHON

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
105
106
107
108
109
110
111
@staticmethod
def system_type():
    """
    Report the execution environment this component runs in.

    Attributes:
        SystemType (Environment): Requires PYTHON
    """
    environment = SystemType.PYTHON
    return environment

write_batch()

Writes batch data to Delta without using Spark.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def write_batch(self):
    """
    Writes batch data to Delta without using Spark.

    The LazyFrame held in `self.data` is collected into an eager DataFrame,
    which is then written with `write_delta`; `self.options` is forwarded as
    `storage_options`.

    Raises:
        ValueError: If `self.data` is not a Polars LazyFrame.
    """
    # Guard clause: reject anything that is not a LazyFrame before doing work.
    if not isinstance(self.data, pl.LazyFrame):
        raise ValueError(
            "Data must be a Polars LazyFrame. See https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html"
        )

    collected_frame = self.data.collect()
    collected_frame.write_delta(
        self.path,
        mode=self.mode,
        overwrite_schema=self.overwrite_schema,
        storage_options=self.options,
        delta_write_options=self.delta_write_options,
    )

write_stream()

Raises:

Type Description
NotImplementedError

Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/python/delta.py
146
147
148
149
150
151
152
153
def write_stream(self):
    """
    Reject streaming writes; this destination supports batch writes only.

    Raises:
        NotImplementedError: Always. Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component.
    """
    raise NotImplementedError(
        "Writing to a Delta table using Python is only possible for batch writes. To perform a streaming write, use the write_stream method of the SparkDeltaDestination component"
    )