Skip to content

Read from an Eventhub

SparkEventhubSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.

Parameters:

Name Type Description Default
spark SparkSession

Spark Session

required
options dict

A dictionary of Eventhub configurations (See Attributes table below)

required

Attributes:

Name Type Description
eventhubs.connectionString str

Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)

eventhubs.consumerGroup str

A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)

eventhubs.startingPosition JSON str

The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)

eventhubs.endingPosition JSON str

(JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)

maxEventsPerTrigger long

Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class SparkEventhubSource(SourceInterface):
    '''
    This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary.
    Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
    If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.
    Args:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Eventhub configurations (See Attributes table below)

    Attributes:
        eventhubs.connectionString (str):  Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)
        eventhubs.consumerGroup (str): A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
        eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
        eventhubs.endingPosition: (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
        maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

    '''
    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = EVENTHUB_SCHEMA


    @staticmethod
    def system_type():
        '''
        Attributes:
            SystemType (Environment): Requires PYSPARK
        '''            
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(DEFAULT_PACKAGES["spark_azure_eventhub"])
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        '''
        Reads batch data from Eventhubs.
        '''
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(self.options[eventhub_connection_string])

            return (self.spark
                .read
                .format("eventhubs")
                .options(**self.options)
                .load()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        '''
        Reads streaming data from Eventhubs.
        '''
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(self.options[eventhub_connection_string])

            return (self.spark
                .readStream
                .format("eventhubs")
                .options(**self.options)
                .load()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

read_batch()

Reads batch data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def read_batch(self) -> DataFrame:
    '''
    Reads batch data from Eventhubs.
    '''
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(self.options[eventhub_connection_string])

        return (self.spark
            .read
            .format("eventhubs")
            .options(**self.options)
            .load()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def read_stream(self) -> DataFrame:
    '''
    Reads streaming data from Eventhubs.
    '''
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(self.options[eventhub_connection_string])

        return (self.spark
            .readStream
            .format("eventhubs")
            .options(**self.options)
            .load()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
50
51
52
53
54
55
56
@staticmethod
def system_type():
    '''
    Attributes:
        SystemType (Environment): Requires PYSPARK
    '''            
    return SystemType.PYSPARK