Bases: SourceInterface
This Spark source class is used to read batch or streaming data from Azure Event Hubs. Event Hubs configurations need to be specified as options in a dictionary.
Additional optional configurations can be found [here](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration).
If using `startingPosition` or `endingPosition`, make sure to check out the **Event Position** section for more details and examples.
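As a quick illustration, an Event Position is passed as a JSON string; the field names below follow the azure-event-hubs-spark documentation and the values are placeholders, not required settings:

```python
import json

# Sketch of an EventPosition serialized to JSON; field names follow the
# azure-event-hubs-spark documentation. These placeholder values mean
# "start of stream" ("@latest" would mean "end of stream").
starting_event_position = {
    "offset": "-1",        # position by offset
    "seqNo": -1,           # not in use
    "enqueuedTime": None,  # not in use
    "isInclusive": True,
}

options = {
    # ... connection options ...
    "eventhubs.startingPosition": json.dumps(starting_event_position),
}
```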
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | Spark Session | required |
| `options` | `dict` | A dictionary of Event Hubs configurations (see Attributes table below) | required |
Attributes:

| Name | Type | Description |
|---|---|---|
| `eventhubs.connectionString` | `str` | The Event Hubs connection string, required to connect to the Event Hubs service. (Streaming and Batch) |
| `eventhubs.consumerGroup` | `str` | A consumer group is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch) |
| `eventhubs.startingPosition` | `JSON str` | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using `startingPositions`, the EventPosition set in `startingPosition` is used. If neither option is set, consumption begins from the end of the partition. (Streaming and Batch) |
| `eventhubs.endingPosition` | `JSON str` | The ending position of a batch query. This works the same as `startingPosition`. (Batch) |
| `maxEventsPerTrigger` | `long` | Rate limit on the maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Streaming) |
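A minimal usage sketch follows; the import path is inferred from the source location shown below, and the connection string and consumer group are placeholders:

```python
from pyspark.sql import SparkSession

# NOTE: import path is inferred from the source location shown below and may
# differ in your installed package; verify before use.
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource

spark = SparkSession.builder.appName("eventhub-read").getOrCreate()

# Placeholder values -- supply your own namespace, key and Event Hub name.
connection_string = (
    "Endpoint=sb://<namespace>.servicebus.windows.net/;"
    "SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;"
    "EntityPath=<eventhub-name>"
)

options = {
    "eventhubs.connectionString": connection_string,
    "eventhubs.consumerGroup": "$Default",
}

source = SparkEventhubSource(spark, options)
stream_df = source.read_stream()  # or source.read_batch() for a bounded read
```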
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
```python
class SparkEventhubSource(SourceInterface):
    '''
    This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary.
    Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
    If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.

    Args:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Eventhub configurations (See Attributes table below)

    Attributes:
        eventhubs.connectionString (str): Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)
        eventhubs.consumerGroup (str): A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
        eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
        eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
        maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Streaming)
    '''
    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = EVENTHUB_SCHEMA

    @staticmethod
    def system_type():
        '''
        Attributes:
            SystemType (Environment): Requires PYSPARK
        '''
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(DEFAULT_PACKAGES["spark_azure_eventhub"])
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        '''
        Reads batch data from Eventhubs.
        '''
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            # Encrypt the connection string before handing it to the connector
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            return (
                self.spark
                .read
                .format("eventhubs")
                .options(**self.options)
                .load()
            )
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        '''
        Reads streaming data from Eventhubs.
        '''
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            # Encrypt the connection string before handing it to the connector
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            return (
                self.spark
                .readStream
                .format("eventhubs")
                .options(**self.options)
                .load()
            )
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
```
read_batch()
Reads batch data from Eventhubs.
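As an illustrative sketch, a bounded batch read can pin both ends of the range with `startingPosition` and `endingPosition`; the positions below are placeholders, and `spark` and `options` are assumed from the usage sketch above:

```python
import json
from datetime import datetime, timezone

# `options` as in the usage sketch above; positions below are placeholders.
options["eventhubs.startingPosition"] = json.dumps(
    {"offset": "-1", "seqNo": -1, "enqueuedTime": None, "isInclusive": True}
)
options["eventhubs.endingPosition"] = json.dumps(
    {
        "offset": None,
        "seqNo": -1,
        # Read up to events enqueued before "now" (UTC)
        "enqueuedTime": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
        "isInclusive": True,
    }
)

batch_df = SparkEventhubSource(spark, options).read_batch()
```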
read_stream()
Reads streaming data from Eventhubs.
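A hedged streaming sketch follows; the console sink and checkpoint path are placeholders, and casting the binary `body` column to a string is a common first step:

```python
from pyspark.sql.functions import col

# `spark` and `options` as in the usage sketch above.
stream_df = SparkEventhubSource(spark, options).read_stream()

# The Event Hubs schema delivers the payload as a binary `body` column;
# cast it to a string before downstream processing.
query = (
    stream_df
    .withColumn("body", col("body").cast("string"))
    .writeStream
    .format("console")                                         # placeholder sink
    .option("checkpointLocation", "/tmp/eventhub-checkpoint")  # placeholder path
    .start()
)
```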
system_type()
staticmethod
Attributes:

| Name | Type | Description |
|---|---|---|
| `SystemType` | `Environment` | Requires PYSPARK |
|