
Delta Table Optimize

DeltaTableOptimizeUtility

Bases: UtilitiesInterface

Optimizes a Delta Table

Parameters:

| Name | Type | Description | Default |
|------------|----------------|-------------|----------|
| spark | SparkSession | Spark Session required to read data from cloud storage | required |
| table_name | str | Name of the table, including catalog and schema if table is to be created in Unity Catalog | required |
| where | str | Apply a partition filter to limit optimize to specific partitions. Example, "date='2021-11-18'" or "EventDate<=current_date()" | None |
| zorder_by | list[str] | List of column names to zorder the table by. For more information, see [here](https://docs.delta.io/latest/optimizations-oss.html#optimize-performance-with-file-management&language-python). | None |
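
A minimal usage sketch, assuming an active SparkSession with Delta Lake configured and the module path implied by the source file below; the table, partition filter and column names are illustrative:

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.utilities.spark.delta_table_optimize import (
    DeltaTableOptimizeUtility,
)

# Assumes a SparkSession already configured with Delta Lake support
spark = SparkSession.builder.getOrCreate()

# Illustrative table name, partition filter and z-order columns
optimize_utility = DeltaTableOptimizeUtility(
    spark=spark,
    table_name="catalog.schema.events",
    where="EventDate<=current_date()",
    zorder_by=["TagName"],
)

# Runs OPTIMIZE with z-ordering on the partitions matching the filter;
# returns True on success and raises on failure
optimize_utility.execute()
```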
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
class DeltaTableOptimizeUtility(UtilitiesInterface):
    """
    [Optimizes](https://docs.delta.io/latest/optimizations-oss.html) a Delta Table

    Args:
        spark (SparkSession): Spark Session required to read data from cloud storage
        table_name (str): Name of the table, including catalog and schema if table is to be created in Unity Catalog
        where (str, optional): Apply a partition filter to limit optimize to specific partitions. Example, "date='2021-11-18'" or "EventDate<=current_date()"
        zorder_by (list[str], optional): List of column names to zorder the table by. For more information, see [here.](https://docs.delta.io/latest/optimizations-oss.html#optimize-performance-with-file-management&language-python)
    """

    spark: SparkSession
    table_name: str
    where: Optional[str]
    zorder_by: Optional[List[str]]

    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        where: str = None,
        zorder_by: List[str] = None,
    ) -> None:
        self.spark = spark
        self.table_name = table_name
        self.where = where
        self.zorder_by = zorder_by

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        try:
            delta_table = DeltaTable.forName(self.spark, self.table_name).optimize()

            if self.where is not None:
                delta_table = delta_table.where(self.where)

            if self.zorder_by is not None:
                delta_table = delta_table.executeZOrderBy(self.zorder_by)
            else:
                delta_table.executeCompaction()

            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
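
When `zorder_by` is omitted, `execute()` falls back to a plain compaction (the `executeCompaction` branch above). A minimal sketch of that case, reusing the session and import from the earlier example with illustrative names:

```python
# Compaction-only optimize of the filtered partitions; no z-ordering is applied
compact_utility = DeltaTableOptimizeUtility(
    spark=spark,
    table_name="catalog.schema.events",
    where="EventDate<=current_date()",
)

compact_utility.execute()
```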

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------------|-------------|------------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_optimize.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK