
Read from Amazon Kinesis Data Streams

SparkKinesisSource

Bases: SourceInterface

The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment. Structured streaming from Kinesis is not supported in open source Spark.

Args:

    spark (SparkSession): Spark Session required to read data from Kinesis.
    options (dict): Options that can be specified for a Kinesis read operation (see the Attributes list below). Further information on the options is available in the Databricks documentation (https://docs.databricks.com/structured-streaming/kinesis.html#configuration).

Attributes:

    awsAccessKey (str): AWS access key.
    awsSecretKey (str): AWS secret access key corresponding to the access key.
    streamName (List[str]): The stream names to subscribe to.
    region (str): The region the streams are defined in.
    endpoint (str): The regional endpoint for Kinesis Data Streams.
    initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
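
A minimal usage sketch, assuming an active SparkSession named spark on a Databricks cluster; the credentials, stream name, and region shown are hypothetical placeholders:

from rtdip_sdk.pipelines.sources.spark.kinesis import SparkKinesisSource

# Hypothetical connection details for illustration only
options = {
    "awsAccessKey": "<aws-access-key>",
    "awsSecretKey": "<aws-secret-key>",
    "streamName": "my-kinesis-stream",  # one or more stream names
    "region": "us-east-1",
    "initialPosition": "earliest",
}

source = SparkKinesisSource(spark=spark, options=options)
df = source.read_stream()  # returns a streaming DataFrame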

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
class SparkKinesisSource(SourceInterface):
    """
    The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment.
    Structured streaming from Kinesis is **not** supported in open source Spark.
    Args:
        spark (SparkSession): Spark Session required to read data from Kinesis
        options (dict): Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration){ target="_blank" }
    Attributes:
        awsAccessKey (str): AWS access key.
        awsSecretKey (str): AWS secret access key corresponding to the access key.
        streamName (List[str]): The stream names to subscribe to.
        region (str): The region the streams are defined in.
        endpoint (str): The regional endpoint for Kinesis Data Streams.
        initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = KINESIS_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK_DATABRICKS
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self):
        """
        Raises:
            NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the trigger on the write_stream to be `availableNow=True` to perform batch-like reads of the stream.
        """
        raise NotImplementedError(
            "Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
        )

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Kinesis. Data in the stream is processed from the configured initial position, as well as any new data that arrives after the stream starts.
        """
        try:
            return (
                self.spark.readStream.format("kinesis").options(**self.options).load()
            )
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

read_batch()

Raises:

    NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the trigger on the write_stream to be availableNow=True to perform batch-like reads of the stream.
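
A sketch of that batch-like pattern, assuming a SparkKinesisSource instance named source as above; the Delta sink format and paths are hypothetical:

df = source.read_stream()
(
    df.writeStream.trigger(availableNow=True)  # process the records currently available, then stop
    .format("delta")  # hypothetical sink format
    .option("checkpointLocation", "/tmp/checkpoints/kinesis")  # hypothetical path
    .start("/tmp/output/kinesis")  # hypothetical path
)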

read_stream()

Reads streaming data from Kinesis. Data in the stream is processed from the configured initial position, as well as any new data that arrives after the stream starts.
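
The record payload arrives in a binary data column; a common next step is to cast it to a string. A sketch, with column names assumed from the Databricks Kinesis schema rather than taken from this component:

from pyspark.sql.functions import col

df = source.read_stream()
decoded_df = df.select(
    col("partitionKey"),
    col("data").cast("string").alias("payload"),  # record body is delivered as binary
    col("approximateArrivalTimestamp"),
)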

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK_DATABRICKS
