Skip to content

Autoloader Resources

AzureAutoloaderResourcesUtility

Bases: UtilitiesInterface

Creates the required Azure Resources for the Databricks Autoloader Notification Mode.

Example

from azure.identity import DefaultAzureCredential
from rtdip_sdk.pipelines.utilities import AzureAutoloaderResourcesUtility

azure_autoloader_resources_utility = AzureAutoloaderResourcesUtility(
    subscription_id="YOUR-SUBSCRIPTION-ID",
    resource_group_name="YOUR-RESOURCE-GROUP",
    storage_account="YOUR-STORAGE-ACCOUNT-NAME",
    container="YOUR-CONTAINER-NAME",
    directory="DIRECTORY",
    credential=DefaultAzureCredential(),
    event_subscription_name="YOUR-EVENT-SUBSCRIPTION",
    queue_name="YOUR-QUEUE-NAME",
)

result = azure_autoloader_resources_utility.execute()

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| subscription_id | str | Azure Subscription ID | required |
| resource_group_name | str | Name of the Resource Group (within the Subscription) that contains the Storage Account | required |
| storage_account | str | Storage Account Name | required |
| container | str | Container Name | required |
| directory | str | Directory used to filter messages in the Event Subscription. This is equivalent to the Databricks Autoloader path | required |
| credential | TokenCredential | Credential used to authenticate the Azure Storage and Event Grid management clients | required |
| event_subscription_name | str | Name of the Event Subscription | required |
| queue_name | str | Name of the Storage Queue used as the endpoint for the event messages | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/autoloader_resources.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class AzureAutoloaderResourcesUtility(UtilitiesInterface):
    """
    Creates the required Azure Resources for the Databricks Autoloader Notification Mode.

    Ensures that a Storage Queue exists in the Storage Account and that an Event Grid
    Event Subscription on the Storage Account forwards blob created/renamed events for
    the configured container and directory to that queue. Resources that already exist
    are left unchanged.

    Example
    --------
    ```python
    from azure.identity import DefaultAzureCredential
    from rtdip_sdk.pipelines.utilities import AzureAutoloaderResourcesUtility

    azure_autoloader_resources_utility = AzureAutoloaderResourcesUtility(
        subscription_id="YOUR-SUBSCRIPTION-ID",
        resource_group_name="YOUR-RESOURCE-GROUP",
        storage_account="YOUR-STORAGE-ACCOUNT-NAME",
        container="YOUR-CONTAINER-NAME",
        directory="DIRECTORY",
        credential=DefaultAzureCredential(),
        event_subscription_name="YOUR-EVENT-SUBSCRIPTION",
        queue_name="YOUR-QUEUE-NAME",
    )

    result = azure_autoloader_resources_utility.execute()
    ```

    Parameters:
        subscription_id (str): Azure Subscription ID
        resource_group_name (str): Name of the Resource Group (within the Subscription) that contains the Storage Account
        storage_account (str): Storage Account Name
        container (str): Container Name
        directory (str): Directory used to filter messages in the Event Subscription. This is equivalent to the Databricks Autoloader path
        credential (TokenCredential): Credential used to authenticate the Azure Storage and Event Grid management clients
        event_subscription_name (str): Name of the Event Subscription
        queue_name (str): Name of the Storage Queue used as the endpoint for the event messages
    """

    subscription_id: str
    resource_group_name: str
    storage_account: str
    container: str
    directory: str
    credential: TokenCredential
    event_subscription_name: str
    queue_name: str

    def __init__(
        self,
        subscription_id: str,
        resource_group_name: str,
        storage_account: str,
        container: str,
        directory: str,
        credential: TokenCredential,
        event_subscription_name: str,
        queue_name: str,
    ) -> None:
        self.subscription_id = subscription_id
        self.resource_group_name = resource_group_name
        self.storage_account = storage_account
        self.container = container
        self.directory = directory
        self.credential = credential
        self.event_subscription_name = event_subscription_name
        self.queue_name = queue_name

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYTHON
        """
        return SystemType.PYTHON

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("azure_eventgrid_mgmt"))
        libraries.add_pypi_library(get_default_package("azure_storage_mgmt"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        """
        Creates the Storage Queue and the Event Subscription if they do not already exist.

        Returns:
            bool: True when both resources were found or created. This is returned even
                when the resources already existed (previously the method returned None
                in that case).

        Raises:
            Any Azure SDK error other than ResourceNotFoundError that is raised while
            reading or creating the resources is propagated to the caller.
        """
        self._ensure_queue()
        self._ensure_event_subscription()
        return True

    def _storage_account_resource_id(self) -> str:
        """Builds the ARM resource ID of the Storage Account (used as scope and queue destination)."""
        return "/subscriptions/{}/resourceGroups/{}/providers/Microsoft.Storage/StorageAccounts/{}".format(
            self.subscription_id, self.resource_group_name, self.storage_account
        )

    def _ensure_queue(self) -> None:
        """Creates the Storage Queue `self.queue_name` unless it already exists."""
        storage_mgmt_client = StorageManagementClient(
            credential=self.credential, subscription_id=self.subscription_id
        )

        try:
            queue_response = storage_mgmt_client.queue.get(
                resource_group_name=self.resource_group_name,
                account_name=self.storage_account,
                queue_name=self.queue_name,
            )
        except ResourceNotFoundError:
            # A missing queue is the expected trigger for creating it below.
            queue_response = None

        if queue_response is None:
            storage_mgmt_client.queue.create(
                resource_group_name=self.resource_group_name,
                account_name=self.storage_account,
                queue_name=self.queue_name,
                queue=StorageQueue(),
            )

    def _ensure_event_subscription(self) -> None:
        """Creates the Event Grid Event Subscription routing blob events to the queue, unless it exists."""
        eventgrid_client = EventGridManagementClient(
            credential=self.credential, subscription_id=self.subscription_id
        )

        source = self._storage_account_resource_id()

        try:
            event_subscription_response = eventgrid_client.event_subscriptions.get(
                scope=source, event_subscription_name=self.event_subscription_name
            )
        except ResourceNotFoundError:
            event_subscription_response = None

        if event_subscription_response is not None:
            # Already provisioned; leave the existing subscription untouched.
            return

        # The queue lives in the same Storage Account that is the event source.
        event_subscription_destination = StorageQueueEventSubscriptionDestination(
            resource_id=source,
            queue_name=self.queue_name,
            queue_message_time_to_live_in_seconds=None,
        )

        event_subscription_filter = EventSubscriptionFilter(
            # Only events for blobs under <container>/<directory> are forwarded.
            subject_begins_with="/blobServices/default/containers/{}/blobs/{}".format(
                self.container, self.directory
            ),
            included_event_types=[
                "Microsoft.Storage.BlobCreated",
                "Microsoft.Storage.BlobRenamed",
                "Microsoft.Storage.DirectoryRenamed",
            ],
            # Further restrict to events produced by these write/copy/rename API calls.
            advanced_filters=[
                StringContainsAdvancedFilter(
                    key="data.api",
                    values=[
                        "CopyBlob",
                        "PutBlob",
                        "PutBlockList",
                        "FlushWithClose",
                        "RenameFile",
                        "RenameDirectory",
                    ],
                )
            ],
        )

        retry_policy = RetryPolicy()

        event_subscription_info = EventSubscription(
            destination=event_subscription_destination,
            filter=event_subscription_filter,
            event_delivery_schema=EventDeliverySchema.EVENT_GRID_SCHEMA,
            retry_policy=retry_policy,
        )

        # begin_* returns a long-running-operation poller; .result() blocks until it completes.
        eventgrid_client.event_subscriptions.begin_create_or_update(
            scope=source,
            event_subscription_name=self.event_subscription_name,
            event_subscription_info=event_subscription_info,
        ).result()

`system_type()` (staticmethod)

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYTHON |

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/azure/autoloader_resources.py
 98
 99
100
101
102
103
104
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYTHON
    """
    required_environment = SystemType.PYTHON
    return required_environment