
Write to Eventhub using Kafka

SparkKafkaEventhubDestination

Bases: DestinationInterface

This Spark Destination class is used to write batch or streaming data to an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a destination in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.

Default settings will be specified if not provided in the options parameter; any of these can be overridden by supplying the corresponding key in options, as sketched after this list:

  • kafka.sasl.mechanism will be set to PLAIN
  • kafka.security.protocol will be set to SASL_SSL
  • kafka.request.timeout.ms will be set to 60000
  • kafka.session.timeout.ms will be set to 60000
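
For example, a minimal sketch of overriding one of these defaults while keeping the rest (host name and timeout value below are illustrative only):

options = {
    # The Kafka endpoint of the Eventhubs namespace (the Kafka surface listens on port 9093)
    "kafka.bootstrap.servers": "{NAMESPACE}.servicebus.windows.net:9093",
    # Overrides the 60000 default; the other defaults listed above
    # (sasl mechanism, security protocol, session timeout) still apply
    "kafka.request.timeout.ms": "120000",
}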

Example

from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}"

eventhub_destination = SparkKafkaEventhubDestination(
    spark=spark,
    data=df,
    options={
        "kafka.bootstrap.servers": "host1:port1,host2:port2"
    },
    connection_string=connectionString,
    consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
    trigger="10 seconds",
    query_name="KafkaEventhubDestination",
    query_wait_interval=None
)

eventhub_destination.write_stream()

# OR

eventhub_destination.write_batch()

Parameters:

  • spark (SparkSession): Spark Session. Required.
  • data (DataFrame): Any columns not listed in the required Kafka write schema (see the Spark Structured Streaming Kafka integration guide) will be merged into a single column named "value", or ignored if a "value" column already exists. Required. See the DataFrame sketch after this list.
  • connection_string (str): Eventhubs connection string, required to connect to the Eventhubs service. This must include the Eventhub name as the EntityPath parameter. Example: "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub". Required.
  • options (dict): A dictionary of Kafka configurations (see the Attributes table below). Required.
  • consumer_group (str): The Eventhub consumer group to use for the connection. Required.
  • trigger (str, optional): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is "10 seconds".
  • query_name (str, optional): Unique name for the query in the associated SparkSession. Default is "KafkaEventhubDestination".
  • query_wait_interval (int, optional): If set, waits for the streaming query to complete before returning. (stream) Default is None.
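
As an illustration of how the data parameter is shaped (the column names below are hypothetical), any column other than key, headers, topic and partition is serialised to JSON and merged into a single value column before the write, unless a value column already exists:

from pyspark.sql import Row

# Hypothetical input rows: there is no "value" column, so "sensor" and
# "reading" are merged into a JSON string in a generated "value" column,
# while "key" is kept as the Kafka message key
df = spark.createDataFrame(
    [
        Row(key="device-01", sensor="temperature", reading=21.5),
        Row(key="device-02", sensor="pressure", reading=101.3),
    ]
)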

The following are commonly used parameters that may be included in the options dict. kafka.bootstrap.servers is the only required config. A full list of configs can be found in the Kafka producer configuration documentation.

Attributes:

  • kafka.bootstrap.servers (comma-separated list of host:port): The Kafka "bootstrap.servers" configuration. (Streaming and Batch)
  • topic (string): Required if there is no existing topic column in your DataFrame. Sets the topic that all rows will be written to in Kafka. (Streaming and Batch)
  • includeHeaders (bool): Determines whether to include the Kafka headers in the row; defaults to False. (Streaming and Batch)
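
For reference, a minimal options dict using the attributes above might look like the following (placeholders in braces are illustrative); as the source code below shows, a missing topic falls back to the EntityPath of the supplied connection string:

options = {
    "kafka.bootstrap.servers": "{NAMESPACE}.servicebus.windows.net:9093",
    # Optional: if "topic" is omitted, the EntityPath from the
    # connection string is used as the topic
    "topic": "{EVENT_HUB_NAME}",
}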

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
class SparkKafkaEventhubDestination(DestinationInterface):
    """
    This Spark Destination class is used to write batch or streaming data to an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a destination in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.

    Default settings will be specified if not provided in the `options` parameter:

    - `kafka.sasl.mechanism` will be set to `PLAIN`
    - `kafka.security.protocol` will be set to `SASL_SSL`
    - `kafka.request.timeout.ms` will be set to `60000`
    - `kafka.session.timeout.ms` will be set to `60000`

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.destinations import SparkKafkaEventhubDestination
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY};EntityPath={EVENT_HUB_NAME}"

    eventhub_destination = SparkKafkaEventhubDestination(
        spark=spark,
        data=df,
        options={
            "kafka.bootstrap.servers": "host1:port1,host2:port2"
        },
        connection_string=connectionString,
        consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
        trigger="10 seconds",
        query_name="KafkaEventhubDestination",
        query_wait_interval=None
    )

    eventhub_destination.write_stream()

    # OR

    eventhub_destination.write_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session
        data (DataFrame): Any columns not listed in the required schema [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka){ target="_blank" } will be merged into a single column named "value", or ignored if "value" is an existing column
        connection_string (str): Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the `EntityPath` parameter. Example `"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub"`
        options (dict): A dictionary of Kafka configurations (See Attributes tables below)
        consumer_group (str): The Eventhub consumer group to use for the connection
        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
        query_name (optional str): Unique name for the query in associated SparkSession
        query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None

    The following are commonly used parameters that may be included in the options dict. kafka.bootstrap.servers is the only required config. A full list of configs can be found [here](https://kafka.apache.org/documentation/#producerconfigs){ target="_blank" }

    Attributes:
        kafka.bootstrap.servers (A comma-separated list of host:port):  The Kafka "bootstrap.servers" configuration. (Streaming and Batch)
        topic (string): Required if there is no existing topic column in your DataFrame. Sets the topic that all rows will be written to in Kafka. (Streaming and Batch)
        includeHeaders (bool): Determines whether to include the Kafka headers in the row; defaults to False. (Streaming and Batch)
    """

    spark: SparkSession
    data: DataFrame
    connection_string: str
    options: dict
    consumer_group: str
    trigger: str
    query_name: str
    connection_string_properties: dict
    query_wait_interval: int

    def __init__(
        self,
        spark: SparkSession,
        data: DataFrame,
        connection_string: str,
        options: dict,
        consumer_group: str,
        trigger: str = "10 seconds",
        query_name: str = "KafkaEventhubDestination",
        query_wait_interval: int = None,
    ) -> None:
        self.spark = spark
        self.data = data
        self.connection_string = connection_string
        self.options = options
        self.consumer_group = consumer_group
        self.trigger = trigger
        self.query_name = query_name
        self.connection_string_properties = self._parse_connection_string(
            connection_string
        )
        self.options = self._configure_options(options)
        self.query_wait_interval = query_wait_interval

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_sql_kafka"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self) -> bool:
        return True

    def post_write_validation(self) -> bool:
        return True

    # Code is from Azure Eventhub Python SDK. Will import the package if possible with Conda in the  conda-forge channel in the future
    def _parse_connection_string(self, connection_string: str):
        conn_settings = [s.split("=", 1) for s in connection_string.split(";")]
        if any(len(tup) != 2 for tup in conn_settings):
            raise ValueError("Connection string is either blank or malformed.")
        conn_settings = dict(conn_settings)
        shared_access_signature = None
        for key, value in conn_settings.items():
            if key.lower() == "sharedaccesssignature":
                shared_access_signature = value
        shared_access_key = conn_settings.get("SharedAccessKey")
        shared_access_key_name = conn_settings.get("SharedAccessKeyName")
        if any([shared_access_key, shared_access_key_name]) and not all(
            [shared_access_key, shared_access_key_name]
        ):
            raise ValueError(
                "Connection string must have both SharedAccessKeyName and SharedAccessKey."
            )
        if shared_access_signature is not None and shared_access_key is not None:
            raise ValueError(
                "Only one of the SharedAccessKey or SharedAccessSignature must be present."
            )
        endpoint = conn_settings.get("Endpoint")
        if not endpoint:
            raise ValueError("Connection string is either blank or malformed.")
        parsed = urlparse(endpoint.rstrip("/"))
        if not parsed.netloc:
            raise ValueError("Invalid Endpoint on the Connection String.")
        namespace = parsed.netloc.strip()
        properties = {
            "fully_qualified_namespace": namespace,
            "endpoint": endpoint,
            "eventhub_name": conn_settings.get("EntityPath"),
            "shared_access_signature": shared_access_signature,
            "shared_access_key_name": shared_access_key_name,
            "shared_access_key": shared_access_key,
        }
        return properties

    def _connection_string_builder(self, properties: dict) -> str:
        connection_string = "Endpoint=" + properties.get("endpoint") + ";"

        if properties.get("shared_access_key"):
            connection_string += (
                "SharedAccessKey=" + properties.get("shared_access_key") + ";"
            )

        if properties.get("shared_access_key_name"):
            connection_string += (
                "SharedAccessKeyName=" + properties.get("shared_access_key_name") + ";"
            )

        if properties.get("shared_access_signature"):
            connection_string += (
                "SharedAccessSignature="
                + properties.get("shared_access_signature")
                + ";"
            )
        return connection_string

    def _configure_options(self, options: dict) -> dict:
        if "topic" not in options:
            options["topic"] = self.connection_string_properties.get("eventhub_name")

        if "kafka.bootstrap.servers" not in options:
            options["kafka.bootstrap.servers"] = (
                self.connection_string_properties.get("fully_qualified_namespace")
                + ":9093"
            )

        if "kafka.sasl.mechanism" not in options:
            options["kafka.sasl.mechanism"] = "PLAIN"

        if "kafka.security.protocol" not in options:
            options["kafka.security.protocol"] = "SASL_SSL"

        if "kafka.sasl.jaas.config" not in options:
            kafka_package = "org.apache.kafka.common.security.plain.PlainLoginModule"
            if "DATABRICKS_RUNTIME_VERSION" in os.environ or (
                "_client" in self.spark.__dict__
                and "databricks" in self.spark.client.host
            ):
                kafka_package = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
            connection_string = self._connection_string_builder(
                self.connection_string_properties
            )
            options["kafka.sasl.jaas.config"] = (
                '{} required username="$ConnectionString" password="{}";'.format(
                    kafka_package, connection_string
                )
            )  # NOSONAR

        if "kafka.request.timeout.ms" not in options:
            options["kafka.request.timeout.ms"] = "60000"

        if "kafka.session.timeout.ms" not in options:
            options["kafka.session.timeout.ms"] = "60000"

        if "kafka.group.id" not in options:
            options["kafka.group.id"] = self.consumer_group

        options["includeHeaders"] = "true"

        return options

    def _transform_to_eventhub_schema(self, df: DataFrame) -> DataFrame:
        column_list = ["key", "headers", "topic", "partition"]
        if "value" not in df.columns:
            df = df.withColumn(
                "value",
                to_json(
                    struct(
                        [
                            col(column).alias(column)
                            for column in df.columns
                            if column not in column_list
                        ]
                    )
                ),
            )
        if "headers" in df.columns and (
            df.schema["headers"].dataType.elementType["key"].nullable == True
            or df.schema["headers"].dataType.elementType["value"].nullable == True
        ):
            raise ValueError("key and value in the headers column cannot be nullable")

        return df.select(
            [
                column
                for column in df.columns
                if column in ["value", "key", "headers", "topic", "partition"]
            ]
        )

    def write_batch(self) -> DataFrame:
        """
        Writes batch data to Kafka.
        """
        try:
            df = self._transform_to_eventhub_schema(self.data)
            df.write.format("kafka").options(**self.options).save()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self) -> DataFrame:
        """
        Writes streaming data to Kafka.
        """
        try:
            df = self._transform_to_eventhub_schema(self.data)
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                df.writeStream.trigger(**TRIGGER_OPTION)
                .format("kafka")
                .options(**self.options)
                .queryName(self.query_name)
                .start()
            )

            if self.query_wait_interval:
                while query.isActive:
                    if query.lastProgress:
                        logging.info(query.lastProgress)
                    time.sleep(self.query_wait_interval)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

  • SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

write_batch()

Writes batch data to Kafka.
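
A short usage sketch, assuming spark and df are defined as in the example at the top of this page; trigger, query_name and query_wait_interval only apply to write_stream and are left at their defaults here:

batch_destination = SparkKafkaEventhubDestination(
    spark=spark,
    data=df,
    options={"kafka.bootstrap.servers": "{NAMESPACE}.servicebus.windows.net:9093"},
    connection_string="{YOUR-EVENTHUB-CONNECTION-STRING}",
    consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
)

# Performs a single df.write.format("kafka")...save() using the configured options
batch_destination.write_batch()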

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
def write_batch(self) -> DataFrame:
    """
    Writes batch data to Kafka.
    """
    try:
        df = self._transform_to_eventhub_schema(self.data)
        df.write.format("kafka").options(**self.options).save()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

write_stream()

Writes streaming data to Kafka.
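
A hedged streaming sketch (values below are illustrative): trigger and query_wait_interval are set on the constructor and only take effect here. "availableNow" processes the currently available data and then stops, while a numeric query_wait_interval makes write_stream block, logging query progress at that interval while the query is active:

streaming_destination = SparkKafkaEventhubDestination(
    spark=spark,
    data=streaming_df,  # e.g. a streaming DataFrame from spark.readStream
    options={"kafka.bootstrap.servers": "{NAMESPACE}.servicebus.windows.net:9093"},
    connection_string="{YOUR-EVENTHUB-CONNECTION-STRING}",
    consumer_group="{YOUR-EVENTHUB-CONSUMER-GROUP}",
    trigger="availableNow",   # process available data once, then stop
    query_name="KafkaEventhubDestination",
    query_wait_interval=10,   # log progress every 10 seconds while the query is active
)

streaming_destination.write_stream()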

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
def write_stream(self) -> DataFrame:
    """
    Writes streaming data to Kafka.
    """
    try:
        df = self._transform_to_eventhub_schema(self.data)
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            df.writeStream.trigger(**TRIGGER_OPTION)
            .format("kafka")
            .options(**self.options)
            .queryName(self.query_name)
            .start()
        )

        if self.query_wait_interval:
            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(self.query_wait_interval)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e