Write to Kafka
SparkKafkaDestination
Bases: DestinationInterface
This Spark destination class is used to write batch or streaming data to Kafka. Required and optional configurations can be found in the Attributes tables below.
Additionally, there are more optional configurations which can be found here.
For compatibility between Spark and Kafka, the columns in the input dataframe are concatenated into a single 'value' column containing a JSON string.
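To make the 'value' column concrete, here is a minimal plain-Python sketch of the effect described above: each row's columns are collapsed into one JSON string. It does not use Spark. In PySpark the same result is commonly obtained with `to_json(struct("*"))`, but whether this class does exactly that is an assumption, and the function name `rows_to_kafka_values` and the sample column names are hypothetical.

```python
import json
from typing import Any, Dict, List


def rows_to_kafka_values(rows: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """Collapse every row's columns into one JSON string stored under 'value'.

    Illustration only: this mimics the shape of the data that reaches Kafka,
    not the SDK's actual implementation.
    """
    return [{"value": json.dumps(row, separators=(",", ":"))} for row in rows]


# Hypothetical sample rows; the column names are made up for the example.
rows = [
    {"TagName": "sensor-1", "Value": 21.5},
    {"TagName": "sensor-2", "Value": 19.0},
]

for record in rows_to_kafka_values(rows):
    print(record["value"])
```

A consumer reading the topic would therefore parse each message value as JSON to recover the original columns.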
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | DataFrame | Dataframe to be written to Kafka | required |
options | dict | A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here | required |
trigger | str | Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes" | '10 seconds' |
query_name | str | Unique name for the query in associated SparkSession | 'KafkaDestination' |
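The parameters above could be wired together roughly as follows. This is a sketch under stated assumptions: the import path is inferred from the source file location shown on this page, the helper names `build_kafka_options` and `write_to_kafka` are hypothetical, and the broker address and topic are placeholders. The SDK import is deferred so that the options helper can be used without Spark installed.

```python
from typing import Any, Dict


def build_kafka_options(bootstrap_servers: str, topic: str) -> Dict[str, str]:
    """Build the options dictionary passed to the destination (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,  # required
        "topic": topic,  # optional, overrides any topic column in the data
    }


def write_to_kafka(
    df: Any,
    options: Dict[str, str],
    trigger: str = "10 seconds",
    query_name: str = "KafkaDestination",
    streaming: bool = False,
) -> None:
    """Write a Spark DataFrame to Kafka in batch or streaming mode.

    Assumption: the module path below mirrors the source file path
    src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py.
    """
    from rtdip_sdk.pipelines.destinations.spark.kafka import SparkKafkaDestination

    destination = SparkKafkaDestination(
        data=df, options=options, trigger=trigger, query_name=query_name
    )
    if streaming:
        destination.write_stream()
    else:
        destination.write_batch()


# Placeholder broker and topic values, for illustration only.
options = build_kafka_options("localhost:9092", "example-topic")
print(options)
```

With a real Spark DataFrame `df`, calling `write_to_kafka(df, options)` would perform a batch write, and `write_to_kafka(df, options, streaming=True)` a streaming write.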
The following options must be set for the Kafka destination for both batch and streaming queries.
Attributes:
Name | Type | Description |
---|---|---|
kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |
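Because a malformed server list tends to surface only once the write is attempted, it can help to check the value up front. The following is a minimal sketch of such a check. The function `parse_bootstrap_servers` is hypothetical and not part of the SDK, and it handles only plain `host:port` entries.

```python
from typing import List, Tuple


def parse_bootstrap_servers(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated 'host:port' list and validate each entry."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        # rpartition splits on the last colon, leaving the port on the right.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {entry!r}")
        servers.append((host, int(port)))
    return servers


# Hypothetical broker names, for illustration only.
print(parse_bootstrap_servers("broker1:9092, broker2:9093"))
```

An entry without a port, such as `"broker1"`, raises `ValueError` instead of being passed through to Kafka.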
The following configurations are optional:
Attributes:
Name | Type | Description |
---|---|---|
topic | str | Sets the topic that all rows will be written to in Kafka. This option overrides any topic column that may exist in the data. (Streaming and Batch) |
includeHeaders | bool | Whether to include the Kafka headers in the row. (Streaming and Batch) |
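The precedence rule for `topic` can be illustrated with a small plain-Python sketch: the option wins whenever it is set, and the row's own topic column is used only as a fallback. The function `resolve_topic` is hypothetical and models the documented behaviour only, not Spark's internals.

```python
from typing import Any, Dict, Optional


def resolve_topic(row: Dict[str, Any], options: Dict[str, Any]) -> Optional[str]:
    """Return the topic a row would be written to under the documented rule."""
    if "topic" in options:
        # The 'topic' option overrides any topic column present in the data.
        return options["topic"]
    # Otherwise fall back to the per-row topic column, if there is one.
    return row.get("topic")


# Hypothetical row and option values, for illustration only.
row = {"topic": "per-row-topic", "value": "{}"}
print(resolve_topic(row, {"topic": "configured-topic"}))
print(resolve_topic(row, {}))
```

The first call selects the configured topic, the second falls back to the row's topic column.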
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
system_type()
staticmethod
Attributes:
Name | Type | Description |
---|---|---|
SystemType | Environment | Requires PYSPARK |
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
write_batch()
Writes batch data to Kafka.
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
write_stream()
Writes streaming data to Kafka.
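For streaming writes, the `trigger` parameter described earlier has to be turned into a Structured Streaming trigger. A minimal sketch of that mapping, assuming it follows the parameter description ("availableNow" for a single run, otherwise a processing-time interval), might look like this. The function name `trigger_kwargs` is hypothetical, and the listing is not the SDK's own code.

```python
from typing import Dict, Union


def trigger_kwargs(trigger: str) -> Dict[str, Union[bool, str]]:
    """Map the destination's trigger string to keyword arguments for a stream trigger."""
    if trigger == "availableNow":
        # Process all data currently available, then stop.
        return {"availableNow": True}
    # Any other value is treated as a processing-time interval, e.g. "30 seconds".
    return {"processingTime": trigger}


print(trigger_kwargs("availableNow"))
print(trigger_kwargs("30 seconds"))
```

With PySpark 3.3 or later, the returned dictionary could be passed on as `df.writeStream.trigger(**trigger_kwargs("30 seconds"))`, since `DataStreamWriter.trigger` accepts both `availableNow` and `processingTime` keyword arguments.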
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py