Write to Kafka

SparkKafkaDestination

Bases: DestinationInterface

This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.

Additionally, there are more optional configurations which can be found [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).

For compatibility between Spark and Kafka, the columns of the input dataframe are concatenated into a single 'value' column containing a JSON string.
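
For illustration, the sketch below shows that concatenation as it is applied inside `write_batch` and `write_stream`; the sample column names and values are assumptions, not part of the RTDIP documentation:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("sensor-1", 21.5)], ["TagName", "Value"])

# Equivalent to the select performed inside write_batch/write_stream:
# every column is packed into one JSON string column named "value".
value_df = df.select(to_json(struct("*")).alias("value"))
value_df.show(truncate=False)  # e.g. {"TagName":"sensor-1","Value":21.5}
```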

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | DataFrame | Dataframe to be written to Kafka | required |
| options | dict | A dictionary of Kafka configurations (see the Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) | required |
| trigger | str | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes" | '10 seconds' |
| query_name | str | Unique name for the query in the associated SparkSession | 'KafkaDestination' |

The following options must be set for the Kafka destination for both batch and streaming queries.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |

The following configurations are optional:

Attributes:

| Name | Type | Description |
|------|------|-------------|
| topic | str | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch) |
| includeHeaders | bool | Whether to include the Kafka headers in the row. (Streaming and Batch) |
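
A minimal construction sketch follows; the broker addresses, topic and column names are illustrative assumptions rather than values taken from the RTDIP documentation:

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.destinations.spark.kafka import SparkKafkaDestination

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("sensor-1", 21.5)], ["TagName", "Value"])

destination = SparkKafkaDestination(
    data=df,
    options={
        "kafka.bootstrap.servers": "host1:9092,host2:9092",  # required (streaming and batch)
        "topic": "sensor-values",  # optional; overrides any 'topic' column in the data
    },
    trigger="30 seconds",          # used by write_stream; write_batch ignores it
    query_name="SensorKafkaWrite",
)
```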

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
class SparkKafkaDestination(DestinationInterface):
    """
    This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.

    Additionally, there are more optional configurations which can be found [here.](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }

    For compatibility between Spark and Kafka, the columns in the input dataframe are concatenated into one 'value' column of JSON string.

    Args:
        data (DataFrame): Dataframe to be written to Kafka
        options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }
        trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
        query_name (str): Unique name for the query in associated SparkSession

    The following options must be set for the Kafka destination for both batch and streaming queries.

    Attributes:
        kafka.bootstrap.servers (A comma-separated list of host:port): The Kafka "bootstrap.servers" configuration. (Streaming and Batch)

    The following configurations are optional:

    Attributes:
        topic (str): Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch)
        includeHeaders (bool): Whether to include the Kafka headers in the row. (Streaming and Batch)

    """

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        trigger="10 seconds",
        query_name="KafkaDestination",
    ) -> None:
        self.data = data
        self.options = options
        self.trigger = trigger
        self.query_name = query_name

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_sql_kafka"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self):
        return True

    def post_write_validation(self):
        return True

    def write_batch(self):
        """
        Writes batch data to Kafka.
        """
        try:
            return (
                self.data.select(to_json(struct("*")).alias("value"))
                .write.format("kafka")
                .options(**self.options)
                .save()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming data to Kafka.
        """
        try:
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                self.data.select(to_json(struct("*")).alias("value"))
                .writeStream.trigger(**TRIGGER_OPTION)
                .format("kafka")
                .options(**self.options)
                .queryName(self.query_name)
                .start()
            )
            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(10)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

write_batch()

Writes batch data to Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
def write_batch(self):
    """
    Writes batch data to Kafka.
    """
    try:
        return (
            self.data.select(to_json(struct("*")).alias("value"))
            .write.format("kafka")
            .options(**self.options)
            .save()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
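
Continuing the construction sketch above (values remain illustrative assumptions), a batch write is a single call:

```python
# One-off write: all columns are packed into a JSON 'value' column and saved to Kafka.
destination.write_batch()
```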

write_stream()

Writes streaming data to Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
def write_stream(self):
    """
    Writes streaming data to Kafka.
    """
    try:
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            self.data.select(to_json(struct("*")).alias("value"))
            .writeStream.trigger(**TRIGGER_OPTION)
            .format("kafka")
            .options(**self.options)
            .queryName(self.query_name)
            .start()
        )
        while query.isActive:
            if query.lastProgress:
                logging.info(query.lastProgress)
            time.sleep(10)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
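
A hedged streaming sketch follows; it uses Spark's built-in rate source purely for illustration, and the broker address, topic name and checkpoint path are assumptions:

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.destinations.spark.kafka import SparkKafkaDestination

spark = SparkSession.builder.getOrCreate()
# The built-in "rate" source provides a simple streaming DataFrame for illustration.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

SparkKafkaDestination(
    data=stream_df,
    options={
        "kafka.bootstrap.servers": "host1:9092",              # broker address is an assumption
        "topic": "rate-events",                                # topic name is an assumption
        "checkpointLocation": "/tmp/rtdip_kafka_checkpoint",   # required for streaming sinks
    },
    trigger="availableNow",         # process available data in one trigger, then stop
    query_name="RateToKafkaExample",
).write_stream()  # blocks while the query is active, logging lastProgress every 10 seconds
```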