Read from Amazon Kinesis Data Streams

SparkKinesisSource

Bases: SourceInterface

The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment. Structured streaming from Kinesis is not supported in open source Spark.

Example

```python
from rtdip_sdk.pipelines.sources import SparkKinesisSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

kinesis_source = SparkKinesisSource(
    spark=spark,
    options={
        "awsAccessKey": "{AWS-ACCESS-KEY}",
        "awsSecretKey": "{AWS-SECRET-KEY}",
        "streamName": "{STREAM-NAME}",
        "region": "{REGION}",
        "endpoint": "https://kinesis.{REGION}.amazonaws.com",
        "initialPosition": "earliest"
    }
)

kinesis_source.read_stream()
```

OR

```python
kinesis_source.read_batch()
```
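Note that `read_batch` raises `NotImplementedError`: Kinesis only supports streaming reads, so batch-like behavior is achieved by calling `read_stream` and setting `availableNow=True` as the trigger on the corresponding write (see `read_batch` below).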

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `spark` | `SparkSession` | Spark Session required to read data from Kinesis. | required |
| `options` | `dict` | Options that can be specified for a Kinesis read operation (see the Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration). | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `awsAccessKey` | `str` | AWS access key. |
| `awsSecretKey` | `str` | AWS secret access key corresponding to the access key. |
| `streamName` | `List[str]` | The stream names to subscribe to. |
| `region` | `str` | The region the streams are defined in. |
| `endpoint` | `str` | The regional endpoint for Kinesis Data Streams. |
| `initialPosition` | `str` | The point to start reading from; `earliest`, `latest`, or `at_timestamp` (see the sketch below). |
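An `at_timestamp` start position takes a JSON string rather than a bare keyword. A minimal sketch of such an options dict, assuming the JSON shape documented for the Databricks Kinesis connector; the timestamp and format values are placeholders:

```python
# Sketch: start reading at a specific point in time instead of
# "earliest"/"latest". The JSON shape for "initialPosition" follows the
# Databricks Kinesis connector documentation; the timestamp and format
# pattern below are placeholders.
options = {
    "awsAccessKey": "{AWS-ACCESS-KEY}",
    "awsSecretKey": "{AWS-SECRET-KEY}",
    "streamName": "{STREAM-NAME}",
    "region": "{REGION}",
    "initialPosition": (
        '{"at_timestamp": "06/25/2024 10:23:45 PDT", '
        '"format": "MM/dd/yyyy HH:mm:ss ZZZ"}'
    ),
}
```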

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py

````python
class SparkKinesisSource(SourceInterface):
    """
    The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment.
    Structured streaming from Kinesis is **not** supported in open source Spark.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import SparkKinesisSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    kinesis_source = SparkKinesisSource(
        spark=spark,
        options={
            "awsAccessKey": "{AWS-ACCESS-KEY}",
            "awsSecretKey": "{AWS-SECRET-KEY}",
            "streamName": "{STREAM-NAME}",
            "region": "{REGION}",
            "endpoint": "https://kinesis.{REGION}.amazonaws.com",
            "initialPosition": "earliest"
        }
    )

    kinesis_source.read_stream()

    OR

    kinesis_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from Kinesis
        options (dict): Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration){ target="_blank" }

    Attributes:
        awsAccessKey (str): AWS access key.
        awsSecretKey (str): AWS secret access key corresponding to the access key.
        streamName (List[str]): The stream names to subscribe to.
        region (str): The region the streams are defined in.
        endpoint (str): The regional endpoint for Kinesis Data Streams.
        initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = KINESIS_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK_DATABRICKS
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self):
        """
        Raises:
            NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
        """
        raise NotImplementedError(
            "Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
        )

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Kinesis. All of the data in the stream is processed, as well as any new data that arrives after the stream starts.
        """
        try:
            return (
                self.spark.readStream.format("kinesis").options(**self.options).load()
            )
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
````

system_type() staticmethod

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `SystemType` | `Environment` | Requires PYSPARK_DATABRICKS |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py

```python
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK_DATABRICKS
    """
    return SystemType.PYSPARK_DATABRICKS
```

read_batch()

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | Kinesis only supports streaming reads. To perform a batch read, use the `read_stream` method of this component and specify the Trigger on the `write_stream` to be `availableNow=True` to perform batch-like reads of cloud storage files. |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py

```python
def read_batch(self):
    """
    Raises:
        NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
    """
    raise NotImplementedError(
        "Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
    )
```
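As the error message suggests, batch-like behavior comes from pairing `read_stream` with an `availableNow` trigger on the write side. A minimal sketch, assuming the `kinesis_source` object from the example above; the checkpoint and output locations are placeholders:

```python
# Sketch: process everything currently available in the stream, then stop.
# The paths below are placeholders.
stream_df = kinesis_source.read_stream()

(
    stream_df.writeStream.trigger(availableNow=True)
    .format("delta")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoint")
    .start("/tmp/kinesis/output")
)
```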

read_stream()

Reads streaming data from Kinesis. All of the data in the stream is processed, as well as any new data that arrives after the stream starts.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py

```python
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Kinesis. All of the data in the stream is processed, as well as any new data that arrives after the stream starts.
    """
    try:
        return (
            self.spark.readStream.format("kinesis").options(**self.options).load()
        )
    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
```
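Each record's payload arrives as binary. A sketch of decoding it before writing, assuming the Databricks Kinesis source schema (the `data` and `approximateArrivalTimestamp` column names come from that schema and are an assumption here):

```python
from pyspark.sql.functions import col

stream_df = kinesis_source.read_stream()

# Kinesis delivers the record payload as bytes; cast it to a string to
# inspect it. Column names assume the Databricks Kinesis source schema.
decoded_df = stream_df.select(
    col("data").cast("string").alias("payload"),
    col("approximateArrivalTimestamp"),
)

(
    decoded_df.writeStream.format("console")
    .option("checkpointLocation", "/tmp/kinesis/_console_checkpoint")  # placeholder
    .start()
)
```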