Skip to content

Write to Kinesis

SparkKinesisDestination

Bases: DestinationInterface

This Kinesis destination class is used to write batch or streaming data to Kinesis. Kinesis configurations need to be specified as options in a dictionary.

Example

from rtdip_sdk.pipelines.destinations import SparkKinesisDestination

kinesis_destination = SparkKinesisDestination(
    data=df,
    options={
        "endpointUrl": "https://kinesis.{REGION}.amazonaws.com",
        "awsAccessKey": "{YOUR-AWS-ACCESS-KEY}",
        "awsSecretKey": "{YOUR-AWS-SECRET-KEY}",
        "streamName": "{YOUR-STREAM-NAME}"
    },
    mode="update",
    trigger="10 seconds",
    query_name="KinesisDestination",
    query_wait_interval=None
)

kinesis_destination.write_stream()

OR

kinesis_destination.write_batch()

Parameters:

Name Type Description Default
data DataFrame

DataFrame to be written to Kinesis

required
options dict

A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found here.

required
mode str

Method of writing to Kinesis - append, complete, update

'update'
trigger optional str

Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds

'10 seconds'
query_name str

Unique name for the query in associated SparkSession

'KinesisDestination'
query_wait_interval optional int

If set, waits for the streaming query to complete before returning. (stream) Default is None

None

Attributes:

Name Type Description
endpointUrl str

Endpoint of the Kinesis stream.

awsAccessKey str

AWS access key.

awsSecretKey str

AWS secret access key corresponding to the access key.

streamName List[str]

Name of the streams in Kinesis to write to.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class SparkKinesisDestination(DestinationInterface):
    """
    Destination component that writes a Spark DataFrame to Kinesis, either as a one-off batch write or as a streaming query. All Kinesis connection settings are supplied through the `options` dictionary.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.destinations import SparkKinesisDestination

    kinesis_destination = SparkKinesisDestination(
        data=df,
        options={
            "endpointUrl": "https://kinesis.{REGION}.amazonaws.com",
            "awsAccessKey": "{YOUR-AWS-ACCESS-KEY}",
            "awsSecretKey": "{YOUR-AWS-SECRET-KEY}",
            "streamName": "{YOUR-STREAM-NAME}"
        },
        mode="update",
        trigger="10 seconds",
        query_name="KinesisDestination",
        query_wait_interval=None
    )

    kinesis_destination.write_stream()

    OR

    kinesis_destination.write_batch()
    ```

    Parameters:
        data (DataFrame): DataFrame to be written to Kinesis
        options (dict): A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found [here.](https://github.com/qubole/kinesis-sql#kinesis-sink-configuration){ target="_blank" }
        mode (str): Output mode used for streaming writes to Kinesis - append, complete, update
        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
        query_name (str): Unique name for the query in associated SparkSession
        query_wait_interval (optional int): If set, blocks while the streaming query is active, sleeping this many seconds between progress checks. (stream) Default is None

    Attributes:
        endpointUrl (str): Endpoint of the Kinesis stream.
        awsAccessKey (str): AWS access key.
        awsSecretKey (str): AWS secret access key corresponding to the access key.
        streamName (List[str]): Name of the streams in Kinesis to write to.
    """

    data: DataFrame
    options: dict
    mode: str
    trigger: str
    query_name: str
    query_wait_interval: int

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        mode: str = "update",
        trigger: str = "10 seconds",
        query_name="KinesisDestination",
        query_wait_interval: int = None,
    ) -> None:
        # What to write and how to reach Kinesis.
        self.data = data
        self.options = options

        # Settings that only the streaming write reads.
        self.mode = mode
        self.trigger = trigger
        self.query_name = query_name
        self.query_wait_interval = query_wait_interval

    @staticmethod
    def system_type():
        """
        Reports the environment this component requires.

        Attributes:
            SystemType (Environment): Requires PYSPARK_DATABRICKS
        """
        required_environment = SystemType.PYSPARK_DATABRICKS
        return required_environment

    @staticmethod
    def libraries():
        # A fresh Libraries object is returned with nothing added to it.
        return Libraries()

    @staticmethod
    def settings() -> dict:
        # This component contributes no settings.
        return dict()

    def pre_write_validation(self):
        # No checks are performed before writing; always reports success.
        return True

    def post_write_validation(self):
        # No checks are performed after writing; always reports success.
        return True

    def write_batch(self):
        """
        Writes batch data to Kinesis.

        The DataFrame is saved through the "kinesis" format using the
        configured options, and the result of `save()` is returned. Any error
        is logged with its traceback and then re-raised.
        """
        try:
            batch_writer = self.data.write.format("kinesis")
            batch_writer = batch_writer.options(**self.options)
            return batch_writer.save()
        except Py4JJavaError as java_error:
            logging.exception(java_error.errmsg)
            raise java_error
        except Exception as error:
            logging.exception(str(error))
            raise error

    def write_stream(self):
        """
        Writes streaming data to Kinesis.

        Starts a streaming query against the "kinesis" format. When
        `query_wait_interval` is set, the call blocks while the query is
        active, logging the latest progress (when there is any) and sleeping
        for the interval between checks. Any error is logged with its
        traceback and then re-raised.
        """
        try:
            # "availableNow" is a run-once trigger; anything else is treated
            # as a processing-time interval string.
            if self.trigger == "availableNow":
                trigger_kwargs = {"availableNow": True}
            else:
                trigger_kwargs = {"processingTime": self.trigger}

            stream_writer = self.data.writeStream.trigger(**trigger_kwargs)
            stream_writer = stream_writer.format("kinesis").outputMode(self.mode)
            stream_writer = stream_writer.options(**self.options)
            stream_writer = stream_writer.queryName(self.query_name)
            streaming_query = stream_writer.start()

            # Without a wait interval the query is left running and the call
            # returns immediately.
            if not self.query_wait_interval:
                return

            while streaming_query.isActive:
                if streaming_query.lastProgress:
                    logging.info(streaming_query.lastProgress)
                time.sleep(self.query_wait_interval)

        except Py4JJavaError as java_error:
            logging.exception(java_error.errmsg)
            raise java_error
        except Exception as error:
            logging.exception(str(error))
            raise error

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK_DATABRICKS

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
91
92
93
94
95
96
97
@staticmethod
def system_type():
    """
    Reports the environment this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK_DATABRICKS
    """
    required_environment = SystemType.PYSPARK_DATABRICKS
    return required_environment

