
Write to Kafka

SparkKafkaDestination

Bases: DestinationInterface

This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.

Additionally, there are more optional configurations which can be found [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).

For compatibility between Spark and Kafka, the columns in the input dataframe are serialized into a single 'value' column containing a JSON string.
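This is the same transformation the class applies internally via `to_json(struct("*"))`. A minimal sketch of the effect on a row (the column names are illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, to_json

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("TAG1", 1.0)], ["TagName", "Value"])

# Each row collapses to a single JSON string, e.g. {"TagName":"TAG1","Value":1.0},
# which becomes the Kafka message value
df.select(to_json(struct("*")).alias("value")).show(truncate=False)
```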

Example

```python
from rtdip_sdk.pipelines.destinations import SparkKafkaDestination

kafka_destination = SparkKafkaDestination(
    data=df,
    options={
        "kafka.bootstrap.servers": "host1:port1,host2:port2"
    },
    trigger="10 seconds",
    query_name="KafkaDestination",
    query_wait_interval=None
)

kafka_destination.write_stream()

# OR

kafka_destination.write_batch()
```
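The `df` passed to the example above is assumed to already exist. For a quick local test, a streaming dataframe can be created with Spark's built-in `rate` source (illustrative only, not part of the SDK):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Emits `timestamp` and `value` columns at one row per second
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
```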

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `DataFrame` | Dataframe to be written to Kafka | required |
| `options` | `dict` | A dictionary of Kafka configurations (see Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) | required |
| `trigger` | optional `str` | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) | `'10 seconds'` |
| `query_name` | `str` | Unique name for the query in the associated SparkSession | `'KafkaDestination'` |
| `query_wait_interval` | optional `int` | If set, waits for the streaming query to complete before returning. (stream) | `None` |
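Note that `trigger` and `query_wait_interval` only affect `write_stream`. A short sketch of the two common trigger forms, reusing the `df` and `options` names from the example above:

```python
# Process all data available now, then stop (one-shot backfill)
SparkKafkaDestination(data=df, options=options, trigger="availableNow").write_stream()

# Micro-batch every 5 minutes until the query is stopped
SparkKafkaDestination(data=df, options=options, trigger="5 minutes").write_stream()
```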

The following options must be set for the Kafka destination for both batch and streaming queries.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `kafka.bootstrap.servers` | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |

The following configurations are optional:

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `topic` | `str` | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch) |
| `includeHeaders` | `bool` | Whether to include the Kafka headers in the row. (Streaming and Batch) |
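A sketch of an `options` dictionary combining the required and optional settings above (the host names and topic are placeholders):

```python
options = {
    "kafka.bootstrap.servers": "host1:9092,host2:9092",  # required
    "topic": "sensor-events",  # optional: fixed topic for all rows
    "includeHeaders": "true",  # optional: include Kafka headers in the row
}
```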

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py

```python
class SparkKafkaDestination(DestinationInterface):
    """
    This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.

    Additionally, there are more optional configurations which can be found [here.](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }

    For compatibility between Spark and Kafka, the columns in the input dataframe are serialized into a single 'value' column containing a JSON string.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.destinations import SparkKafkaDestination

    kafka_destination = SparkKafkaDestination(
        data=df,
        options={
            "kafka.bootstrap.servers": "host1:port1,host2:port2"
        },
        trigger="10 seconds",
        query_name="KafkaDestination",
        query_wait_interval=None
    )

    kafka_destination.write_stream()

    # OR

    kafka_destination.write_batch()
    ```

    Parameters:
        data (DataFrame): Dataframe to be written to Kafka
        options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }
        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
        query_name (str): Unique name for the query in associated SparkSession
        query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None

    The following options must be set for the Kafka destination for both batch and streaming queries.

    Attributes:
        kafka.bootstrap.servers (A comma-separated list of host:port): The Kafka "bootstrap.servers" configuration. (Streaming and Batch)

    The following configurations are optional:

    Attributes:
        topic (str): Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch)
        includeHeaders (bool): Whether to include the Kafka headers in the row. (Streaming and Batch)

    """

    data: DataFrame
    options: dict
    trigger: str
    query_name: str
    query_wait_interval: int

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        trigger="10 seconds",
        query_name="KafkaDestination",
        query_wait_interval: int = None,
    ) -> None:
        self.data = data
        self.options = options
        self.trigger = trigger
        self.query_name = query_name
        self.query_wait_interval = query_wait_interval

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_sql_kafka"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self):
        return True

    def post_write_validation(self):
        return True

    def write_batch(self):
        """
        Writes batch data to Kafka.
        """
        try:
            return (
                self.data.select(to_json(struct("*")).alias("value"))
                .write.format("kafka")
                .options(**self.options)
                .save()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming data to Kafka.
        """
        try:
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                self.data.select(to_json(struct("*")).alias("value"))
                .writeStream.trigger(**TRIGGER_OPTION)
                .format("kafka")
                .options(**self.options)
                .queryName(self.query_name)
                .start()
            )

            if self.query_wait_interval:
                while query.isActive:
                    if query.lastProgress:
                        logging.info(query.lastProgress)
                    time.sleep(self.query_wait_interval)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
```

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `SystemType` | `Environment` | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py

```python
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK
```

write_batch()

Writes batch data to Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py

```python
def write_batch(self):
    """
    Writes batch data to Kafka.
    """
    try:
        return (
            self.data.select(to_json(struct("*")).alias("value"))
            .write.format("kafka")
            .options(**self.options)
            .save()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
```
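As the source shows, `write_batch` uses only `data` and `options`; the streaming-only parameters (`trigger`, `query_name`, `query_wait_interval`) are ignored. A minimal batch sketch (the dataframe and servers are placeholders):

```python
batch_df = spark.createDataFrame([("TAG1", 1.0)], ["TagName", "Value"])

SparkKafkaDestination(
    data=batch_df,
    options={"kafka.bootstrap.servers": "host1:9092", "topic": "sensor-events"},
).write_batch()
```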

write_stream()

Writes streaming data to Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py

```python
def write_stream(self):
    """
    Writes streaming data to Kafka.
    """
    try:
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            self.data.select(to_json(struct("*")).alias("value"))
            .writeStream.trigger(**TRIGGER_OPTION)
            .format("kafka")
            .options(**self.options)
            .queryName(self.query_name)
            .start()
        )

        if self.query_wait_interval:
            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(self.query_wait_interval)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
```
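As the loop above shows, a non-`None` `query_wait_interval` makes `write_stream` block while the query is active, logging `lastProgress` every `query_wait_interval` seconds. For example (values are illustrative):

```python
SparkKafkaDestination(
    data=df,
    options={"kafka.bootstrap.servers": "host1:9092"},
    trigger="30 seconds",
    query_name="KafkaDestination",
    query_wait_interval=10,  # log progress every 10 seconds while the query runs
).write_stream()
```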