Skip to content

Delta Table Optimize

DeltaTableOptimizeUtility

Bases: UtilitiesInterface

Optimizes a Delta Table.

Example

from rtdip_sdk.pipelines.utilities.spark.delta_table_optimize import DeltaTableOptimizeUtility

table_optimize_utility = DeltaTableOptimizeUtility(
    spark=spark_session,
    table_name="delta_table",
    where="EventDate<=current_date()",
    zorder_by=["EventDate"]
)

result = table_optimize_utility.execute()

Parameters:

Name Type Description Default
spark SparkSession

Spark Session required to read data from cloud storage

required
table_name str

Name of the table to optimize, including catalog and schema if the table is in Unity Catalog

required
where str

Apply a partition filter to limit optimize to specific partitions. For example, "date='2021-11-18'" or "EventDate<=current_date()"

None
zorder_by list[str]

List of column names to Z-order the table by. For more information, see the Delta Lake optimization documentation (https://docs.delta.io/latest/optimizations-oss.html).

None
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class DeltaTableOptimizeUtility(UtilitiesInterface):
    """
    [Optimizes](https://docs.delta.io/latest/optimizations-oss.html) a Delta Table.

    Example
    -------
    ```python
    from rtdip_sdk.pipelines.utilities.spark.delta_table_optimize import DeltaTableOptimizeUtility

    table_optimize_utility = DeltaTableOptimizeUtility(
        spark=spark_session,
        table_name="delta_table",
        where="EventDate<=current_date()",
        zorder_by=["EventDate"]
    )

    result = table_optimize_utility.execute()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        table_name (str): Name of the table to optimize, including catalog and schema if the table is in Unity Catalog
        where (str, optional): Apply a partition filter to limit optimize to specific partitions. For example, "date='2021-11-18'" or "EventDate<=current_date()"
        zorder_by (list[str], optional): List of column names to Z-order the table by. For more information, see the [Delta Lake optimization documentation.](https://docs.delta.io/latest/optimizations-oss.html#optimize-performance-with-file-management&language-python)
    """

    spark: SparkSession
    table_name: str
    where: Optional[str]
    zorder_by: Optional[List[str]]

    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        where: Optional[str] = None,
        zorder_by: Optional[List[str]] = None,
    ) -> None:
        self.spark = spark
        self.table_name = table_name
        self.where = where
        self.zorder_by = zorder_by

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: Library collection containing the default `spark_delta_core` maven package
        """
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary; this utility defines no settings
        """
        return {}

    def execute(self) -> bool:
        """
        Runs the optimize operation on the table named by `table_name`.

        The `where` filter is applied only when it is not None. When `zorder_by`
        is not None the table is Z-ordered by those columns, otherwise a plain
        compaction is executed.

        Returns:
            bool: True when the optimize operation completes without raising

        Raises:
            Py4JJavaError: Logged with its `errmsg` and re-raised
            Exception: Any other error is logged and re-raised
        """
        try:
            # optimize() returns a builder; nothing runs until an execute* call.
            optimize_builder = DeltaTable.forName(
                self.spark, self.table_name
            ).optimize()

            if self.where is not None:
                optimize_builder = optimize_builder.where(self.where)

            if self.zorder_by is not None:
                optimize_builder.executeZOrderBy(self.zorder_by)
            else:
                optimize_builder.executeCompaction()

            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise
        except Exception as e:
            logging.exception(str(e))
            raise

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
69
70
71
72
73
74
75
@staticmethod
def system_type():
    """
    Reports the system type this utility requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment