Skip to content

Read from Autoloader

DataBricksAutoLoaderSource

Bases: SourceInterface

The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available here

Parameters:

Name Type Description Default
spark SparkSession

Spark Session required to read data from cloud storage

required
options dict

Options that can be specified for configuring the Auto Loader. Further information on the available options is here

required
path str

The cloud storage path

required
format str

Specifies the file format to be read. Supported formats are available here

required
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
class DataBricksAutoLoaderSource(SourceInterface):
    '''
    The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available [here](https://docs.databricks.com/ingestion/auto-loader/index.html)

    Args:
        spark (SparkSession): Spark Session required to read data from cloud storage
        options (dict): Options that can be specified for configuring the Auto Loader. Further information on the available options is [here](https://docs.databricks.com/ingestion/auto-loader/options.html)
        path (str): The cloud storage path
        format (str): Specifies the file format to be read. Supported formats are available [here](https://docs.databricks.com/ingestion/auto-loader/options.html#file-format-options)
    '''
    spark: SparkSession
    options: dict
    path: str

    def __init__(self, spark: SparkSession, options: dict, path: str, format: str) -> None:
        self.spark = spark
        # Copy the options so that injecting the "cloudFiles.format" key below
        # does not mutate the dict object the caller passed in.
        self.options = dict(options)
        self.path = path
        self.options["cloudFiles.format"] = format

    @staticmethod
    def system_type():
        '''
        Attributes:
            SystemType (Environment): Requires PYSPARK on Databricks
        '''
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        '''Returns the Maven libraries this component requires (Delta core).'''
        libraries = Libraries()
        libraries.add_maven_library(DEFAULT_PACKAGES["spark_delta_core"])
        return libraries

    @staticmethod
    def settings() -> dict:
        '''Returns additional Spark settings required by this component (none).'''
        return {}

    def pre_read_validation(self):
        '''No pre-read validation is performed; always returns True.'''
        return True

    def post_read_validation(self, df: DataFrame):
        '''No post-read validation is performed; always returns True.'''
        return True

    def read_batch(self):
        '''
        Raises:
            NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
        '''
        raise NotImplementedError("Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`")

    def read_stream(self) -> DataFrame:
        '''
        Performs streaming reads of files in cloud storage.

        Returns:
            DataFrame: A streaming DataFrame of the files at the configured path.
        '''
        try:
            return (self.spark
                .readStream
                .format("cloudFiles")
                .options(**self.options)
                .load(self.path)
            )

        except Exception as e:
            # Log the full traceback before re-raising so the failure surfaces
            # in pipeline logs as well as to the caller.
            logging.exception(str(e))
            raise e

read_batch()

Raises:

Type Description
NotImplementedError

Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be availableNow=True to perform batch-like reads of cloud storage files.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
66
67
68
69
70
71
def read_batch(self):
    '''
    Batch reads are not supported by Auto Loader.

    Raises:
        NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
    '''
    message = (
        "Auto Loader only supports streaming reads. "
        "To perform a batch read, use the read_stream method and specify "
        "Trigger on the write_stream as `availableNow=True`"
    )
    raise NotImplementedError(message)

read_stream()

Performs streaming reads of files in cloud storage.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def read_stream(self) -> DataFrame:
    '''
    Performs streaming reads of files in cloud storage.
    '''
    try:
        # Configure the cloudFiles reader step by step, then load the path.
        reader = self.spark.readStream.format("cloudFiles")
        configured = reader.options(**self.options)
        return configured.load(self.path)

    except Exception as e:
        logging.exception(str(e))
        raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK on Databricks

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
42
43
44
45
46
47
48
@staticmethod
def system_type():
    '''
    Reports the execution environment this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK on Databricks
    '''
    return SystemType.PYSPARK_DATABRICKS