write_batch()

Writes batch data to Kinesis.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
114
115
116
117
118
119
120
121
122
123
124
125
def write_batch(self):
    """
    Writes batch data to Kinesis.

    The DataFrame is saved through the "kinesis" format using the configured
    options, and the result of `save()` is returned. Any error is logged with
    its traceback and then re-raised.
    """
    try:
        batch_writer = self.data.write.format("kinesis")
        batch_writer = batch_writer.options(**self.options)
        return batch_writer.save()
    except Py4JJavaError as java_error:
        logging.exception(java_error.errmsg)
        raise java_error
    except Exception as error:
        logging.exception(str(error))
        raise error

write_stream()

Writes streaming data to Kinesis.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def write_stream(self):
    """
    Writes streaming data to Kinesis.

    Starts a streaming query against the "kinesis" format. When
    `query_wait_interval` is set, the call blocks while the query is active,
    logging the latest progress (when there is any) and sleeping for the
    interval between checks. Any error is logged with its traceback and then
    re-raised.
    """
    try:
        # "availableNow" is a run-once trigger; anything else is treated as a
        # processing-time interval string.
        if self.trigger == "availableNow":
            trigger_kwargs = {"availableNow": True}
        else:
            trigger_kwargs = {"processingTime": self.trigger}

        stream_writer = self.data.writeStream.trigger(**trigger_kwargs)
        stream_writer = stream_writer.format("kinesis").outputMode(self.mode)
        stream_writer = stream_writer.options(**self.options)
        stream_writer = stream_writer.queryName(self.query_name)
        streaming_query = stream_writer.start()

        # Without a wait interval the query is left running and the call
        # returns immediately.
        if not self.query_wait_interval:
            return

        while streaming_query.isActive:
            if streaming_query.lastProgress:
                logging.info(streaming_query.lastProgress)
            time.sleep(self.query_wait_interval)

    except Py4JJavaError as java_error:
        logging.exception(java_error.errmsg)
        raise java_error
    except Exception as error:
        logging.exception(str(error))
        raise error