
Read from an Eventhub

SparkEventhubSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary. Additional optional configurations can be found in the Azure Event Hubs Spark connector documentation: https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration. If using startingPosition or endingPosition, make sure to check out the Event Position section for more details and examples.

Example

#Eventhub Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark=spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "maxEventsPerTrigger" : 1000
    }
)

eventhub_source.read_stream()

#Eventhub Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json
from datetime import datetime

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

# Hypothetical end time for the batch query; the connector expects an ISO-8601 datetime string
endTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

endingEventPosition = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": endTime,
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "eventhubs.endingPosition": json.dumps(endingEventPosition)
    }
)

eventhub_source.read_batch()
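
The returned DataFrame follows the Azure Event Hubs connector schema, with the event payload in a binary body column; a minimal sketch of decoding it for inspection (column name per the connector's schema):

#Decoding the Event Hub payload

df = eventhub_source.read_batch()

# The connector returns each event's payload as binary; cast it to a
# string to work with text payloads
decoded_df = df.withColumn("body", df["body"].cast("string"))
decoded_df.show(truncate=False)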

Parameters:

| Name    | Type         | Description                                                           | Default  |
|---------|--------------|-----------------------------------------------------------------------|----------|
| spark   | SparkSession | Spark Session                                                         | required |
| options | dict         | A dictionary of Eventhub configurations (see Attributes table below)  | required |

Attributes:

| Name                        | Type     | Description |
|-----------------------------|----------|-------------|
| eventhubs.connectionString  | str      | The Eventhubs connection string required to connect to the Eventhubs service. (Streaming and Batch) |
| eventhubs.consumerGroup     | str      | A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch) |
| eventhubs.startingPosition  | JSON str | The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, the EventPosition set in startingPosition is used. If nothing is set in either option, consumption begins from the end of the partition. (Streaming and Batch) |
| eventhubs.endingPosition    | JSON str | The ending position of a batch query. This works the same as startingPosition. (Batch) |
| maxEventsPerTrigger         | long     | Rate limit on the maximum number of events processed per trigger interval. The specified total number of events is proportionally split across partitions of different volume. (Streaming) |
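
Positions can also be set per partition via the connector's optional startingPositions option, a JSON map keyed by Event Hub name and partition id. A sketch based on the format documented by the Azure Event Hubs Spark connector; the partition id and the reuse of the options dictionary are illustrative assumptions:

# Hypothetical per-partition position: start partition 0 from the latest offset
positionKey = {
    "ehName": "{EVENT_HUB_NAME}",
    "partitionId": 0
}
eventPosition = {
    "offset": "@latest",
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}
positionMap = {json.dumps(positionKey): eventPosition}
options["eventhubs.startingPositions"] = json.dumps(positionMap)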

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
class SparkEventhubSource(SourceInterface):
    """
    This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary.
    Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
    If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.

    Example
    --------
    ```python
    #Eventhub Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
    }

    eventhub_source = SparkEventhubSource(
        spark=spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "maxEventsPerTrigger" : 1000
        }
    )

    eventhub_source.read_stream()
    ```
    ```python
    #Eventhub Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json
    from datetime import datetime

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
        "offset": -1,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True
    }

    # Hypothetical end time for the batch query; the connector expects an ISO-8601 datetime string
    endTime = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    endingEventPosition = {
        "offset": None,
        "seqNo": -1,
        "enqueuedTime": endTime,
        "isInclusive": True
    }

    eventhub_source = SparkEventhubSource(
        spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "eventhubs.endingPosition": json.dumps(endingEventPosition)
        }
    )

    eventhub_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Eventhub configurations (See Attributes table below)

    Attributes:
        eventhubs.connectionString (str):  Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)
        eventhubs.consumerGroup (str): A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
        eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
        eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
        maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Streaming)

    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = EVENTHUB_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_azure_eventhub"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        """
        Reads batch data from Eventhubs.
        """
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[eventhub_connection_string]
                    )
                )

            return self.spark.read.format("eventhubs").options(**self.options).load()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Eventhubs.
        """
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[eventhub_connection_string]
                    )
                )

            return (
                self.spark.readStream.format("eventhubs").options(**self.options).load()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

| Name       | Type        | Description      |
|------------|-------------|------------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
def read_batch(self) -> DataFrame:
    """
    Reads batch data from Eventhubs.
    """
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            )

        return self.spark.read.format("eventhubs").options(**self.options).load()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Eventhubs.
    """
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            )

        return (
            self.spark.readStream.format("eventhubs").options(**self.options).load()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e