Skip to content

ADLS Gen 2 Service Principal Connect

SparkADLSGen2SPNConnectUtility

Bases: UtilitiesInterface

Configures Spark to Connect to an ADLS Gen 2 Storage Account using a Service Principal.

Example

from rtdip_sdk.pipelines.utilities import SparkADLSGen2SPNConnectUtility
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

adls_gen2_connect_utility = SparkADLSGen2SPNConnectUtility(
    spark=spark,
    storage_account="YOUR-STORAGE-ACCOUNT-NAME",
    tenant_id="YOUR-TENANT-ID",
    client_id="YOUR-CLIENT-ID",
    client_secret="YOUR-CLIENT-SECRET"
)

result = adls_gen2_connect_utility.execute()

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session required to read data from cloud storage | required |
| storage_account | str | Name of the ADLS Gen 2 Storage Account | required |
| tenant_id | str | Tenant ID of the Service Principal | required |
| client_id | str | Service Principal Client ID | required |
| client_secret | str | Service Principal Client Secret | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class SparkADLSGen2SPNConnectUtility(UtilitiesInterface):
    """
    Configures Spark to Connect to an ADLS Gen 2 Storage Account using a Service Principal.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.utilities import SparkADLSGen2SPNConnectUtility
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    adls_gen2_connect_utility = SparkADLSGen2SPNConnectUtility(
        spark=spark,
        storage_account="YOUR-STORAGE-ACCOUNT-NAME",
        tenant_id="YOUR-TENANT-ID",
        client_id="YOUR-CLIENT-ID",
        client_secret="YOUR-CLIENT-SECRET"
    )

    result = adls_gen2_connect_utility.execute()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        storage_account (str): Name of the ADLS Gen 2 Storage Account
        tenant_id (str): Tenant ID of the Service Principal
        client_id (str): Service Principal Client ID
        client_secret (str): Service Principal Client Secret
    """

    spark: SparkSession
    storage_account: str
    tenant_id: str
    client_id: str
    client_secret: str

    def __init__(
        self,
        spark: SparkSession,
        storage_account: str,
        tenant_id: str,
        client_id: str,
        client_secret: str,
    ) -> None:
        self.spark = spark
        self.storage_account = storage_account
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A fresh Libraries instance with nothing added to it.
        """
        return Libraries()

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dict; this utility declares no additional settings.
        """
        return {}

    def execute(self) -> bool:
        """Executes spark configuration to connect to an ADLS Gen 2 Storage Account using a service principal

        Builds the five per-account ABFS OAuth keys (auth type, token provider,
        client id, client secret and token endpoint) for `storage_account` and
        applies them through SparkConfigurationUtility.

        Returns:
            bool: True once the configuration utility has executed.

        Raises:
            Py4JJavaError: Raised from the JVM while applying the configuration;
                its `errmsg` is logged before it is re-raised.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            # Every ABFS key is scoped to this specific account's DFS endpoint,
            # so build the shared suffix once instead of repeating it per key.
            account_suffix = f"{self.storage_account}.dfs.core.windows.net"
            adls_gen2_config = SparkConfigurationUtility(
                spark=self.spark,
                config={
                    f"fs.azure.account.auth.type.{account_suffix}": "OAuth",
                    f"fs.azure.account.oauth.provider.type.{account_suffix}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                    f"fs.azure.account.oauth2.client.id.{account_suffix}": self.client_id,
                    f"fs.azure.account.oauth2.client.secret.{account_suffix}": self.client_secret,
                    # Token endpoint for the service principal's Azure AD tenant.
                    f"fs.azure.account.oauth2.client.endpoint.{account_suffix}": f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/token",
                },
            )
            adls_gen2_config.execute()
            return True

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            # Bare `raise` re-raises the active exception with its original traceback.
            raise
        except Exception as e:
            logging.exception(str(e))
            raise

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
76
77
78
79
80
81
82
@staticmethod
def system_type():
    """
    Report the execution environment this utility needs.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment

execute()

Executes spark configuration to connect to an ADLS Gen 2 Storage Account using a service principal

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/spark/adls_gen2_spn_connect.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def execute(self) -> bool:
    """Executes spark configuration to connect to an ADLS Gen 2 Storage Account using a service principal

    Builds the five per-account ABFS OAuth keys (auth type, token provider,
    client id, client secret and token endpoint) for `storage_account` and
    applies them through SparkConfigurationUtility.

    Returns:
        bool: True once the configuration utility has executed.

    Raises:
        Py4JJavaError: Raised from the JVM while applying the configuration;
            its `errmsg` is logged before it is re-raised.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        # Every ABFS key is scoped to this specific account's DFS endpoint,
        # so build the shared suffix once instead of repeating it per key.
        account_suffix = f"{self.storage_account}.dfs.core.windows.net"
        adls_gen2_config = SparkConfigurationUtility(
            spark=self.spark,
            config={
                f"fs.azure.account.auth.type.{account_suffix}": "OAuth",
                f"fs.azure.account.oauth.provider.type.{account_suffix}": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                f"fs.azure.account.oauth2.client.id.{account_suffix}": self.client_id,
                f"fs.azure.account.oauth2.client.secret.{account_suffix}": self.client_secret,
                # Token endpoint for the service principal's Azure AD tenant.
                f"fs.azure.account.oauth2.client.endpoint.{account_suffix}": f"https://login.microsoftonline.com/{self.tenant_id}/oauth2/token",
            },
        )
        adls_gen2_config.execute()
        return True

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        # Bare `raise` re-raises the active exception with its original traceback.
        raise
    except Exception as e:
        logging.exception(str(e))
        raise