Delta Table Create

DeltaTableCreateUtility

Bases: UtilitiesInterface

Creates a Delta Table in a Hive Metastore or in Databricks Unity Catalog.

Example

```python
from rtdip_sdk.pipelines.utilities.spark.delta_table_create import DeltaTableCreateUtility, DeltaTableColumn

table_create_utility = DeltaTableCreateUtility(
    spark=spark_session,
    table_name="delta_table",
    columns=[
        DeltaTableColumn(name="EventDate", type="date", nullable=False, metadata={"delta.generationExpression": "CAST(EventTime AS DATE)"}),
        DeltaTableColumn(name="TagName", type="string", nullable=False),
        DeltaTableColumn(name="EventTime", type="timestamp", nullable=False),
        DeltaTableColumn(name="Status", type="string", nullable=True),
        DeltaTableColumn(name="Value", type="float", nullable=True)
    ],
    partitioned_by=["EventDate"],
    properties={"delta.logRetentionDuration": "7 days", "delta.enableChangeDataFeed": "true"},
    comment="Creation of Delta Table"
)

result = table_create_utility.execute()
```
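
The `execute` method returns `True` once the table exists. Because the utility builds the table with `DeltaTable.createIfNotExists`, re-running it against an existing table is a no-op rather than an error.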

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session required to read data from cloud storage | required |
| table_name | str | Name of the table, including catalog and schema if the table is to be created in Unity Catalog | required |
| columns | list[DeltaTableColumn] | List of columns and their related column properties | required |
| partitioned_by | list[str] | List of column names to partition the table by | None |
| location | str | Path to the storage location | None |
| properties | dict | Properties that can be specified for a Delta Table. Further information on the available options is [here](https://docs.databricks.com/delta/table-properties.html#delta-table-properties) | None |
| comment | str | Provides a comment on the table metadata | None |
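
For a Unity Catalog table, `table_name` takes the three-part `catalog.schema.table` form, and `location` can point the table at external storage. Below is a minimal sketch of that combination, assuming an existing `spark_session`; the catalog, schema, and storage path are placeholders, not values from this page:

```python
from rtdip_sdk.pipelines.utilities.spark.delta_table_create import (
    DeltaTableCreateUtility,
    DeltaTableColumn,
)

# "my_catalog", "my_schema" and the abfss:// path are hypothetical values
# used only to illustrate the parameters; substitute your own.
external_table_utility = DeltaTableCreateUtility(
    spark=spark_session,
    table_name="my_catalog.my_schema.sensor_events",  # three-part Unity Catalog name
    columns=[
        DeltaTableColumn(name="TagName", type="string", nullable=False),
        DeltaTableColumn(name="EventTime", type="timestamp", nullable=False),
        DeltaTableColumn(name="Value", type="float", nullable=True),
    ],
    location="abfss://container@account.dfs.core.windows.net/delta/sensor_events",
    comment="External Delta Table in Unity Catalog",
)

result = external_table_utility.execute()
```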
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_create.py
```python
class DeltaTableCreateUtility(UtilitiesInterface):
    """
    Creates a Delta Table in a Hive Metastore or in Databricks Unity Catalog.

    Example
    -------
    ```python
    from rtdip_sdk.pipelines.utilities.spark.delta_table_create import DeltaTableCreateUtility, DeltaTableColumn

    table_create_utility = DeltaTableCreateUtility(
        spark=spark_session,
        table_name="delta_table",
        columns=[
            DeltaTableColumn(name="EventDate", type="date", nullable=False, metadata={"delta.generationExpression": "CAST(EventTime AS DATE)"}),
            DeltaTableColumn(name="TagName", type="string", nullable=False),
            DeltaTableColumn(name="EventTime", type="timestamp", nullable=False),
            DeltaTableColumn(name="Status", type="string", nullable=True),
            DeltaTableColumn(name="Value", type="float", nullable=True)
        ],
        partitioned_by=["EventDate"],
        properties={"delta.logRetentionDuration": "7 days", "delta.enableChangeDataFeed": "true"},
        comment="Creation of Delta Table"
    )

    result = table_create_utility.execute()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        table_name (str): Name of the table, including catalog and schema if table is to be created in Unity Catalog
        columns (list[DeltaTableColumn]): List of columns and their related column properties
        partitioned_by (list[str], optional): List of column names to partition the table by
        location (str, optional): Path to storage location
        properties (dict, optional): Properties that can be specified for a Delta Table. Further information on the available options is [here](https://docs.databricks.com/delta/table-properties.html#delta-table-properties)
        comment (str, optional): Provides a comment on the table metadata


    """

    spark: SparkSession
    table_name: str
    columns: List[DeltaTableColumn]
    partitioned_by: List[str]
    location: str
    properties: dict
    comment: str

    def __init__(
        self,
        spark: SparkSession,
        table_name: str,
        columns: List[DeltaTableColumn],
        partitioned_by: List[str] = None,
        location: str = None,
        properties: dict = None,
        comment: str = None,
    ) -> None:
        self.spark = spark
        self.table_name = table_name
        self.columns = columns
        self.partitioned_by = partitioned_by
        self.location = location
        self.properties = properties
        self.comment = comment

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        try:
            columns = [StructField.fromJson(column.dict()) for column in self.columns]

            delta_table = (
                DeltaTable.createIfNotExists(self.spark)
                .tableName(self.table_name)
                .addColumns(columns)
            )

            if self.partitioned_by is not None:
                delta_table = delta_table.partitionedBy(self.partitioned_by)

            if self.location is not None:
                delta_table = delta_table.location(self.location)

            if self.properties is not None:
                for key, value in self.properties.items():
                    delta_table = delta_table.property(key, value)

            if self.comment is not None:
                delta_table = delta_table.comment(self.comment)

            delta_table.execute()
            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
```

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/delta_table_create.py
```python
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK
```
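
Because every utility reports its runtime requirements through `system_type()`, a caller can verify that a PySpark environment is available before executing the component. A minimal sketch follows; comparing by enum name avoids importing `SystemType`, whose public import path is not shown on this page, and assumes `SystemType` is a Python enum (as `SystemType.PYSPARK` suggests):

```python
from rtdip_sdk.pipelines.utilities.spark.delta_table_create import DeltaTableCreateUtility

# DeltaTableCreateUtility declares that it requires PySpark.
if DeltaTableCreateUtility.system_type().name == "PYSPARK":
    # Safe to schedule this utility on a Spark-backed environment.
    ...
```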