Read from Autoloader

DataBricksAutoLoaderSource

Bases: SourceInterface

The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available here: https://docs.databricks.com/ingestion/auto-loader/index.html

Example

ADLS Gen2

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

AWS S3

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

GCS

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "gs://{BUCKET-NAME}/{FILE-PATH}"
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

Parameters:

    spark (SparkSession): Spark Session required to read data from cloud storage. Required.
    options (dict): Options that can be specified for configuring the Auto Loader. Further information on the available options is here: https://docs.databricks.com/ingestion/auto-loader/options.html. Required.
    path (str): The cloud storage path. Required.
    format (str): Specifies the file format to be read. Supported formats are listed here: https://docs.databricks.com/ingestion/auto-loader/options.html#file-format-options. Required.
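As an illustrative sketch (not part of the component's API), a typical options dictionary might configure schema tracking and backfill behaviour. The cloudFiles.* keys below are standard Auto Loader options; the storage paths and file format are placeholders.

```python
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# Illustrative Auto Loader options; all paths are placeholders
options = {
    # Location where Auto Loader persists the inferred schema between runs
    "cloudFiles.schemaLocation": "/mnt/{SCHEMA-LOCATION}",
    # Process files that already exist in the path as well as new arrivals
    "cloudFiles.includeExistingFiles": "true",
    # Infer column types instead of reading all columns as strings
    "cloudFiles.inferColumnTypes": "true",
}

df = DataBricksAutoLoaderSource(
    spark,
    options,
    "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}",
    "json",
).read_stream()
```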
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
class DataBricksAutoLoaderSource(SourceInterface):
    """
    The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available [here](https://docs.databricks.com/ingestion/auto-loader/index.html)

    Example
    --------
    === "ADLS Gen2"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```
    === "AWS S3"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```
    === "GCS"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "gs://{BUCKET-NAME}/{FILE-PATH}"
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        options (dict): Options that can be specified for configuring the Auto Loader. Further information on the available options is [here](https://docs.databricks.com/ingestion/auto-loader/options.html)
        path (str): The cloud storage path
        format (str): Specifies the file format to be read. Supported formats are available [here](https://docs.databricks.com/ingestion/auto-loader/options.html#file-format-options)
    """

    spark: SparkSession
    options: dict
    path: str

    def __init__(
        self, spark: SparkSession, options: dict, path: str, format: str
    ) -> None:
        self.spark = spark
        self.options = options
        self.path = path
        self.options["cloudFiles.format"] = format

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK on Databricks
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self, df: DataFrame):
        return True

    def read_batch(self):
        """
        Raises:
            NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow` to perform batch-like reads of cloud storage files.
        """
        raise NotImplementedError(
            "Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow`"
        )

    def read_stream(self) -> DataFrame:
        """
        Performs streaming reads of files in cloud storage.
        """
        try:
            return (
                self.spark.readStream.format("cloudFiles")
                .options(**self.options)
                .load(self.path)
            )

        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK on Databricks

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK on Databricks
    """
    return SystemType.PYSPARK_DATABRICKS

read_batch()

Raises:

    NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be availableNow to perform batch-like reads of cloud storage files.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
def read_batch(self):
    """
    Raises:
        NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow` to perform batch-like reads of cloud storage files.
    """
    raise NotImplementedError(
        "Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow`"
    )
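As a hedged sketch of the pattern described by the exception message, the following combines read_stream with a plain Spark Structured Streaming write triggered as availableNow, so the query processes all files currently in the path and then stops. The checkpoint and output locations are placeholders, and the sink shown is standard PySpark rather than an RTDIP component.

```python
# Batch-like read of cloud storage files via Auto Loader.
# `spark`, `options`, `path` and `format` are set up as in the examples above.
df = DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

(
    df.writeStream.format("delta")
    # Checkpointing lets the next run pick up only files that arrived since this one
    .option("checkpointLocation", "/mnt/{CHECKPOINT-LOCATION}")
    # availableNow processes everything currently available, then stops the query
    .trigger(availableNow=True)
    .start("/mnt/{OUTPUT-PATH}")
)
```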

read_stream()

Performs streaming reads of files in cloud storage.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
def read_stream(self) -> DataFrame:
    """
    Performs streaming reads of files in cloud storage.
    """
    try:
        return (
            self.spark.readStream.format("cloudFiles")
            .options(**self.options)
            .load(self.path)
        )

    except Exception as e:
        logging.exception(str(e))
        raise e
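For a continuously running ingestion, the streaming DataFrame returned by read_stream can be written with any standard Structured Streaming sink. This is a minimal sketch with placeholder paths, not part of the component itself.

```python
# Continuous streaming ingestion of newly arriving files; paths are placeholders
df = DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

query = (
    df.writeStream.format("delta")
    .option("checkpointLocation", "/mnt/{CHECKPOINT-LOCATION}")
    .outputMode("append")
    .start("/mnt/{OUTPUT-PATH}")
)

# Block until the streaming query is stopped or fails
query.awaitTermination()
```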