Bases: SourceInterface
This Spark source class is used to read batch or streaming data from an IoT Hub. IoT Hub configurations need to be specified as options in a dictionary.
Additionally, there are more optional configurations which can be found here.
If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.
Args:
spark (SparkSession): Spark Session
options (dict): A dictionary of IoT Hub configurations (See Attributes table below)
Attributes:
Name |
Type |
Description |
eventhubs.connectionString |
str
|
IoT Hub connection string is required to connect to the Eventhubs service. (Streaming and Batch)
|
eventhubs.consumerGroup |
str
|
A consumer group is a view of an entire IoT Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
|
eventhubs.startingPosition |
JSON str
|
The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
|
eventhubs.endingPosition |
JSON str
|
(JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
|
maxEventsPerTrigger |
long
|
Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)
|
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 | class SparkIoThubSource(SourceInterface):
"""
This Spark source class is used to read batch or streaming data from an IoT Hub. IoT Hub configurations need to be specified as options in a dictionary.
Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.
Args:
spark (SparkSession): Spark Session
options (dict): A dictionary of IoT Hub configurations (See Attributes table below)
Attributes:
eventhubs.connectionString (str): IoT Hub connection string is required to connect to the Eventhubs service. (Streaming and Batch)
eventhubs.consumerGroup (str): A consumer group is a view of an entire IoT Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
eventhubs.endingPosition: (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)
"""
options: dict
spark: SparkSession
def __init__(self, spark: SparkSession, options: dict) -> None:
self.spark = spark
self.schema = EVENTHUB_SCHEMA
self.options = options
@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK
@staticmethod
def settings() -> dict:
return {}
@staticmethod
def libraries():
spark_libraries = Libraries()
spark_libraries.add_maven_library(get_default_package("spark_azure_eventhub"))
return spark_libraries
def pre_read_validation(self) -> bool:
return True
def post_read_validation(self, df: DataFrame) -> bool:
assert df.schema == self.schema
return True
def read_batch(self) -> DataFrame:
"""
Reads batch data from IoT Hubs.
"""
iothub_connection_string = "eventhubs.connectionString"
try:
if iothub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
iothub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
)
return self.spark.read.format("eventhubs").options(**self.options).load()
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
def read_stream(self) -> DataFrame:
"""
Reads streaming data from IoT Hubs.
"""
iothub_connection_string = "eventhubs.connectionString"
try:
if iothub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
iothub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
)
return (
self.spark.readStream.format("eventhubs").options(**self.options).load()
)
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
|
read_batch()
Reads batch data from IoT Hubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97 | def read_batch(self) -> DataFrame:
"""
Reads batch data from IoT Hubs.
"""
iothub_connection_string = "eventhubs.connectionString"
try:
if iothub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
iothub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
)
return self.spark.read.format("eventhubs").options(**self.options).load()
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
|
read_stream()
Reads streaming data from IoT Hubs.
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 | def read_stream(self) -> DataFrame:
"""
Reads streaming data from IoT Hubs.
"""
iothub_connection_string = "eventhubs.connectionString"
try:
if iothub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
iothub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
)
return (
self.spark.readStream.format("eventhubs").options(**self.options).load()
)
except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
except Exception as e:
logging.exception(str(e))
raise e
|
system_type()
staticmethod
Attributes:
Name |
Type |
Description |
SystemType |
Environment
|
|
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
| @staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK
|