Skip to content

Write to Kinesis

SparkKinesisDestination

Bases: DestinationInterface

This Kinesis destination class is used to write batch or streaming data to Kinesis. Kinesis configurations need to be specified as options in a dictionary. Args: data (DataFrame): Dataframe to be written to Kinesis options (dict): A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found here. mode (str): Method of writing to Kinesis - append, complete, update trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes" query_name (str): Unique name for the query in associated SparkSession

Attributes:

Name Type Description
endpointUrl str

Endpoint of the kinesis stream.

awsAccessKey str

AWS access key.

awsSecretKey str

AWS secret access key corresponding to the access key.

streamName List[str]

Name of the streams in Kinesis to write to.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class SparkKinesisDestination(DestinationInterface):
    """
    This Kinesis destination class is used to write batch or streaming data to Kinesis. Kinesis configurations need to be specified as options in a dictionary.
    Args:
        data (DataFrame): Dataframe to be written to Kinesis
        options (dict): A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found [here.](https://github.com/qubole/kinesis-sql#kinesis-sink-configuration){ target="_blank" }
        mode (str): Method of writing to Kinesis - append, complete, update
        trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
        query_name (str): Unique name for the query in associated SparkSession

    Attributes:
        endpointUrl (str): Endpoint of the kinesis stream.
        awsAccessKey (str): AWS access key.
        awsSecretKey (str): AWS secret access key corresponding to the access key.
        streamName (List[str]): Name of the streams in Kinesis to write to.
    """

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        mode: str = "update",
        trigger: str = "10 seconds",
        query_name: str = "KinesisDestination",
    ) -> None:
        self.data = data
        self.options = options
        self.mode = mode
        self.trigger = trigger
        self.query_name = query_name

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK_DATABRICKS
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        """Returns the Spark libraries required by this component (none beyond the base runtime)."""
        spark_libraries = Libraries()
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        """Returns additional Spark settings required by this component (none)."""
        return {}

    def pre_write_validation(self) -> bool:
        """No pre-write validation is performed for Kinesis; always returns True."""
        return True

    def post_write_validation(self) -> bool:
        """No post-write validation is performed for Kinesis; always returns True."""
        return True

    def write_batch(self):
        """
        Writes batch data to Kinesis.

        Raises:
            Py4JJavaError: If the underlying Spark/JVM write fails.
            Exception: For any other failure during the write.
        """
        try:
            return self.data.write.format("kinesis").options(**self.options).save()
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming data to Kinesis.

        Starts a structured streaming query and blocks while the query is
        active, logging its progress every 10 seconds.

        Raises:
            Py4JJavaError: If the underlying Spark/JVM write fails.
            Exception: For any other failure during the write.
        """
        try:
            # "availableNow" runs the query once over available data;
            # otherwise the trigger string is treated as a processing interval.
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                self.data.writeStream.trigger(**TRIGGER_OPTION)
                .format("kinesis")
                .outputMode(self.mode)
                .options(**self.options)
                .queryName(self.query_name)
                .start()
            )
            # Block until the query stops, periodically surfacing progress.
            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(10)
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK_DATABRICKS

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
54
55
56
57
58
59
60
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK_DATABRICKS
    """
    return SystemType.PYSPARK_DATABRICKS

write_batch()

Writes batch data to Kinesis.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
77
78
79
80
81
82
83
84
85
86
87
88
def write_batch(self):
    """
    Writes batch data to Kinesis.
    """
    try:
        return self.data.write.format("kinesis").options(**self.options).save()
    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

write_stream()

Writes streaming data to Kinesis.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def write_stream(self):
    """
    Writes streaming data to Kinesis.
    """
    try:
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            self.data.writeStream.trigger(**TRIGGER_OPTION)
            .format("kinesis")
            .outputMode(self.mode)
            .options(**self.options)
            .queryName(self.query_name)
            .start()
        )
        while query.isActive:
            if query.lastProgress:
                logging.info(query.lastProgress)
            time.sleep(10)
    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e