Skip to content

Delta Table Vacuum

DeltaTableVacuumUtility

Bases: UtilitiesInterface

Vacuums a Delta Table.

Example

from rtdip_sdk.pipelines.utilities.spark.delta_table_vacuum import DeltaTableVacuumUtility

table_vacuum_utility = DeltaTableVacuumUtility(
    spark=spark_session,
    table_name="delta_table",
    retention_hours=168
)

result = table_vacuum_utility.execute()

Parameters:

Name Type Description Default
spark SparkSession

Spark Session required to read data from cloud storage

required
table_name str

Name of the table, including catalog and schema if table is to be created in Unity Catalog

required
retention_hours int

Sets the retention threshold in hours.

None
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_vacuum.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class DeltaTableVacuumUtility(UtilitiesInterface):
    """
    [Vacuums](https://docs.delta.io/latest/delta-utility.html#-delta-vacuum) a Delta Table.

    Example
    -------
    ```python
    from rtdip_sdk.pipelines.utilities.spark.delta_table_vacuum import DeltaTableVacuumUtility

    table_vacuum_utility = DeltaTableVacuumUtility(
        spark=spark_session,
        table_name="delta_table",
        retention_hours=168
    )

    result = table_vacuum_utility.execute()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        table_name (str): Name of the table, including catalog and schema if table is to be created in Unity Catalog
        retention_hours (int, optional): Sets the retention threshold in hours. Defaults to
            None, in which case Delta applies its default retention threshold.
    """

    spark: SparkSession
    table_name: str
    retention_hours: Optional[int]

    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        retention_hours: Optional[int] = None,
    ) -> None:
        self.spark = spark
        self.table_name = table_name
        self.retention_hours = retention_hours

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        """
        Vacuums the configured Delta table.

        Returns:
            bool: True if the vacuum completed successfully.

        Raises:
            Py4JJavaError: If the underlying Spark/JVM call fails.
            Exception: Any other error encountered while vacuuming; logged and re-raised.
        """
        try:
            delta_table = DeltaTable.forName(self.spark, self.table_name)

            # None is a valid value for retentionHours; Delta then uses the
            # table's default retention threshold.
            delta_table.vacuum(retentionHours=self.retention_hours)

            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_vacuum.py
61
62
63
64
65
66
67
@staticmethod
def system_type():
    """
    Declares the execution environment this utility requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK