DataBricksAutoLoaderSource

Bases: SourceInterface

The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available here

Example

ADLS Gen2

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}"
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

AWS S3

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

GCS

from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {}
path = "gs://{BUCKET-NAME}/{FILE-PATH}"
format = "{DESIRED-FILE-FORMAT}"

DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

OR

DataBricksAutoLoaderSource(spark, options, path, format).read_batch()

Parameters:

    spark (SparkSession): Spark Session required to read data from cloud storage. (required)
    options (dict): Options that can be specified for configuring the Auto Loader. Further information on the available options is here. (required)
    path (str): The cloud storage path. (required)
    format (str): Specifies the file format to be read. Supported formats are available here. (required)
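
The options dictionary can carry any additional Auto Loader configuration. As a minimal, non-authoritative sketch (assuming the standard Auto Loader option cloudFiles.schemaLocation and placeholder storage paths), it is typically populated like this before being passed to the component; cloudFiles.format is set for you from the format argument:

```python
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# "cloudFiles.format" is added by the component from the `format` argument,
# so only extra Auto Loader options need to be supplied here.
options = {
    # Standard Auto Loader option for persisting the inferred schema between runs;
    # the storage path is a placeholder.
    "cloudFiles.schemaLocation": "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{SCHEMA-PATH}"
}

df = DataBricksAutoLoaderSource(
    spark,
    options,
    "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}",
    "json",
).read_stream()
```
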
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
class DataBricksAutoLoaderSource(SourceInterface):
    """
    The Spark Auto Loader is used to read new data files as they arrive in cloud storage. Further information on Auto Loader is available [here](https://docs.databricks.com/ingestion/auto-loader/index.html)

    Example
    --------
    === "ADLS Gen2"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}/{FILE-NAME}
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```
    === "AWS S3"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "https://s3.{REGION-CODE}.amazonaws.com/{BUCKET-NAME}/{KEY-NAME}"
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```
    === "GCS"

        ```python
        from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
        from rtdip_sdk.pipelines.utilities import SparkSessionUtility

        # Not required if using Databricks
        spark = SparkSessionUtility(config={}).execute()

        options = {}
        path = "gs://{BUCKET-NAME}/{FILE-PATH}"
        format = "{DESIRED-FILE-FORMAT}"

        DataBricksAutoLoaderSource(spark, options, path, format).read_stream()

        OR

        DataBricksAutoLoaderSource(spark, options, path, format).read_batch()
        ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from cloud storage
        options (dict): Options that can be specified for configuring the Auto Loader. Further information on the available options is [here](https://docs.databricks.com/ingestion/auto-loader/options.html)
        path (str): The cloud storage path
        format (str): Specifies the file format to be read. Supported formats are available [here](https://docs.databricks.com/ingestion/auto-loader/options.html#file-format-options)
    """

    spark: SparkSession
    options: dict
    path: str

    def __init__(
        self, spark: SparkSession, options: dict, path: str, format: str
    ) -> None:
        self.spark = spark
        self.options = options
        self.path = path
        self.options["cloudFiles.format"] = format

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK on Databricks
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_core"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self, df: DataFrame):
        return True

    def read_batch(self):
        """
        Raises:
            NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow` to perform batch-like reads of cloud storage files.
        """
        raise NotImplementedError(
            "Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow`"
        )

    def read_stream(self) -> DataFrame:
        """
        Performs streaming reads of files in cloud storage.
        """
        try:
            return (
                self.spark.readStream.format("cloudFiles")
                .options(**self.options)
                .load(self.path)
            )

        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK on Databricks

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK on Databricks
    """
    return SystemType.PYSPARK_DATABRICKS

read_batch()

Raises:

    NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be availableNow to perform batch-like reads of cloud storage files.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
def read_batch(self):
    """
    Raises:
        NotImplementedError: Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow` to perform batch-like reads of cloud storage files.
    """
    raise NotImplementedError(
        "Auto Loader only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow`"
    )

read_stream()

Performs streaming reads of files in cloud storage.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/autoloader.py
def read_stream(self) -> DataFrame:
    """
    Performs streaming reads of files in cloud storage.
    """
    try:
        return (
            self.spark.readStream.format("cloudFiles")
            .options(**self.options)
            .load(self.path)
        )

    except Exception as e:
        logging.exception(str(e))
        raise e
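
Because read_batch raises NotImplementedError, batch-like behaviour is achieved by pairing read_stream with an availableNow trigger on the write side. A minimal sketch, assuming Spark 3.3+ (where trigger(availableNow=True) is supported) and placeholder paths for the destination and checkpoint:

```python
from rtdip_sdk.pipelines.sources import DataBricksAutoLoaderSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

df = DataBricksAutoLoaderSource(
    spark,
    {},
    "abfss://{FILE-SYSTEM}@{ACCOUNT-NAME}.dfs.core.windows.net/{PATH}",
    "{DESIRED-FILE-FORMAT}",
).read_stream()

# availableNow processes all files present when the query starts and then stops,
# giving a batch-like read over cloud storage files.
(
    df.writeStream.trigger(availableNow=True)
    .option("checkpointLocation", "{CHECKPOINT-PATH}")  # placeholder
    .format("delta")
    .start("{DESTINATION-PATH}")  # placeholder
)
```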

SparkDeltaSharingSource

Bases: SourceInterface

The Spark Delta Sharing Source is used to read data from a Delta table where Delta sharing is configured

Example

#Delta Sharing Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "maxFilesPerTrigger": 1000,
        "ignoreChanges: True,
        "startingVersion": 0
    },
    table_name="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_stream()
#Delta Sharing Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

delta_sharing_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "versionAsOf": 0,
        "timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
    },
    table_name="{YOUR-DELTA-TABLE-PATH}"
)

delta_sharing_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session required to read data from a Delta table. (required)
    options (dict): Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available here. (required)
    table_path (str): Path to credentials file and Delta table to query. (required)

Attributes:

    ignoreDeletes (bool str): Ignore transactions that delete data at partition boundaries. (Streaming)
    ignoreChanges (bool str): Pre-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming)
    startingVersion (int str): The Delta Lake version to start from. (Streaming)
    startingTimestamp (datetime str): The timestamp to start from. (Streaming)
    maxFilesPerTrigger (int): How many new files to be considered in every micro-batch. The default is 1000. (Streaming)
    maxBytesPerTrigger (int): How much data gets processed in each micro-batch. (Streaming)
    readChangeFeed (bool str): Stream read the change data feed of the shared table. (Batch & Streaming)
    timestampAsOf (datetime str): Query the Delta Table from a specific point in time. (Batch)
    versionAsOf (int str): Query the Delta Table from a specific version. (Batch)
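
Combining the attributes above, the following is a hedged sketch of streaming the change data feed of a shared table. It reuses the spark session from the examples above; the option values follow the attribute descriptions and the table path is a placeholder:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource

delta_sharing_cdf_source = SparkDeltaSharingSource(
    spark=spark,
    options={
        "readChangeFeed": "true",  # stream the change data feed of the shared table
        "startingVersion": 0,      # begin from the first Delta Lake version
    },
    table_path="{YOUR-DELTA-TABLE-PATH}",
)

df = delta_sharing_cdf_source.read_stream()
```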

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
class SparkDeltaSharingSource(SourceInterface):
    """
    The Spark Delta Sharing Source is used to read data from a Delta table where Delta sharing is configured

    Example
    --------
    ```python
    #Delta Sharing Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    delta_sharing_source = SparkDeltaSharingSource(
        spark=spark,
        options={
            "maxFilesPerTrigger": 1000,
            "ignoreChanges: True,
            "startingVersion": 0
        },
        table_path="{YOUR-DELTA-TABLE-PATH}"
    )

    delta_sharing_source.read_stream()
    ```
    ```python
    #Delta Sharing Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    delta_sharing_source = SparkDeltaSharingSource(
        spark=spark,
        options={
            "versionAsOf": 0,
            "timestampAsOf": "yyyy-mm-dd hh:mm:ss[.fffffffff]"
        },
        table_path="{YOUR-DELTA-TABLE-PATH}"
    )

    delta_sharing_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from a Delta table
        options (dict): Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/data-sharing/read-data-open.html#apache-spark-read-shared-data){ target="_blank" }
        table_path (str): Path to credentials file and Delta table to query

    Attributes:
        ignoreDeletes (bool str): Ignore transactions that delete data at partition boundaries. (Streaming)
        ignoreChanges (bool str): Pre-process updates if files had to be rewritten in the source table due to a data changing operation. (Streaming)
        startingVersion (int str): The Delta Lake version to start from. (Streaming)
        startingTimestamp (datetime str): The timestamp to start from. (Streaming)
        maxFilesPerTrigger (int): How many new files to be considered in every micro-batch. The default is 1000. (Streaming)
        maxBytesPerTrigger (int): How much data gets processed in each micro-batch. (Streaming)
        readChangeFeed (bool str): Stream read the change data feed of the shared table. (Batch & Streaming)
        timestampAsOf (datetime str): Query the Delta Table from a specific point in time. (Batch)
        versionAsOf (int str): Query the Delta Table from a specific version. (Batch)
    """

    spark: SparkSession
    options: dict
    table_path: str

    def __init__(self, spark: SparkSession, options: dict, table_path: str) -> None:
        self.spark = spark
        self.options = options
        self.table_path = table_path

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_maven_library(get_default_package("spark_delta_sharing"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self):
        return True

    def read_batch(self):
        """
        Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
        """
        try:
            return (
                self.spark.read.format("deltaSharing")
                .options(**self.options)
                .table(self.table_path)
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
        """
        try:
            return (
                self.spark.readStream.format("deltaSharing")
                .options(**self.options)
                .load(self.table_path)
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
def read_batch(self):
    """
    Reads batch data from Delta. Most of the options provided by the Apache Spark DataFrame read API are supported for performing batch reads on Delta tables.
    """
    try:
        return (
            self.spark.read.format("deltaSharing")
            .options(**self.options)
            .table(self.table_path)
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/delta_sharing.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Delta. All of the data in the table is processed as well as any new data that arrives after the stream started. .load() can take table name or path.
    """
    try:
        return (
            self.spark.readStream.format("deltaSharing")
            .options(**self.options)
            .load(self.table_path)
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
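
For context, a sketch of how table_path is commonly constructed when reading a shared table: the Delta Sharing convention joins the credentials (profile) file with the shared table coordinates as "<profile-file-path>#<share>.<schema>.<table>". The profile location and share/schema/table names below are placeholders:

```python
from rtdip_sdk.pipelines.sources import SparkDeltaSharingSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# Placeholder profile file and table coordinates, following the
# "<profile-file-path>#<share>.<schema>.<table>" convention.
table_path = "/dbfs/{PATH-TO}/config.share#{SHARE-NAME}.{SCHEMA-NAME}.{TABLE-NAME}"

df = SparkDeltaSharingSource(spark=spark, options={}, table_path=table_path).read_stream()
```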

SparkEventhubSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.

Example

#Eventhub Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark=spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "maxEventsPerTrigger" : 1000
    }
)

eventhub_source.read_stream()
 #Eventhub Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": endTime,
    "isInclusive": True
}

eventhub_source = SparkEventhubSource(
    spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "eventhubs.endingPosition": json.dumps(endingEventPosition)
    }
)

eventhub_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session. (required)
    options (dict): A dictionary of Eventhub configurations (See Attributes table below). (required)

Attributes:

    eventhubs.connectionString (str): Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)
    eventhubs.consumerGroup (str): A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
    eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
    eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
    maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
class SparkEventhubSource(SourceInterface):
    """
    This Spark source class is used to read batch or streaming data from Eventhubs. Eventhub configurations need to be specified as options in a dictionary.
    Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
    If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.

    Example
    --------
    ```python
    #Eventhub Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
        "offset": -1,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True
    }

    eventhub_source = SparkEventhubSource(
        spark=spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "maxEventsPerTrigger" : 1000
        }
    )

    eventhub_source.read_stream()
    ```
    ```python
     #Eventhub Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
        "offset": -1,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True
    }

    endingEventPosition = {
        "offset": None,
        "seqNo": -1,
        "enqueuedTime": endTime,
        "isInclusive": True
    }

    eventhub_source = SparkEventhubSource(
        spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "eventhubs.endingPosition": json.dumps(endingEventPosition)
        }
    )

    eventhub_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Eventhub configurations (See Attributes table below)

    Attributes:
        eventhubs.connectionString (str):  Eventhubs connection string is required to connect to the Eventhubs service. (Streaming and Batch)
        eventhubs.consumerGroup (str): A consumer group is a view of an entire eventhub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
        eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
        eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
        maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = EVENTHUB_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_azure_eventhub"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        """
        Reads batch data from Eventhubs.
        """
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[eventhub_connection_string]
                    )
                )

            return self.spark.read.format("eventhubs").options(**self.options).load()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Eventhubs.
        """
        eventhub_connection_string = "eventhubs.connectionString"
        try:
            if eventhub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[eventhub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[eventhub_connection_string]
                    )
                )

            return (
                self.spark.readStream.format("eventhubs").options(**self.options).load()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
def read_batch(self) -> DataFrame:
    """
    Reads batch data from Eventhubs.
    """
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            )

        return self.spark.read.format("eventhubs").options(**self.options).load()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Eventhubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Eventhubs.
    """
    eventhub_connection_string = "eventhubs.connectionString"
    try:
        if eventhub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[eventhub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[eventhub_connection_string]
                )
            )

        return (
            self.spark.readStream.format("eventhubs").options(**self.options).load()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
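
The DataFrame returned by these reads follows the Event Hubs schema, in which the event payload arrives in a binary body column. A small sketch, continuing from the eventhub_source defined in the streaming example above and assuming UTF-8 encoded payloads (for example JSON), of decoding the payload for downstream parsing:

```python
from pyspark.sql.functions import col

df = eventhub_source.read_stream()

# Cast the binary payload to a string; enqueuedTime is part of the standard
# Event Hubs schema and is kept here for ordering or windowing downstream.
decoded_df = df.select(
    col("body").cast("string").alias("body"),
    col("enqueuedTime"),
)
```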

SparkIoThubSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from an IoT Hub. IoT Hub configurations need to be specified as options in a dictionary. Additionally, there are more optional configurations which can be found here. If using startingPosition or endingPosition make sure to check out the Event Position section for more details and examples.

Example

#IoT Hub Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkIoThubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

iot_hub_source = SparkIoThubSource(
    spark=spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "maxEventsPerTrigger" : 1000
    }
)

iot_hub_source.read_stream()
 #IoT Hub Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkIoThubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility
import json

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

startingEventPosition = {
    "offset": -1,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": endTime,
    "isInclusive": True
}

iot_hub_source = SparkIoThubSource(
    spark,
    options = {
        "eventhubs.connectionString": connectionString,
        "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
        "eventhubs.startingPosition": json.dumps(startingEventPosition),
        "eventhubs.endingPosition": json.dumps(endingEventPosition)
    }
)

iot_hub_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session. (required)
    options (dict): A dictionary of IoT Hub configurations (See Attributes table below). (required)

Attributes:

    eventhubs.connectionString (str): IoT Hub connection string is required to connect to the Eventhubs service. (Streaming and Batch)
    eventhubs.consumerGroup (str): A consumer group is a view of an entire IoT Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
    eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
    eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
    maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
class SparkIoThubSource(SourceInterface):
    """
    This Spark source class is used to read batch or streaming data from an IoT Hub. IoT Hub configurations need to be specified as options in a dictionary.
    Additionally, there are more optional configurations which can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
    If using startingPosition or endingPosition make sure to check out the **Event Position** section for more details and examples.

    Example
    --------
    ```python
    #IoT Hub Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkIoThubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
        "offset": -1,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True
    }

    iot_hub_source = SparkIoThubSource(
        spark=spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "maxEventsPerTrigger" : 1000
        }
    )

    iot_hub_source.read_stream()
    ```
    ```python
     #IoT Hub Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkIoThubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility
    import json

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"

    startingEventPosition = {
        "offset": -1,
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True
    }

    endingEventPosition = {
        "offset": None,
        "seqNo": -1,
        "enqueuedTime": endTime,
        "isInclusive": True
    }

    iot_hub_source = SparkIoThubSource(
        spark,
        options = {
            "eventhubs.connectionString": connectionString,
            "eventhubs.consumerGroup": "{YOUR-CONSUMER-GROUP}",
            "eventhubs.startingPosition": json.dumps(startingEventPosition),
            "eventhubs.endingPosition": json.dumps(endingEventPosition)
        }
    )

    iot_hub_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of IoT Hub configurations (See Attributes table below)

    Attributes:
        eventhubs.connectionString (str):  IoT Hub connection string is required to connect to the Eventhubs service. (Streaming and Batch)
        eventhubs.consumerGroup (str): A consumer group is a view of an entire IoT Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. (Streaming and Batch)
        eventhubs.startingPosition (JSON str): The starting position for your Structured Streaming job. If a specific EventPosition is not set for a partition using startingPositions, then we use the EventPosition set in startingPosition. If nothing is set in either option, we will begin consuming from the end of the partition. (Streaming and Batch)
        eventhubs.endingPosition (JSON str): The ending position of a batch query. This works the same as startingPosition. (Batch)
        maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)

    """

    options: dict
    spark: SparkSession

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.schema = EVENTHUB_SCHEMA
        self.options = options

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def settings() -> dict:
        return {}

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_azure_eventhub"))
        return spark_libraries

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        """
        Reads batch data from IoT Hubs.
        """
        iothub_connection_string = "eventhubs.connectionString"
        try:
            if iothub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[iothub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[iothub_connection_string]
                    )
                )

            return self.spark.read.format("eventhubs").options(**self.options).load()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from IoT Hubs.
        """
        iothub_connection_string = "eventhubs.connectionString"
        try:
            if iothub_connection_string in self.options:
                sc = self.spark.sparkContext
                self.options[iothub_connection_string] = (
                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                        self.options[iothub_connection_string]
                    )
                )

            return (
                self.spark.readStream.format("eventhubs").options(**self.options).load()
            )

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from IoT Hubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
def read_batch(self) -> DataFrame:
    """
    Reads batch data from IoT Hubs.
    """
    iothub_connection_string = "eventhubs.connectionString"
    try:
        if iothub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[iothub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[iothub_connection_string]
                )
            )

        return self.spark.read.format("eventhubs").options(**self.options).load()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from IoT Hubs.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from IoT Hubs.
    """
    iothub_connection_string = "eventhubs.connectionString"
    try:
        if iothub_connection_string in self.options:
            sc = self.spark.sparkContext
            self.options[iothub_connection_string] = (
                sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
                    self.options[iothub_connection_string]
                )
            )

        return (
            self.spark.readStream.format("eventhubs").options(**self.options).load()
        )

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e
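
When reading from an IoT Hub, the connection string passed in the options follows the same Event Hubs format shown above, but it must point at the IoT Hub's built-in Event Hub-compatible endpoint rather than the IoT Hub hostname. A hedged sketch of assembling it, where every value is a placeholder:

```python
# All values are placeholders taken from the IoT Hub's built-in endpoints settings.
connectionString = (
    "Endpoint=sb://{EVENT-HUB-COMPATIBLE-NAMESPACE}.servicebus.windows.net/;"
    "SharedAccessKeyName={ACCESS_KEY_NAME};"
    "SharedAccessKey={ACCESS_KEY};"
    "EntityPath={EVENT-HUB-COMPATIBLE-NAME}"
)
```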

SparkKafkaSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from Kafka. Required and optional configurations can be found in the Attributes tables below.

Additionally, there are more optional configurations which can be found here.

Example

 #Kafka Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

kafka_source = SparkKafkaSource(
    spark=spark,
    options={
        "kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
        "subscribe": "{TOPIC_1},{TOPIC_2}",
        "includeHeaders", "true"
    }
)

kafka_source.read_stream()
 #Kafka Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

kafka_source = SparkKafkaSource(
    spark=spark,
    options={
        "kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
        "subscribe": "{TOPIC_1},{TOPIC_2}",
        "startingOffsets": "earliest",
        "endingOffsets": "latest"
    }
)

kafka_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session. (required)
    options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here. (required)

The following attributes are the most common configurations for Kafka.

The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.

Attributes:

    kafka.bootstrap.servers (A comma-separated list of host︰port): The Kafka "bootstrap.servers" configuration. (Streaming and Batch)

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these attributes:

Attributes:

    assign (json string {"topicA"︰[0,1],"topicB"︰[2,4]}): Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
    subscribe (A comma-separated list of topics): The topic list to subscribe to. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
    subscribePattern (Java regex string): The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)

The following configurations are optional:

Attributes:

    startingTimestamp (timestamp str): The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
    startingOffsetsByTimestamp (JSON str): The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
    startingOffsets ("earliest", "latest" (streaming only), or JSON string): The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
    endingTimestamp (timestamp str): The end point when a batch query is ended, a json string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch)
    endingOffsetsByTimestamp (JSON str): The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch)
    endingOffsets (latest or JSON str): The end point when a batch query is ended, either "latest" which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch)
    maxOffsetsPerTrigger (long): Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
    minOffsetsPerTrigger (long): Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
    failOnDataLoss (bool): Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected.
    minPartitions (int): Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch)
    includeHeaders (bool): Whether to include the Kafka headers in the row. (Streaming and Batch)

Starting Timestamp Offset Note

If Kafka doesn't return the matched offset, the behavior will follow the value of the option startingOffsetsByTimestampStrategy.

startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

Ending Timestamp Offset Note

If Kafka doesn't return the matched offset, the offset will be set to latest.

endingOffsetsByTimestamp takes precedence over endingOffsets.
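
To make the per-TopicPartition offset JSON concrete, here is a small sketch of a batch read bounded by explicit offsets. The topic name, partition numbers and offset values are placeholders; -2 and -1 keep the earliest/latest meaning described above:

```python
import json

from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# Start partition 0 at offset 100 and partition 1 at the earliest offset (-2);
# end both partitions at the latest offset (-1).
starting_offsets = {"{TOPIC_1}": {"0": 100, "1": -2}}
ending_offsets = {"{TOPIC_1}": {"0": -1, "1": -1}}

kafka_source = SparkKafkaSource(
    spark=spark,
    options={
        "kafka.bootstrap.servers": "{HOST_1}:{PORT_1}",
        "subscribe": "{TOPIC_1}",
        "startingOffsets": json.dumps(starting_offsets),
        "endingOffsets": json.dumps(ending_offsets),
    },
)

df = kafka_source.read_batch()
```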

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
class SparkKafkaSource(SourceInterface):
    """
    This Spark source class is used to read batch or streaming data from Kafka. Required and optional configurations can be found in the Attributes tables below.

    Additionally, there are more optional configurations which can be found [here.](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }

    Example
    --------
    ```python
     #Kafka Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkKafkaSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    kafka_source = SparkKafkaSource(
        spark=spark,
        options={
            "kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
            "subscribe": "{TOPIC_1},{TOPIC_2}",
            "includeHeaders", "true"
        }
    )

    kafka_source.read_stream()
    ```
    ```python
     #Kafka Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkKafkaSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    kafka_source = SparkKafkaSource(
        spark=spark,
        options={
            "kafka.bootstrap.servers": "{HOST_1}:{PORT_1},{HOST_2}:{PORT_2}",
            "subscribe": "{TOPIC_1},{TOPIC_2}",
            "startingOffsets": "earliest",
            "endingOffsets": "latest"
        }
    )

    kafka_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }

    The following attributes are the most common configurations for Kafka.

    The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.

    Attributes:
        kafka.bootstrap.servers (A comma-separated list of host︰port):  The Kafka "bootstrap.servers" configuration. (Streaming and Batch)

    There are multiple ways of specifying which topics to subscribe to. You should provide only one of these attributes:

    Attributes:
        assign (json string {"topicA"︰[0,1],"topicB"︰[2,4]}):  Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
        subscribe (A comma-separated list of topics): The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
        subscribePattern (Java regex string): The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)

    The following configurations are optional:

    Attributes:
        startingTimestamp (timestamp str): The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
        startingOffsetsByTimestamp (JSON str): The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
        startingOffsets ("earliest", "latest" (streaming only), or JSON string): The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
        endingTimestamp (timestamp str): The end point when a batch query is ended, a json string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch)
        endingOffsetsByTimestamp (JSON str): The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch)
        endingOffsets (latest or JSON str): The end point when a batch query is ended, either "latest" which is just referred to the latest, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch)
        maxOffsetsPerTrigger (long): Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
        minOffsetsPerTrigger (long): Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
        failOnDataLoss (bool): Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it if it does not work as expected.
        minPartitions (int): Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch)
        includeHeaders (bool): Whether to include the Kafka headers in the row. (Streaming and Batch)

    !!! note "Starting Timestamp Offset Note"
        If Kafka doesn't return the matched offset, the behavior will follow the value of the option <code>startingOffsetsByTimestampStrategy</code>.

        <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.

        For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

    !!! note "Ending Timestamp Offset Note"
        If Kafka doesn't return the matched offset, the offset will be set to latest.

        <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.

    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = KAFKA_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_sql_kafka"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self) -> DataFrame:
        """
        Reads batch data from Kafka.
        """
        try:
            return self.spark.read.format("kafka").options(**self.options).load()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Kafka.
        """
        try:
            return self.spark.readStream.format("kafka").options(**self.options).load()

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
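
A hedged usage sketch tying the options above together: a bounded batch read over explicit per-partition offsets. The class name and import path follow the pattern used elsewhere on this page and should be verified against your SDK version; the bootstrap servers, topic name and partition ids are placeholders.

```python
from rtdip_sdk.pipelines.sources import SparkKafkaSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

options = {
    "kafka.bootstrap.servers": "{HOST}:{PORT}",
    "subscribe": "topicA",  # placeholder topic name
    # Per-TopicPartition offsets as JSON strings: -2 = earliest, -1 = latest
    "startingOffsets": '{"topicA":{"0":-2,"1":-2}}',
    "endingOffsets": '{"topicA":{"0":-1,"1":-1}}',
}

df = SparkKafkaSource(spark, options).read_batch()
```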

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
def read_batch(self) -> DataFrame:
    """
    Reads batch data from Kafka.
    """
    try:
        return self.spark.read.format("kafka").options(**self.options).load()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Kafka.
    """
    try:
        return self.spark.readStream.format("kafka").options(**self.options).load()

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

SparkKafkaEventhubSource

Bases: SourceInterface

This Spark source class is used to read batch or streaming data from an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a source in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.

The dataframe returned is transformed to ensure the schema is as close to the Eventhub Spark source as possible. There are some minor differences:

  • offset is dependent on x-opt-offset being populated in the headers provided. If this is not found in the headers, the value will be null
  • publisher is dependent on x-opt-publisher being populated in the headers provided. If this is not found in the headers, the value will be null
  • partitionKey is dependent on x-opt-partition-key being populated in the headers provided. If this is not found in the headers, the value will be null
  • systemProperties are identified according to the list provided in the Eventhub documentation and IoT Hub documentation
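
Because the returned dataframe mirrors the Eventhub Spark source schema, downstream code written against that schema can generally be reused. A minimal sketch of selecting the transformed columns, assuming df is the dataframe returned by read_stream() or read_batch() in the examples below:

```python
from pyspark.sql.functions import col

# df is assumed to be the dataframe returned by SparkKafkaEventhubSource(...).read_stream()
events_df = df.select(
    col("body").cast("string").alias("body"),  # message payload decoded to string
    col("enqueuedTime"),
    col("partitionKey"),   # null unless the x-opt-partition-key header was present
    col("properties"),
    col("systemProperties"),
)
```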

Default settings will be specified if not provided in the options parameter:

  • kafka.sasl.mechanism will be set to PLAIN
  • kafka.security.protocol will be set to SASL_SSL
  • kafka.request.timeout.ms will be set to 60000
  • kafka.session.timeout.ms will be set to 60000

Examples

#Kafka Source for Streaming Queries

from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
consumerGroup = "{YOUR-CONSUMER-GROUP}"

kafka_eventhub_source = SparkKafkaEventhubSource(
    spark=spark,
    options={
        "startingOffsets": "earliest",
        "maxOffsetsPerTrigger": 10000,
        "failOnDataLoss": "false",
    },
    connection_string=connectionString,
    consumer_group="consumerGroup"
)

kafka_eventhub_source.read_stream()

#Kafka Source for Batch Queries

from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
consumerGroup = "{YOUR-CONSUMER-GROUP}"

kafka_eventhub_source = SparkKafkaEventhubSource(
    spark=spark,
    options={
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
        "failOnDataLoss": "false"
    },
    connection_string=connectionString,
    consumer_group="consumerGroup"
)

kafka_eventhub_source.read_batch()

Required and optional configurations can be found in the Attributes and Parameter tables below. Additionally, there are more optional configurations which can be found here.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session | required |
| options | dict | A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see here | required |
| connection_string | str | Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the EntityPath parameter. Example "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub" | required |
| consumer_group | str | The Eventhub consumer group to use for the connection | required |
| decode_kafka_headers_to_amqp_properties | optional bool | Perform decoding of Kafka headers into their AMQP properties. Default is True | True |

The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| kafka.bootstrap.servers | A comma-separated list of host︰port | The Kafka "bootstrap.servers" configuration. (Streaming and Batch) |

There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

Attributes:

| Name | Type | Description |
|------|------|-------------|
| assign | json string {"topicA"︰[0,1],"topicB"︰[2,4]} | Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribe | A comma-separated list of topics | The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |
| subscribePattern | Java regex string | The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch) |

The following configurations are optional:

Attributes:

| Name | Type | Description |
|------|------|-------------|
| startingTimestamp | timestamp str | The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsetsByTimestamp | JSON str | The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch) |
| startingOffsets | "earliest", "latest" (streaming only), or JSON string | The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. |
| endingTimestamp | timestamp str | The end point when a batch query is ended, a string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsetsByTimestamp | JSON str | The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch) |
| endingOffsets | latest or JSON str | The end point when a batch query is ended, either "latest" which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch) |
| maxOffsetsPerTrigger | long | Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| minOffsetsPerTrigger | long | Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming) |
| failOnDataLoss | bool | Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it if it does not work as expected. |
| minPartitions | int | Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch) |
| includeHeaders | bool | Whether to include the Kafka headers in the row. (Streaming and Batch) |

Starting Timestamp Offset Note

If Kafka doesn't return the matched offset, the behavior will follow the value of the option startingOffsetsByTimestampStrategy.

startingTimestamp takes precedence over startingOffsetsByTimestamp and startingOffsets.

For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

Ending Timestamp Offset Note

If Kafka doesn't return the matched offset, the offset will be set to latest.

endingOffsetsByTimestamp takes precedence over endingOffsets.
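
A hedged sketch of starting a read from a point in time using startingOffsetsByTimestamp, reusing spark, connectionString and consumerGroup from the examples above. The Eventhub name doubles as the Kafka topic, the partition ids are placeholders and the timestamp is epoch milliseconds:

```python
import json

eventhub_name = "{EVENT_HUB_NAME}"   # must match the EntityPath in the connection string
start_ms = 1700000000000             # placeholder epoch-millisecond timestamp

kafka_eventhub_source = SparkKafkaEventhubSource(
    spark=spark,
    options={
        # One entry per partition of the Eventhub; values are epoch milliseconds
        "startingOffsetsByTimestamp": json.dumps(
            {eventhub_name: {"0": start_ms, "1": start_ms}}
        ),
        "failOnDataLoss": "false",
    },
    connection_string=connectionString,
    consumer_group=consumerGroup,
)

df = kafka_eventhub_source.read_stream()
```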

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
class SparkKafkaEventhubSource(SourceInterface):
    """
    This Spark source class is used to read batch or streaming data from an Eventhub using the Kafka protocol. This enables Eventhubs to be used as a source in applications like Delta Live Tables or Databricks Serverless Jobs as the Spark Eventhubs JAR is not supported in these scenarios.

    The dataframe returned is transformed to ensure the schema is as close to the Eventhub Spark source as possible. There are some minor differences:

    - `offset` is dependent on `x-opt-offset` being populated in the headers provided. If this is not found in the headers, the value will be null
    - `publisher` is dependent on `x-opt-publisher` being populated in the headers provided. If this is not found in the headers, the value will be null
    - `partitionKey` is dependent on `x-opt-partition-key` being populated in the headers provided. If this is not found in the headers, the value will be null
    - `systemProperties` are identified according to the list provided in the [Eventhub documentation](https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-event-hub-overview#event-system-properties-mapping){ target="_blank" } and [IoT Hub documentation](https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-iot-hub-overview#event-system-properties-mapping){ target="_blank" }

    Default settings will be specified if not provided in the `options` parameter:

    - `kafka.sasl.mechanism` will be set to `PLAIN`
    - `kafka.security.protocol` will be set to `SASL_SSL`
    - `kafka.request.timeout.ms` will be set to `60000`
    - `kafka.session.timeout.ms` will be set to `60000`

    Examples
    --------
    ```python
    #Kafka Source for Streaming Queries

    from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
    consumerGroup = "{YOUR-CONSUMER-GROUP}"

    kafka_eventhub_source = SparkKafkaEventhubSource(
        spark=spark,
        options={
            "startingOffsets": "earliest",
            "maxOffsetsPerTrigger": 10000,
            "failOnDataLoss": "false",
        },
        connection_string=connectionString,
        consumer_group="consumerGroup"
    )

    kafka_eventhub_source.read_stream()
    ```
    ```python
    #Kafka Source for Batch Queries

    from rtdip_sdk.pipelines.sources import SparkKafkaEventhubSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    connectionString = "Endpoint=sb://{NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={ACCESS_KEY_NAME};SharedAccessKey={ACCESS_KEY}=;EntityPath={EVENT_HUB_NAME}"
    consumerGroup = "{YOUR-CONSUMER-GROUP}"

    kafka_eventhub_source = SparkKafkaEventhubSource(
        spark=spark,
        options={
            "startingOffsets": "earliest",
            "endingOffsets": "latest",
            "failOnDataLoss": "false"
        },
        connection_string=connectionString,
        consumer_group="consumerGroup"
    )

    kafka_eventhub_source.read_batch()
    ```

    Required and optional configurations can be found in the Attributes and Parameter tables below.
    Additionally, there are more optional configurations which can be found [here.](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }

    Parameters:
        spark (SparkSession): Spark Session
        options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }
        connection_string (str): Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the `EntityPath` parameter. Example `"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub"`
        consumer_group (str): The Eventhub consumer group to use for the connection
        decode_kafka_headers_to_amqp_properties (optional bool): Perform decoding of Kafka headers into their AMQP properties. Default is True

    The only configuration that must be set for the Kafka source for both batch and streaming queries is listed below.

    Attributes:
        kafka.bootstrap.servers (A comma-separated list of host︰port):  The Kafka "bootstrap.servers" configuration. (Streaming and Batch)

    There are multiple ways of specifying which topics to subscribe to. You should provide only one of these parameters:

    Attributes:
        assign (json string {"topicA"︰[0,1],"topicB"︰[2,4]}):  Specific TopicPartitions to consume. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
        subscribe (A comma-separated list of topics): The topic list to subscribe. Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)
        subscribePattern (Java regex string): The pattern used to subscribe to topic(s). Only one of "assign", "subscribe" or "subscribePattern" options can be specified for Kafka source. (Streaming and Batch)

    The following configurations are optional:

    Attributes:
        startingTimestamp (timestamp str): The start point of timestamp when a query is started, a string specifying a starting timestamp for all partitions in topics being subscribed. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
        startingOffsetsByTimestamp (JSON str): The start point of timestamp when a query is started, a json string specifying a starting timestamp for each TopicPartition. Please refer to the note on starting timestamp offset options below. (Streaming and Batch)
        startingOffsets ("earliest", "latest" (streaming only), or JSON string): The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
        endingTimestamp (timestamp str): The end point when a batch query is ended, a string specifying an ending timestamp for all partitions in topics being subscribed. Please refer to the note on ending timestamp offset options below. (Batch)
        endingOffsetsByTimestamp (JSON str): The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition. Please refer to the note on ending timestamp offset options below. (Batch)
        endingOffsets (latest or JSON str): The end point when a batch query is ended, either "latest" which refers to the latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1 as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed. (Batch)
        maxOffsetsPerTrigger (long): Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
        minOffsetsPerTrigger (long): Minimum number of offsets to be processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. (Streaming)
        failOnDataLoss (bool): Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it if it does not work as expected.
        minPartitions (int): Desired minimum number of partitions to read from Kafka. By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka. (Streaming and Batch)
        includeHeaders (bool): Whether to include the Kafka headers in the row. (Streaming and Batch)

    !!! note "Starting Timestamp Offset Note"
        If Kafka doesn't return the matched offset, the behavior will follow the value of the option <code>startingOffsetsByTimestampStrategy</code>.

        <code>startingTimestamp</code> takes precedence over <code>startingOffsetsByTimestamp</code> and <code>startingOffsets</code>.

        For streaming queries, this only applies when a new query is started; resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

    !!! note "Ending Timestamp Offset Note"
        If Kafka doesn't return the matched offset, the offset will be set to latest.

        <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.

    """

    def __init__(
        self,
        spark: SparkSession,
        options: dict,
        connection_string: str,
        consumer_group: str,
        decode_kafka_headers_to_amqp_properties: bool = True,
    ) -> None:
        self.spark = spark
        self.options = options
        self.connection_string = connection_string
        self.consumer_group = consumer_group
        self.decode_kafka_headers_to_amqp_properties = (
            decode_kafka_headers_to_amqp_properties
        )
        self.connection_string_properties = self._parse_connection_string(
            connection_string
        )
        self.schema = KAFKA_EVENTHUB_SCHEMA
        self.options = self._configure_options(options)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        spark_libraries = Libraries()
        spark_libraries.add_maven_library(get_default_package("spark_sql_kafka"))
        return spark_libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self) -> bool:
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    # Code is from the Azure Eventhub Python SDK. The package may be imported via Conda from the conda-forge channel in the future
    def _parse_connection_string(self, connection_string: str):
        conn_settings = [s.split("=", 1) for s in connection_string.split(";")]
        if any(len(tup) != 2 for tup in conn_settings):
            raise ValueError("Connection string is either blank or malformed.")
        conn_settings = dict(conn_settings)
        shared_access_signature = None
        for key, value in conn_settings.items():
            if key.lower() == "sharedaccesssignature":
                shared_access_signature = value
        shared_access_key = conn_settings.get("SharedAccessKey")
        shared_access_key_name = conn_settings.get("SharedAccessKeyName")
        if any([shared_access_key, shared_access_key_name]) and not all(
            [shared_access_key, shared_access_key_name]
        ):
            raise ValueError(
                "Connection string must have both SharedAccessKeyName and SharedAccessKey."
            )
        if shared_access_signature is not None and shared_access_key is not None:
            raise ValueError(
                "Only one of the SharedAccessKey or SharedAccessSignature must be present."
            )
        endpoint = conn_settings.get("Endpoint")
        if not endpoint:
            raise ValueError("Connection string is either blank or malformed.")
        parsed = urlparse(endpoint.rstrip("/"))
        if not parsed.netloc:
            raise ValueError("Invalid Endpoint on the Connection String.")
        namespace = parsed.netloc.strip()
        properties = {
            "fully_qualified_namespace": namespace,
            "endpoint": endpoint,
            "eventhub_name": conn_settings.get("EntityPath"),
            "shared_access_signature": shared_access_signature,
            "shared_access_key_name": shared_access_key_name,
            "shared_access_key": shared_access_key,
        }
        return properties

    def _connection_string_builder(self, properties: dict) -> str:
        connection_string = "Endpoint=" + properties.get("endpoint") + ";"

        if properties.get("shared_access_key"):
            connection_string += (
                "SharedAccessKey=" + properties.get("shared_access_key") + ";"
            )

        if properties.get("shared_access_key_name"):
            connection_string += (
                "SharedAccessKeyName=" + properties.get("shared_access_key_name") + ";"
            )

        if properties.get("shared_access_signature"):
            connection_string += (
                "SharedAccessSignature="
                + properties.get("shared_access_signature")
                + ";"
            )
        return connection_string

    def _configure_options(self, options: dict) -> dict:
        if "subscribe" not in options:
            options["subscribe"] = self.connection_string_properties.get(
                "eventhub_name"
            )

        if "kafka.bootstrap.servers" not in options:
            options["kafka.bootstrap.servers"] = (
                self.connection_string_properties.get("fully_qualified_namespace")
                + ":9093"
            )

        if "kafka.sasl.mechanism" not in options:
            options["kafka.sasl.mechanism"] = "PLAIN"

        if "kafka.security.protocol" not in options:
            options["kafka.security.protocol"] = "SASL_SSL"

        if "kafka.sasl.jaas.config" not in options:
            kafka_package = "org.apache.kafka.common.security.plain.PlainLoginModule"
            if "DATABRICKS_RUNTIME_VERSION" in os.environ or (
                "_client" in self.spark.__dict__
                and "databricks" in self.spark.client.host
            ):
                kafka_package = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
            connection_string = self._connection_string_builder(
                self.connection_string_properties
            )
            options["kafka.sasl.jaas.config"] = (
                '{} required username="$ConnectionString" password="{}";'.format(
                    kafka_package, connection_string
                )
            )  # NOSONAR

        if "kafka.request.timeout.ms" not in options:
            options["kafka.request.timeout.ms"] = "60000"

        if "kafka.session.timeout.ms" not in options:
            options["kafka.session.timeout.ms"] = "60000"

        if "kafka.group.id" not in options:
            options["kafka.group.id"] = self.consumer_group

        options["includeHeaders"] = "true"

        return options

    def _transform_to_eventhub_schema(self, df: DataFrame) -> DataFrame:
        return (
            df.withColumn("headers", map_from_entries(col("headers")))
            .select(
                col("value").alias("body"),
                col("partition").cast("string"),
                col("offset").alias("sequenceNumber"),
                col("timestamp").alias("enqueuedTime"),
                (
                    decode_kafka_headers_to_amqp_properties(col("headers")).alias(
                        "properties"
                    )
                    if self.decode_kafka_headers_to_amqp_properties
                    else create_map().cast("map<string,string>").alias("properties")
                ),
            )
            .withColumn("offset", col("properties").getItem("x-opt-offset"))
            .withColumn("publisher", col("properties").getItem("x-opt-publisher"))
            .withColumn(
                "partitionKey", col("properties").getItem("x-opt-partition-key")
            )
            .withColumn(
                "systemProperties",
                map_filter(
                    col("properties"), lambda k, _: k.isin(eventhub_system_properties)
                ),
            )
            .withColumn(
                "properties",
                map_filter(
                    col("properties"), lambda k, _: ~k.isin(eventhub_system_properties)
                ),
            )
            .select(
                col("body"),
                col("partition"),
                col("offset"),
                col("sequenceNumber"),
                col("enqueuedTime"),
                col("publisher"),
                col("partitionKey"),
                col("properties"),
                col("systemProperties"),
            )
        )

    def read_batch(self) -> DataFrame:
        """
        Reads batch data from Kafka.
        """
        try:
            df = self.spark.read.format("kafka").options(**self.options).load()
            return self._transform_to_eventhub_schema(df)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Kafka.
        """
        try:
            df = self.spark.readStream.format("kafka").options(**self.options).load()
            return self._transform_to_eventhub_schema(df)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e
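
The defaults applied in _configure_options are only filled in when the corresponding key is absent, so any of them can be overridden by passing the key in options. A minimal sketch, reusing spark, connectionString and consumerGroup from the examples above; the timeout values are illustrative:

```python
custom_options = {
    "startingOffsets": "earliest",
    "kafka.request.timeout.ms": "120000",  # overrides the 60000 default
    "kafka.session.timeout.ms": "120000",  # overrides the 60000 default
}

source = SparkKafkaEventhubSource(
    spark=spark,
    options=custom_options,
    connection_string=connectionString,
    consumer_group=consumerGroup,
)

df = source.read_stream()
```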

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

read_batch()

Reads batch data from Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
def read_batch(self) -> DataFrame:
    """
    Reads batch data from Kafka.
    """
    try:
        df = self.spark.read.format("kafka").options(**self.options).load()
        return self._transform_to_eventhub_schema(df)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

Reads streaming data from Kafka.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Kafka.
    """
    try:
        df = self.spark.readStream.format("kafka").options(**self.options).load()
        return self._transform_to_eventhub_schema(df)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

SparkKinesisSource

Bases: SourceInterface

The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment. Structured streaming from Kinesis is not supported in open source Spark.

Example

from rtdip_sdk.pipelines.sources import SparkKinesisSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

kinesis_source = SparkKinesisSource(
    spark=spark,
    options={
        "awsAccessKey": "{AWS-ACCESS-KEY}",
        "awsSecretKey": "{AWS-SECRET-KEY}",
        "streamName": "{STREAM-NAME}",
        "region": "{REGION}",
        "endpoint": "https://kinesis.{REGION}.amazonaws.com",
        "initialPosition": "earliest"
    }
)

kinesis_source.read_stream()

OR

kinesis_source.read_batch()

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session required to read data from Kinesis | required |
| options | dict | Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available here | required |

Attributes:

| Name | Type | Description |
|------|------|-------------|
| awsAccessKey | str | AWS access key. |
| awsSecretKey | str | AWS secret access key corresponding to the access key. |
| streamName | List[str] | The stream names to subscribe to. |
| region | str | The region the streams are defined in. |
| endpoint | str | The regional endpoint for Kinesis Data Streams. |
| initialPosition | str | The point to start reading from; earliest, latest, or at_timestamp. |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
class SparkKinesisSource(SourceInterface):
    """
    The Spark Kinesis Source is used to read data from Kinesis in a Databricks environment.
    Structured streaming from Kinesis is **not** supported in open source Spark.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import SparkKinesisSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    kinesis_source = SparkKinesisSource(
        spark=spark,
        options={
            "awsAccessKey": "{AWS-ACCESS-KEY}",
            "awsSecretKey": "{AWS-SECRET-KEY}",
            "streamName": "{STREAM-NAME}",
            "region": "{REGION}",
            "endpoint": "https://kinesis.{REGION}.amazonaws.com",
            "initialPosition": "earliest"
        }
    )

    kinesis_source.read_stream()

    OR

    kinesis_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session required to read data from Kinesis
        options (dict): Options that can be specified for a Kinesis read operation (See Attributes table below). Further information on the options is available [here](https://docs.databricks.com/structured-streaming/kinesis.html#configuration){ target="_blank" }

    Attributes:
        awsAccessKey (str): AWS access key.
        awsSecretKey (str): AWS secret access key corresponding to the access key.
        streamName (List[str]): The stream names to subscribe to.
        region (str): The region the streams are defined in.
        endpoint (str): The regional endpoint for Kinesis Data Streams.
        initialPosition (str): The point to start reading from; earliest, latest, or at_timestamp.
    """

    spark: SparkSession
    options: dict

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.schema = KINESIS_SCHEMA

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK_DATABRICKS
        """
        return SystemType.PYSPARK_DATABRICKS

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_read_validation(self):
        return True

    def post_read_validation(self, df: DataFrame) -> bool:
        assert df.schema == self.schema
        return True

    def read_batch(self):
        """
        Raises:
            NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
        """
        raise NotImplementedError(
            "Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
        )

    def read_stream(self) -> DataFrame:
        """
        Reads streaming data from Kinesis. All of the data in the table is processed as well as any new data that arrives after the stream started.
        """
        try:
            return (
                self.spark.readStream.format("kinesis").options(**self.options).load()
            )
        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SystemType | Environment | Requires PYSPARK_DATABRICKS |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK_DATABRICKS
    """
    return SystemType.PYSPARK_DATABRICKS

read_batch()

Raises:

| Type | Description |
|------|-------------|
| NotImplementedError | Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be availableNow=True to perform batch-like reads of cloud storage files. |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
def read_batch(self):
    """
    Raises:
        NotImplementedError: Kinesis only supports streaming reads. To perform a batch read, use the read_stream method of this component and specify the Trigger on the write_stream to be `availableNow=True` to perform batch-like reads of cloud storage files.
    """
    raise NotImplementedError(
        "Kinesis only supports streaming reads. To perform a batch read, use the read_stream method and specify Trigger on the write_stream as `availableNow=True`"
    )
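
Because read_batch raises NotImplementedError, batch-like ingestion from Kinesis is achieved by pairing read_stream with an availableNow trigger on the writer, as the error message suggests. A hedged sketch, reusing kinesis_source from the example above; the destination format, path and checkpoint location are placeholders:

```python
df = kinesis_source.read_stream()

# Process all currently available records and then stop, giving batch-like semantics
(
    df.writeStream.format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/kinesis")  # placeholder path
    .trigger(availableNow=True)
    .start("/tmp/delta/kinesis")  # placeholder destination path
)
```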

read_stream()

Reads streaming data from Kinesis. All of the data in the table is processed as well as any new data that arrives after the stream started.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/kinesis.py
def read_stream(self) -> DataFrame:
    """
    Reads streaming data from Kinesis. All of the data in the table is processed as well as any new data that arrives after the stream started.
    """
    try:
        return (
            self.spark.readStream.format("kinesis").options(**self.options).load()
        )
    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

BaseISOSource

Bases: SourceInterface

Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations | required |
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
class BaseISOSource(SourceInterface):
    """
    Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations
    """

    spark: SparkSession
    options: dict
    iso_url: str = "https://"
    query_datetime_format: str = "%Y%m%d"
    required_options: list = []
    spark_schema = StructType([StructField("id", IntegerType(), True)])
    default_query_timezone: str = "UTC"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.query_timezone = pytz.timezone(
            self.options.get("query_timezone", self.default_query_timezone)
        )
        self.current_date = datetime.now(timezone.utc).astimezone(self.query_timezone)

    def _fetch_from_url(self, url_suffix: str) -> bytes:
        """
        Gets data from external ISO API.

        Args:
            url_suffix: String to be used as suffix to iso url.

        Returns:
            Raw content of the data received.

        """
        url = f"{self.iso_url}{url_suffix}"
        logging.info(f"Requesting URL - {url}")

        response = requests.get(url)
        code = response.status_code

        if code != 200:
            raise HTTPError(
                f"Unable to access URL `{url}`."
                f" Received status code {code} with message {response.content}"
            )

        return response.content

    def _get_localized_datetime(self, datetime_str: str) -> datetime:
        """
        Converts string datetime into Python datetime object with configured format and timezone.
        Args:
            datetime_str: String to be converted into datetime.

        Returns: Timezone aware datetime object.

        """
        parsed_dt = datetime.strptime(datetime_str, self.query_datetime_format)
        parsed_dt = parsed_dt.replace(tzinfo=self.query_timezone)
        return parsed_dt

    def _pull_data(self) -> pd.DataFrame:
        """
        Calls the `_fetch_from_url` method with the appropriate parameters to get raw data from the API.

        All the children ISO classes must override this method and call `_fetch_from_url`
        in it.

        Returns:
             Raw DataFrame from API.
        """

        return pd.read_csv(BytesIO(self._fetch_from_url("")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Performs all the basic transformations to prepare data for further processing.
        All the children ISO classes must override this method.

        Args:
            df: Raw DataFrame, received from the API.

        Returns:
             Modified DataFrame, ready for basic use.

        """
        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Another data transformation helper method, called after `_prepare_data`.
        Used for advanced data processing such as cleaning, filtering and restructuring.
        All the children ISO classes must override this method if any post-processing is required.

        Args:
            df: Initial modified version of DataFrame, received after preparing the data.

        Returns:
             Final version of data after all the fixes and modifications.

        """
        return df

    def _get_data(self) -> pd.DataFrame:
        """
        Entrypoint method to return the final version of DataFrame.

        Returns:
            Modified form of data for specific use case.

        """
        df = self._pull_data()
        df = self._prepare_data(df)
        df = self._sanitize_data(df)

        # Reorder columns to keep the data consistent
        df = df[self.spark_schema.names]

        return df

    @staticmethod
    def system_type():
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def _validate_options(self) -> bool:
        """
        Performs all the options checks. Raises exception in case of any invalid value.
        Returns:
             True if all checks are passed.

        """
        return True

    def pre_read_validation(self) -> bool:
        """
        Ensures all the required options are provided and performs other validations.
        Returns:
             True if all checks are passed.

        """
        for key in self.required_options:
            if key not in self.options:
                raise ValueError(f"Required option `{key}` is missing.")

        return self._validate_options()

    def post_read_validation(self) -> bool:
        return True

    def read_batch(self) -> DataFrame:
        """
        Spark entrypoint. It executes the entire process of pulling, transforming and fixing data.
        Returns:
             Final Spark DataFrame converted from Pandas DataFrame post-execution.

        """

        try:
            self.pre_read_validation()
            pdf = self._get_data()
            pdf = _prepare_pandas_to_convert_to_spark(pdf)

            # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
            pd.DataFrame.iteritems = pd.DataFrame.items
            df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
            return df

        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

        Returns:
             Final Spark DataFrame after all the processing.

        """

        raise NotImplementedError(
            f"{self.__class__.__name__} connector doesn't support stream operation."
        )
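
The hook methods above (_pull_data, _prepare_data, _sanitize_data) are the intended override points for concrete ISO sources. A hypothetical minimal subclass, shown only to illustrate the pattern; the endpoint, report name, schema and column names are invented:

```python
import pandas as pd
from io import BytesIO
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


class ExampleISOSource(BaseISOSource):
    """Hypothetical ISO source illustrating the BaseISOSource override points."""

    iso_url = "https://example-iso.invalid/reports/"  # invented endpoint
    required_options = ["report_date"]
    spark_schema = StructType(
        [
            StructField("zone", StringType(), True),
            StructField("load_mw", DoubleType(), True),
        ]
    )

    def _pull_data(self) -> pd.DataFrame:
        # Fetch the raw CSV for the requested date via the shared helper
        suffix = f"daily_load?date={self.options['report_date']}"
        return pd.read_csv(BytesIO(self._fetch_from_url(suffix)))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # Align the raw column names with the Spark schema declared above
        return df.rename(columns={"ZONE": "zone", "LOAD_MW": "load_mw"})
```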

pre_read_validation()

Ensures all the required options are provided and performs other validations. Returns: True if all checks are passed.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def pre_read_validation(self) -> bool:
    """
    Ensures all the required options are provided and performs other validations.
    Returns:
         True if all checks are passed.

    """
    for key in self.required_options:
        if key not in self.options:
            raise ValueError(f"Required option `{key}` is missing.")

    return self._validate_options()

read_batch()

Spark entrypoint. It executes the entire process of pulling, transforming and fixing data. Returns: Final Spark DataFrame converted from Pandas DataFrame post-execution.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_batch(self) -> DataFrame:
    """
    Spark entrypoint. It executes the entire process of pulling, transforming and fixing data.
    Returns:
         Final Spark DataFrame converted from Pandas DataFrame post-execution.

    """

    try:
        self.pre_read_validation()
        pdf = self._get_data()
        pdf = _prepare_pandas_to_convert_to_spark(pdf)

        # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
        pd.DataFrame.iteritems = pd.DataFrame.items
        df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
        return df

    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

Returns:

| Type | Description |
|------|-------------|
| DataFrame | Final Spark DataFrame after all the processing. |

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_stream(self) -> DataFrame:
    """
    By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

    Returns:
         Final Spark DataFrame after all the processing.

    """

    raise NotImplementedError(
        f"{self.__class__.__name__} connector doesn't support stream operation."
    )

ERCOTDailyLoadISOSource

Bases: BaseISOSource

The ERCOT Daily Load ISO Source is used to read daily load data from ERCOT using web scraping. It supports actual and forecast data. To read more about the reports, visit the following URLs (the URLs are only accessible if the requester/client is in the US):

For load type actual: Actual System Load by Weather Zone
For load type forecast: Seven-Day Load Forecast by Weather Zone

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| spark | SparkSession | Spark Session instance | required |
| options | dict | A dictionary of ISO Source specific configurations (See Attributes table below) | required |

Attributes:

| Name | Type | Description |
|------|------|-------------|
| load_type | list | Must be one of actual or forecast. |
| date | str | Must be in YYYY-MM-DD format. |
| certificate_pfx_key | str | The certificate key data or password received from ERCOT. |
| certificate_pfx_key_contents | str | The certificate data received from ERCOT, it could be base64 encoded. |

Please check the BaseISOSource for available methods.
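
A hedged usage sketch, following the pattern of the other sources on this page; the import path is assumed from that pattern and the option values are placeholders (consult the Attributes table above for the expected types):

```python
from rtdip_sdk.pipelines.sources import ERCOTDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

ercot_source = ERCOTDailyLoadISOSource(
    spark=spark,
    options={
        "load_type": "actual",                          # or "forecast"
        "date": "2023-01-01",                           # YYYY-MM-DD
        "certificate_pfx_key": "{PFX-PASSWORD}",
        "certificate_pfx_key_contents": "{BASE64-PFX-CONTENTS}",
    },
)

df = ercot_source.read_batch()
```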

BaseISOSource

BaseISOSource

Bases: SourceInterface

Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

Parameters:

Name Type Description Default
spark SparkSession

Spark Session instance

required
options dict

A dictionary of ISO Source specific configurations

required
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class BaseISOSource(SourceInterface):
    """
    Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations
    """

    spark: SparkSession
    options: dict
    iso_url: str = "https://"
    query_datetime_format: str = "%Y%m%d"
    required_options: list = []
    spark_schema = StructType([StructField("id", IntegerType(), True)])
    default_query_timezone: str = "UTC"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.query_timezone = pytz.timezone(
            self.options.get("query_timezone", self.default_query_timezone)
        )
        self.current_date = datetime.now(timezone.utc).astimezone(self.query_timezone)

    def _fetch_from_url(self, url_suffix: str) -> bytes:
        """
        Gets data from external ISO API.

        Args:
            url_suffix: String to be used as suffix to iso url.

        Returns:
            Raw content of the data received.

        """
        url = f"{self.iso_url}{url_suffix}"
        logging.info(f"Requesting URL - {url}")

        response = requests.get(url)
        code = response.status_code

        if code != 200:
            raise HTTPError(
                f"Unable to access URL `{url}`."
                f" Received status code {code} with message {response.content}"
            )

        return response.content

    def _get_localized_datetime(self, datetime_str: str) -> datetime:
        """
        Converts string datetime into Python datetime object with configured format and timezone.
        Args:
            datetime_str: String to be converted into datetime.

        Returns: Timezone aware datetime object.

        """
        parsed_dt = datetime.strptime(datetime_str, self.query_datetime_format)
        parsed_dt = parsed_dt.replace(tzinfo=self.query_timezone)
        return parsed_dt

    def _pull_data(self) -> pd.DataFrame:
        """
        Hits the fetch_from_url method with certain parameters to get raw data from API.

        All the children ISO classes must override this method and call the fetch_url method
        in it.

        Returns:
             Raw DataFrame from API.
        """

        return pd.read_csv(BytesIO(self._fetch_from_url("")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Performs all the basic transformations to prepare data for further processing.
        All the children ISO classes must override this method.

        Args:
            df: Raw DataFrame, received from the API.

        Returns:
             Modified DataFrame, ready for basic use.

        """
        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Another data transformation helper method to be called after prepare data.
        Used for advance data processing such as cleaning, filtering, restructuring.
        All the children ISO classes must override this method if there is any post-processing required.

        Args:
            df: Initial modified version of DataFrame, received after preparing the data.

        Returns:
             Final version of data after all the fixes and modifications.

        """
        return df

    def _get_data(self) -> pd.DataFrame:
        """
        Entrypoint method to return the final version of DataFrame.

        Returns:
            Modified form of data for specific use case.

        """
        df = self._pull_data()
        df = self._prepare_data(df)
        df = self._sanitize_data(df)

        # Reorder columns to keep the data consistent
        df = df[self.spark_schema.names]

        return df

    @staticmethod
    def system_type():
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def _validate_options(self) -> bool:
        """
        Performs all the options checks. Raises exception in case of any invalid value.
        Returns:
             True if all checks are passed.

        """
        return True

    def pre_read_validation(self) -> bool:
        """
        Ensures all the required options are provided and performs other validations.
        Returns:
             True if all checks are passed.

        """
        for key in self.required_options:
            if key not in self.options:
                raise ValueError(f"Required option `{key}` is missing.")

        return self._validate_options()

    def post_read_validation(self) -> bool:
        return True

    def read_batch(self) -> DataFrame:
        """
        Spark entrypoint, It executes the entire process of pulling, transforming & fixing data.
        Returns:
             Final Spark DataFrame converted from Pandas DataFrame post-execution.

        """

        try:
            self.pre_read_validation()
            pdf = self._get_data()
            pdf = _prepare_pandas_to_convert_to_spark(pdf)

            # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
            pd.DataFrame.iteritems = pd.DataFrame.items
            df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
            return df

        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

        Returns:
             Final Spark DataFrame after all the processing.

        """

        raise NotImplementedError(
            f"{self.__class__.__name__} connector doesn't support stream operation."
        )

pre_read_validation()

Ensures all the required options are provided and performs other validations. Returns True if all checks are passed.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def pre_read_validation(self) -> bool:
    """
    Ensures all the required options are provided and performs other validations.
    Returns:
         True if all checks are passed.

    """
    for key in self.required_options:
        if key not in self.options:
            raise ValueError(f"Required option `{key}` is missing.")

    return self._validate_options()

read_batch()

Spark entrypoint; it executes the entire process of pulling, transforming and fixing data. Returns the final Spark DataFrame converted from the Pandas DataFrame post-execution.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_batch(self) -> DataFrame:
    """
    Spark entrypoint, It executes the entire process of pulling, transforming & fixing data.
    Returns:
         Final Spark DataFrame converted from Pandas DataFrame post-execution.

    """

    try:
        self.pre_read_validation()
        pdf = self._get_data()
        pdf = _prepare_pandas_to_convert_to_spark(pdf)

        # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
        pd.DataFrame.iteritems = pd.DataFrame.items
        df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
        return df

    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

Returns: DataFrame - Final Spark DataFrame after all the processing.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_stream(self) -> DataFrame:
    """
    By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

    Returns:
         Final Spark DataFrame after all the processing.

    """

    raise NotImplementedError(
        f"{self.__class__.__name__} connector doesn't support stream operation."
    )
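
The hooks above (required_options, _pull_data, _prepare_data, _sanitize_data and _validate_options) are the extension points used by the concrete ISO sources documented below. As a rough orientation, the following sketch shows how a custom source might plug into them. It is illustrative only and not part of the SDK: the import path is assumed from the source location shown above, and the report suffix, column names and schema are hypothetical.

```python
# Illustrative sketch only - not part of the RTDIP SDK.
# Assumption: BaseISOSource is importable from the module path shown above;
# the report suffix, columns and schema below are hypothetical.
from io import BytesIO

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

from rtdip_sdk.pipelines.sources.spark.iso.base_iso import BaseISOSource


class ExampleCsvISOSource(BaseISOSource):
    """Minimal custom ISO source that fetches one CSV report per requested date."""

    iso_url: str = "https://example-iso.org/reports/"  # hypothetical endpoint
    query_datetime_format: str = "%Y%m%d"
    required_options: list = ["date"]  # checked by pre_read_validation
    spark_schema = StructType(
        [
            StructField("Datetime", StringType(), True),
            StructField("Load", DoubleType(), True),
        ]
    )

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.date = self.options.get("date", "").strip()

    def _pull_data(self) -> pd.DataFrame:
        # _fetch_from_url appends the suffix to iso_url and raises on non-200 responses.
        return pd.read_csv(BytesIO(self._fetch_from_url(f"load_{self.date}.csv")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # Rename the raw report columns to match spark_schema; _get_data reorders them afterwards.
        return df.rename(columns={"datetime": "Datetime", "load_mw": "Load"})

    def _validate_options(self) -> bool:
        try:
            self._get_localized_datetime(self.date)
        except ValueError:
            raise ValueError("Unable to parse date. Please specify in YYYYMMDD format.")
        return True
```

read_batch() then runs pre_read_validation, _get_data and the Pandas-to-Spark conversion exactly as listed in the base class, and a query_timezone option can be passed alongside date to control how _get_localized_datetime localises parsed values.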
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/ercot_daily_load_iso.py
class ERCOTDailyLoadISOSource(BaseISOSource):
    """
    The ERCOT Daily Load ISO Source is used to read daily load data from ERCOT using web scraping.
    It supports actual and forecast data. To read more about the reports, visit the following URLs
    (the URLs are only accessible if the requester/client is in the US) -

    For load type `actual`: [Actual System Load by Weather Zone](https://www.ercot.com/mp/data-products/
    data-product-details?id=NP6-345-CD)
    <br>
    For load type `forecast`: [Seven-Day Load Forecast by Weather Zone](https://www.ercot.com/mp/data-products/
    data-product-details?id=NP3-561-CD)


    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        load_type (str): Must be one of `actual` or `forecast`.
        date (str): Must be in `YYYY-MM-DD` format.
        certificate_pfx_key (str): The certificate key data or password received from ERCOT.
        certificate_pfx_key_contents (str): The certificate data received from ERCOT, it could be base64 encoded.

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    options: dict
    url_forecast: str = "https://mis.ercot.com/misapp/GetReports.do?reportTypeId=12312"
    url_actual: str = "https://mis.ercot.com/misapp/GetReports.do?reportTypeId=13101"
    url_prefix: str = "https://mis.ercot.com"
    query_datetime_format: str = "%Y-%m-%d"
    required_options = [
        "load_type",
        "date",
        "certificate_pfx_key",
        "certificate_pfx_key_contents",
    ]
    spark_schema = ERCOT_SCHEMA
    default_query_timezone = "UTC"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark = spark
        self.options = options
        self.load_type = self.options.get("load_type", "actual")
        self.date = self.options.get("date", "").strip()
        self.certificate_pfx_key = self.options.get("certificate_pfx_key", "").strip()
        self.certificate_pfx_key_contents = self.options.get(
            "certificate_pfx_key_contents", ""
        ).strip()

    def generate_temp_client_cert_files_from_pfx(self):
        password = self.certificate_pfx_key.encode()
        pfx: bytes = base64.b64decode(self.certificate_pfx_key_contents)

        if base64.b64encode(pfx) != self.certificate_pfx_key_contents.encode():
            pfx = self.certificate_pfx_key_contents

        key, cert, _ = pkcs12.load_key_and_certificates(data=pfx, password=password)
        key_bytes = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )

        cert_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
        return TempCertFiles(cert_bytes, key_bytes)

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the ERCOT API and parses the zip files for CSV data.

        Returns:
            Raw form of data.
        """

        logging.info(f"Getting {self.load_type} data for date {self.date}")
        url = self.url_forecast
        req_date = datetime.strptime(self.date, self.query_datetime_format)

        if self.load_type == "actual":
            req_date = req_date + timedelta(days=1)
            url = self.url_actual

        url_lists, files = self.generate_urls_for_zip(url, req_date)
        dfs = []
        logging.info(f"Generated {len(url_lists)} URLs - {url_lists}")
        logging.info(f"Requesting files - {files}")

        for url in url_lists:
            df = self.download_zip(url)
            dfs.append(df)
        final_df = pd.concat(dfs)
        return final_df

    def download_zip(self, url) -> pd.DataFrame:
        logging.info(f"Downloading zip using {url}")
        with self.generate_temp_client_cert_files_from_pfx() as cert:
            response = requests.get(url, cert=cert)

        if not response.content:
            raise HTTPError("Empty Response was returned")

        logging.info("Unzipping the file")
        zf = ZipFile(BytesIO(response.content))
        csvs = [s for s in zf.namelist() if ".csv" in s]

        if len(csvs) == 0:
            raise ValueError("No data was found in the specified interval")

        df = pd.read_csv(zf.open(csvs[0]))
        return df

    def generate_urls_for_zip(self, url: str, date: datetime) -> (List[str], List[str]):
        logging.info(f"Finding urls list for date {date}")
        with self.generate_temp_client_cert_files_from_pfx() as cert:
            page_response = requests.get(url, timeout=5, cert=cert)

        page_content = BeautifulSoup(page_response.content, "html.parser")
        zip_info = []
        length = len(page_content.find_all("td", {"class": "labelOptional_ind"}))

        for i in range(0, length):
            zip_name = page_content.find_all("td", {"class": "labelOptional_ind"})[
                i
            ].text
            zip_link = page_content.find_all("a")[i].get("href")
            zip_info.append((zip_name, zip_link))

        date_str = date.strftime("%Y%m%d")
        zip_info = list(
            filter(
                lambda f_info: f_info[0].endswith("csv.zip") and date_str in f_info[0],
                zip_info,
            )
        )

        urls = []
        files = []

        if len(zip_info) == 0:
            raise ValueError(f"No file was found for date - {date_str}")

        # As Forecast is generated every hour, pick the latest one.
        zip_info = sorted(zip_info, key=lambda item: item[0], reverse=True)
        zip_info_item = zip_info[0]

        file_name, file_url = zip_info_item
        urls.append(self.url_prefix + file_url)
        files.append(file_name)

        return urls, files

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        if self.load_type == "actual":
            df["Date"] = pd.to_datetime(df["OperDay"], format="%m/%d/%Y")

            df = df.rename(
                columns={
                    "COAST": "Coast",
                    "EAST": "East",
                    "FAR_WEST": "FarWest",
                    "NORTH": "North",
                    "NORTH_C": "NorthCentral",
                    "SOUTH_C": "SouthCentral",
                    "SOUTHERN": "Southern",
                    "WEST": "West",
                    "TOTAL": "SystemTotal",
                    "DSTFlag": "DstFlag",
                }
            )

        else:
            df = df.rename(columns={"DSTFlag": "DstFlag"})

            df["Date"] = pd.to_datetime(df["DeliveryDate"], format="%m/%d/%Y")

        return df

    def _validate_options(self) -> bool:
        try:
            datetime.strptime(self.date, self.query_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse date. Please specify in {self.query_datetime_format} format."
            )
        return True

MISODailyLoadISOSource

Bases: BaseISOSource

The MISO Daily Load ISO Source is used to read daily load data from the MISO API. It supports both Actual and Forecast data.

To read more about the available reports from the MISO API, download the file - Market Reports (https://cdn.misoenergy.org/Market%20Reports%20Directory115139.xlsx)

From the list of reports in the file, it pulls the report named Daily Forecast and Actual Load by Local Resource Zone.

Actual data is available for one day prior to the given date.

Forecast data is available for the next 6 days (inclusive of the given date).

Example

from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

miso_source = MISODailyLoadISOSource(
    spark=spark,
    options={
        "load_type": "actual",
        "date": "20230520",
    }
)

miso_source.read_batch()

Parameters:
    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (See Attributes below). Required.

Attributes:
    load_type (str): Must be one of actual or forecast.
    date (str): Must be in YYYYMMDD format.

Please check the BaseISOSource for available methods.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/miso_daily_load_iso.py
class MISODailyLoadISOSource(BaseISOSource):
    """
    The MISO Daily Load ISO Source is used to read daily load data from MISO API. It supports both Actual and Forecast data.

    To read more about the available reports from MISO API, download the file -
    [Market Reports](https://cdn.misoenergy.org/Market%20Reports%20Directory115139.xlsx)

    From the list of reports in the file, it pulls the report named
    `Daily Forecast and Actual Load by Local Resource Zone`.

    Actual data is available for one day minus from the given date.

    Forecast data is available for next 6 day (inclusive of given date).


    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    miso_source = MISODailyLoadISOSource(
        spark=spark,
        options={
            "load_type": "actual",
            "date": "20230520",
        }
    )

    miso_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        load_type (str): Must be one of `actual` or `forecast`
        date (str): Must be in `YYYYMMDD` format.

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    options: dict
    iso_url: str = "https://docs.misoenergy.org/marketreports/"
    query_datetime_format: str = "%Y%m%d"
    required_options = ["load_type", "date"]
    spark_schema = MISO_SCHEMA
    default_query_timezone = "US/Central"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark = spark
        self.options = options
        self.load_type = self.options.get("load_type", "actual")
        self.date = self.options.get("date", "").strip()

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the MISO API and parses the Excel file.

        Returns:
            Raw form of data.
        """

        logging.info(f"Getting {self.load_type} data for date {self.date}")
        df = pd.read_excel(self._fetch_from_url(f"{self.date}_df_al.xls"), skiprows=4)

        return df

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Creates a new `date_time` column and removes null values.

        Args:
            df: Raw form of data received from the API.

        Returns:
            Data after basic transformations.

        """

        df.drop(
            df.index[(df["HourEnding"] == "HourEnding") | df["MISO MTLF (MWh)"].isna()],
            inplace=True,
        )
        df.rename(columns={"Market Day": "date"}, inplace=True)

        df["date_time"] = pd.to_datetime(df["date"]) + pd.to_timedelta(
            df["HourEnding"].astype(int) - 1, "h"
        )
        df.drop(["HourEnding", "date"], axis=1, inplace=True)

        data_cols = df.columns[df.columns != "date_time"]
        df[data_cols] = df[data_cols].astype(float)

        df.reset_index(inplace=True, drop=True)

        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filter outs Actual or Forecast data based on `load_type`.
        Args:
            df: Data received after preparation.

        Returns:
            Final data either containing Actual or Forecast values.

        """

        skip_col_suffix = ""

        if self.load_type == "actual":
            skip_col_suffix = "MTLF (MWh)"

        elif self.load_type == "forecast":
            skip_col_suffix = "ActualLoad (MWh)"

        df = df[[x for x in df.columns if not x.endswith(skip_col_suffix)]]
        df = df.dropna()
        df.columns = [str(x.split(" ")[0]).upper() for x in df.columns]

        rename_cols = {
            "LRZ1": "Lrz1",
            "LRZ2_7": "Lrz2_7",
            "LRZ3_5": "Lrz3_5",
            "LRZ4": "Lrz4",
            "LRZ6": "Lrz6",
            "LRZ8_9_10": "Lrz8_9_10",
            "MISO": "Miso",
            "DATE_TIME": "Datetime",
        }

        df = df.rename(columns=rename_cols)

        return df

    def _validate_options(self) -> bool:
        """
        Validates the following options:
            - `date` must be in the correct format.
            - `load_type` must be valid.

        Returns:
            True if all looks good otherwise raises Exception.

        """

        try:
            date = self._get_localized_datetime(self.date)
        except ValueError:
            raise ValueError("Unable to parse Date. Please specify in YYYYMMDD format.")

        if date > self.current_date:
            raise ValueError("Query date can't be in future.")

        valid_load_types = ["actual", "forecast"]

        if self.load_type not in valid_load_types:
            raise ValueError(
                f"Invalid load_type `{self.load_type}` given. Supported values are {valid_load_types}."
            )

        return True
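
Because _sanitize_data keeps either the MTLF (forecast) or the ActualLoad columns depending on load_type, switching reports is purely an options change. A hedged variant of the example above, requesting forecast data:

```python
from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

miso_forecast_source = MISODailyLoadISOSource(
    spark=spark,
    options={
        "load_type": "forecast",  # covers the next 6 days, inclusive of the given date
        "date": "20230520",
    },
)

miso_forecast_source.read_batch()
```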

MISOHistoricalLoadISOSource

Bases: MISODailyLoadISOSource

The MISO Historical Load ISO Source is used to read historical load data from the MISO API.

To read more about the available reports from the MISO API, download the file - Market Reports (https://cdn.misoenergy.org/Market%20Reports%20Directory115139.xlsx)

From the list of reports in the file, it pulls the report named Historical Daily Forecast and Actual Load by Local Resource Zone.

Example

from rtdip_sdk.pipelines.sources import MISOHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

miso_source = MISOHistoricalLoadISOSource(
    spark=spark,
    options={
        "start_date": "20230510",
        "end_date": "20230520",
    }
)

miso_source.read_batch()

Parameters:
    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (See Attributes below). Required.

Attributes:
    start_date (str): Must be in YYYYMMDD format.
    end_date (str): Must be in YYYYMMDD format.
    fill_missing (str): Set to "true" to fill missing Actual load with Forecast load. Default - "true".

Please check the BaseISOSource for available methods.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/miso_historical_load_iso.py
class MISOHistoricalLoadISOSource(MISODailyLoadISOSource):
    """
    The MISO Historical Load ISO Source is used to read historical load data from MISO API.

    To read more about the available reports from MISO API, download the file -
     [Market Reports](https://cdn.misoenergy.org/Market%20Reports%20Directory115139.xlsx)

    From the list of reports in the file, it pulls the report named
     `Historical Daily Forecast and Actual Load by Local Resource Zone`.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import MISOHistoricalLoadISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    miso_source = MISOHistoricalLoadISOSource(
        spark=spark,
        options={
            "start_date": "20230510",
            "end_date": "20230520",
        }
    )

    miso_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        start_date (str): Must be in `YYYYMMDD` format.
        end_date (str): Must be in `YYYYMMDD` format.
        fill_missing (str): Set to `"true"` to fill missing Actual load with Forecast load. Default - `true`.

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    options: dict
    required_options = ["start_date", "end_date"]

    def __init__(self, spark: SparkSession, options: dict):
        super().__init__(spark, options)
        self.start_date = self.options.get("start_date", "")
        self.end_date = self.options.get("end_date", "")
        self.fill_missing = bool(self.options.get("fill_missing", "true") == "true")

    def _get_historical_data_for_date(self, date: datetime) -> pd.DataFrame:
        logging.info(f"Getting historical data for date {date}")
        df = pd.read_excel(
            self._fetch_from_url(
                f"{date.strftime(self.query_datetime_format)}_dfal_HIST.xls"
            ),
            skiprows=5,
        )

        if date.month == 12 and date.day == 31:
            expected_year_rows = (
                pd.Timestamp(date.year, 12, 31).dayofyear * 24 * 7
            )  # Every hour has 7 zones.
            received_year_rows = (
                len(df[df["MarketDay"] != "MarketDay"]) - 2
            )  # Last 2 rows are invalid.

            if expected_year_rows != received_year_rows:
                logging.warning(
                    f"Didn't receive full year historical data for year {date.year}."
                    f" Expected {expected_year_rows} but Received {received_year_rows}"
                )

        return df

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the MISO API and parses the Excel file.

        Returns:
            Raw form of data.
        """

        logging.info(
            f"Historical load requested from {self.start_date} to {self.end_date}"
        )

        start_date = self._get_localized_datetime(self.start_date)
        end_date = self._get_localized_datetime(self.end_date)

        dates = pd.date_range(
            start_date, end_date + timedelta(days=365), freq="Y", inclusive="left"
        )
        logging.info(f"Generated date ranges are - {dates}")

        # Collect all historical data on yearly basis.
        df = pd.concat(
            [
                self._get_historical_data_for_date(min(date, self.current_date))
                for date in dates
            ],
            sort=False,
        )

        return df

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Creates a new `Datetime` column, removes null values and pivots the data.

        Args:
            df: Raw form of data received from the API.

        Returns:
            Data after basic transformations and pivoting.

        """

        df = df[df["MarketDay"] != "MarketDay"]

        # Fill missing actual values with the forecast values to avoid gaps.
        if self.fill_missing:
            df = df.fillna({"ActualLoad (MWh)": df["MTLF (MWh)"]})

        df = df.rename(
            columns={
                "MarketDay": "date",
                "HourEnding": "hour",
                "ActualLoad (MWh)": "load",
                "LoadResource Zone": "zone",
            }
        )
        df = df.dropna()

        df["date_time"] = pd.to_datetime(df["date"]) + pd.to_timedelta(
            df["hour"].astype(int) - 1, "h"
        )

        df.drop(["hour", "date"], axis=1, inplace=True)
        df["load"] = df["load"].astype(float)

        df = df.pivot_table(
            index="date_time", values="load", columns="zone"
        ).reset_index()

        df.columns = [str(x.split(" ")[0]).upper() for x in df.columns]

        rename_cols = {
            "LRZ1": "Lrz1",
            "LRZ2_7": "Lrz2_7",
            "LRZ3_5": "Lrz3_5",
            "LRZ4": "Lrz4",
            "LRZ6": "Lrz6",
            "LRZ8_9_10": "Lrz8_9_10",
            "MISO": "Miso",
            "DATE_TIME": "Datetime",
        }

        df = df.rename(columns=rename_cols)

        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filter outs data outside the requested date range.

        Args:
            df: Data received after preparation.

        Returns:
            Final data after all the transformations.

        """

        start_date = self._get_localized_datetime(self.start_date)
        end_date = self._get_localized_datetime(self.end_date).replace(
            hour=23, minute=59, second=59
        )

        df = df[
            (df["Datetime"] >= start_date.replace(tzinfo=None))
            & (df["Datetime"] <= end_date.replace(tzinfo=None))
        ]

        df = df.sort_values(by="Datetime", ascending=True).reset_index(drop=True)

        expected_rows = ((min(end_date, self.current_date) - start_date).days + 1) * 24

        actual_rows = len(df)

        logging.info(f"Rows Expected = {expected_rows}, Rows Found = {actual_rows}")

        return df

    def _validate_options(self) -> bool:
        """
        Validates the following options:
            - `start_date` & `end_date` must be in the correct format.
            - `start_date` must not be later than `end_date`.
            - `start_date` must not be in the future (UTC).

        Returns:
            True if all looks good otherwise raises Exception.

        """

        try:
            start_date = self._get_localized_datetime(self.start_date)
        except ValueError:
            raise ValueError(
                "Unable to parse Start date. Please specify in YYYYMMDD format."
            )

        try:
            end_date = self._get_localized_datetime(self.end_date)
        except ValueError:
            raise ValueError(
                "Unable to parse End date. Please specify in YYYYMMDD format."
            )

        if start_date > self.current_date:
            raise ValueError("Start date can't be in future.")

        if start_date > end_date:
            raise ValueError("Start date can't be ahead of End date.")

        return True
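
The fillna on ActualLoad (MWh) in _prepare_data above is controlled by the fill_missing option, which is parsed as the string "true" or "false". A hedged sketch with it switched off, so missing actual values are dropped rather than filled from the forecast:

```python
from rtdip_sdk.pipelines.sources import MISOHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

miso_historical_source = MISOHistoricalLoadISOSource(
    spark=spark,
    options={
        "start_date": "20230510",
        "end_date": "20230520",
        "fill_missing": "false",  # do not substitute Forecast load for missing Actual load
    },
)

miso_historical_source.read_batch()
```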

PJMDailyLoadISOSource

Bases: BaseISOSource

The PJM Daily Load ISO Source is used to read daily load data from the PJM API. It supports both Actual and Forecast data. Actual returns 1 day of data, Forecast returns 7 days.

To read more about the reports, visit the following URLs -
Actual doc: ops_sum_prev_period (https://dataminer2.pjm.com/feed/ops_sum_prev_period/definition)
Forecast doc: load_frcstd_7_day (https://dataminer2.pjm.com/feed/load_frcstd_7_day/definition)

Example

from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMDailyLoadISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "actual"
    }
)

pjm_source.read_batch()

Parameters:
    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (See Attributes below). Required.

Attributes:
    api_key (str): Must be a valid API key from PJM; see the API URLs above.
    load_type (str): Must be one of actual or forecast.

Please check the BaseISOSource for available methods.
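
As with the MISO sources, the forecast report is selected purely through the options. A hedged variant of the example above requesting the 7-day forecast; the api_key value remains a placeholder for a valid PJM Data Miner 2 key:

```python
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_forecast_source = PJMDailyLoadISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",   # placeholder for a valid PJM key
        "load_type": "forecast",  # returns 7 days of forecast data
    },
)

pjm_forecast_source.read_batch()
```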

BaseISOSource

BaseISOSource

Bases: SourceInterface

Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

Parameters:

Name Type Description Default
spark SparkSession

Spark Session instance

required
options dict

A dictionary of ISO Source specific configurations

required
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
class BaseISOSource(SourceInterface):
    """
    Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations
    """

    spark: SparkSession
    options: dict
    iso_url: str = "https://"
    query_datetime_format: str = "%Y%m%d"
    required_options: list = []
    spark_schema = StructType([StructField("id", IntegerType(), True)])
    default_query_timezone: str = "UTC"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.query_timezone = pytz.timezone(
            self.options.get("query_timezone", self.default_query_timezone)
        )
        self.current_date = datetime.now(timezone.utc).astimezone(self.query_timezone)

    def _fetch_from_url(self, url_suffix: str) -> bytes:
        """
        Gets data from external ISO API.

        Args:
            url_suffix: String to be used as suffix to iso url.

        Returns:
            Raw content of the data received.

        """
        url = f"{self.iso_url}{url_suffix}"
        logging.info(f"Requesting URL - {url}")

        response = requests.get(url)
        code = response.status_code

        if code != 200:
            raise HTTPError(
                f"Unable to access URL `{url}`."
                f" Received status code {code} with message {response.content}"
            )

        return response.content

    def _get_localized_datetime(self, datetime_str: str) -> datetime:
        """
        Converts string datetime into Python datetime object with configured format and timezone.
        Args:
            datetime_str: String to be converted into datetime.

        Returns: Timezone aware datetime object.

        """
        parsed_dt = datetime.strptime(datetime_str, self.query_datetime_format)
        parsed_dt = parsed_dt.replace(tzinfo=self.query_timezone)
        return parsed_dt

    def _pull_data(self) -> pd.DataFrame:
        """
        Hits the fetch_from_url method with certain parameters to get raw data from API.

        All the children ISO classes must override this method and call the fetch_url method
        in it.

        Returns:
             Raw DataFrame from API.
        """

        return pd.read_csv(BytesIO(self._fetch_from_url("")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Performs all the basic transformations to prepare data for further processing.
        All the children ISO classes must override this method.

        Args:
            df: Raw DataFrame, received from the API.

        Returns:
             Modified DataFrame, ready for basic use.

        """
        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Another data transformation helper method to be called after prepare data.
        Used for advance data processing such as cleaning, filtering, restructuring.
        All the children ISO classes must override this method if there is any post-processing required.

        Args:
            df: Initial modified version of DataFrame, received after preparing the data.

        Returns:
             Final version of data after all the fixes and modifications.

        """
        return df

    def _get_data(self) -> pd.DataFrame:
        """
        Entrypoint method to return the final version of DataFrame.

        Returns:
            Modified form of data for specific use case.

        """
        df = self._pull_data()
        df = self._prepare_data(df)
        df = self._sanitize_data(df)

        # Reorder columns to keep the data consistent
        df = df[self.spark_schema.names]

        return df

    @staticmethod
    def system_type():
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def _validate_options(self) -> bool:
        """
        Performs all the options checks. Raises exception in case of any invalid value.
        Returns:
             True if all checks are passed.

        """
        return True

    def pre_read_validation(self) -> bool:
        """
        Ensures all the required options are provided and performs other validations.
        Returns:
             True if all checks are passed.

        """
        for key in self.required_options:
            if key not in self.options:
                raise ValueError(f"Required option `{key}` is missing.")

        return self._validate_options()

    def post_read_validation(self) -> bool:
        return True

    def read_batch(self) -> DataFrame:
        """
        Spark entrypoint, It executes the entire process of pulling, transforming & fixing data.
        Returns:
             Final Spark DataFrame converted from Pandas DataFrame post-execution.

        """

        try:
            self.pre_read_validation()
            pdf = self._get_data()
            pdf = _prepare_pandas_to_convert_to_spark(pdf)

            # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
            pd.DataFrame.iteritems = pd.DataFrame.items
            df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
            return df

        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

        Returns:
             Final Spark DataFrame after all the processing.

        """

        raise NotImplementedError(
            f"{self.__class__.__name__} connector doesn't support stream operation."
        )

pre_read_validation()

Ensures all the required options are provided and performs other validations. Returns: True if all checks are passed.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
175
176
177
178
179
180
181
182
183
184
185
186
def pre_read_validation(self) -> bool:
    """
    Ensures all the required options are provided and performs other validations.
    Returns:
         True if all checks are passed.

    """
    for key in self.required_options:
        if key not in self.options:
            raise ValueError(f"Required option `{key}` is missing.")

    return self._validate_options()
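
For illustration, here is a minimal sketch (not part of the original documentation) of how this validation surfaces to a caller. It assumes a Spark session created as in the examples above and deliberately omits the `api_key` option:

```python
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# `api_key` is deliberately missing, so pre_read_validation raises a ValueError
# before any request is made to the PJM API.
try:
    PJMDailyLoadISOSource(spark=spark, options={"load_type": "actual"}).read_batch()
except ValueError as error:
    print(error)  # Required option `api_key` is missing.
```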

read_batch()

Spark entrypoint. It executes the entire process of pulling, transforming and fixing data. Returns: Final Spark DataFrame converted from Pandas DataFrame post-execution.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_batch(self) -> DataFrame:
    """
    Spark entrypoint. It executes the entire process of pulling, transforming and fixing data.
    Returns:
         Final Spark DataFrame converted from Pandas DataFrame post-execution.

    """

    try:
        self.pre_read_validation()
        pdf = self._get_data()
        pdf = _prepare_pandas_to_convert_to_spark(pdf)

        # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
        pd.DataFrame.iteritems = pd.DataFrame.items
        df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
        return df

    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

Returns:

    DataFrame: Final Spark DataFrame after all the processing.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_stream(self) -> DataFrame:
    """
    By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

    Returns:
         Final Spark DataFrame after all the processing.

    """

    raise NotImplementedError(
        f"{self.__class__.__name__} connector doesn't support stream operation."
    )
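
As a quick illustration (a sketch, not from the original documentation), calling `read_stream` on an ISO source that does not override it raises the `NotImplementedError` shown above:

```python
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

source = PJMDailyLoadISOSource(
    spark=spark, options={"api_key": "{api_key}", "load_type": "actual"}
)

try:
    source.read_stream()
except NotImplementedError as error:
    print(error)  # PJMDailyLoadISOSource connector doesn't support stream operation.
```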
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_daily_load_iso.py
class PJMDailyLoadISOSource(BaseISOSource):
    """
    The PJM Daily Load ISO Source is used to read daily load data from PJM API.
    It supports both Actual and Forecast data. Actual will return 1 day, Forecast will return 7 days.

    To read more about the reports, visit the following URLs -
    <br>
    Actual doc:    [ops_sum_prev_period](https://dataminer2.pjm.com/feed/ops_sum_prev_period/definition)
    <br>
    Forecast doc:  [load_frcstd_7_day](https://dataminer2.pjm.com/feed/load_frcstd_7_day/definition)

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMDailyLoadISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "actual"
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        api_key (str): Must be a valid key from PJM, see api url
        load_type (str): Must be one of `actual` or `forecast`

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    spark_schema = PJM_SCHEMA
    options: dict
    iso_url: str = "https://api.pjm.com/api/v1/"
    query_datetime_format: str = "%Y-%m-%d %H:%M"
    required_options = ["api_key", "load_type"]
    default_query_timezone = "US/Eastern"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.load_type: str = self.options.get("load_type", "").strip()
        self.api_key: str = self.options.get("api_key", "").strip()
        self.days: int = self.options.get("days", 7)

    def _fetch_from_url(self, url_suffix: str, start_date: str, end_date: str) -> bytes:
        """
        Gets data from external ISO API.

        Args:
            url_suffix: String to be used as suffix to iso url.

        Returns:
            Raw content of the data received.
        """

        url = f"{self.iso_url}{url_suffix}"
        headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        logging.info(
            f"Requesting URL - {url}, start_date={start_date}, end_date={end_date}, load_type={self.load_type}"
        )
        load_key = (
            "datetime_beginning_ept"
            if self.load_type != "forecast"
            else "forecast_datetime_beginning_ept"
        )
        feed = (
            "ops_sum_prev_period"
            if self.load_type != "forecast"
            else "load_frcstd_7_day"
        )
        query = {
            "startRow": "1",
            load_key: f"{start_date}to{end_date}",
            "format": "csv",
            "download": "true",
        }
        query_s = "&".join(["=".join([k, v]) for k, v in query.items()])
        new_url = f"{url}{feed}?{query_s}"
        response = requests.get(new_url, headers=headers)
        code = response.status_code

        if code != 200:
            raise requests.HTTPError(
                f"Unable to access URL `{url}`."
                f" Received status code {code} with message {response.content}"
            )
        return response.content

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the PJM API and parses the return.

        Returns:
            Raw form of data.
        """
        start_date = self.current_date - timedelta(days=1)
        start_date = start_date.replace(hour=0, minute=0)
        end_date = (start_date + timedelta(days=self.days)).replace(hour=23)
        start_date_str = start_date.strftime(self.query_datetime_format)
        end_date_str = end_date.strftime(self.query_datetime_format)
        df = pd.read_csv(
            BytesIO(self._fetch_from_url("", start_date_str, end_date_str))
        )

        return df

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Creates a new datetime column, removes null values and renames columns.

        Args:
            df: Raw form of data received from the API.

        Returns:
            Data after basic transformations.

        """

        if self.load_type == "forecast":
            df = df.rename(
                columns={
                    "forecast_datetime_beginning_utc": "start_time",
                    "forecast_area": "zone",
                    "forecast_datetime_ending_utc": "end_time",
                    "forecast_load_mw": "load",
                }
            )
        else:
            df = df.rename(
                columns={
                    "datetime_beginning_utc": "start_time",
                    "area": "zone",
                    "datetime_ending_utc": "end_time",
                    "actual_load": "load",
                }
            )

        df = df[["start_time", "end_time", "zone", "load"]]
        df = df.replace({np.nan: None, "": None})

        date_cols = ["start_time", "end_time"]
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], format="%m/%d/%Y %I:%M:%S %p")

        df["load"] = df["load"].astype(float)
        df = df.replace({np.nan: None, "": None})
        df.columns = list(map(lambda x: x.upper(), df.columns))

        rename_cols = {
            "START_TIME": "StartTime",
            "END_TIME": "EndTime",
            "ZONE": "Zone",
            "LOAD": "Load",
        }

        df = df.rename(columns=rename_cols)

        df.reset_index(inplace=True, drop=True)

        return df

    def _validate_options(self) -> bool:
        """
        Validates the following options:
            - `load_type` must be valid.

        Returns:
            True if all looks good otherwise raises Exception.
        """

        valid_load_types = ["actual", "forecast"]

        if self.load_type not in valid_load_types:
            raise ValueError(
                f"Invalid load_type `{self.load_type}` given. Supported values are {valid_load_types}."
            )

        return True
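
Building on the docstring example above, the sketch below (illustrative, not from the original documentation) requests the 7-day forecast instead of the previous day's actuals; the optional `days` option is taken from the constructor above rather than the Attributes table:

```python
from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

forecast_source = PJMDailyLoadISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "forecast",
        "days": 7,  # optional; defaults to 7 in the constructor above
    },
)

forecast_df = forecast_source.read_batch()
forecast_df.show(10)  # columns after renaming: StartTime, EndTime, Zone, Load
```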

PJMDailyPricingISOSource

Bases: BaseISOSource

The PJM Daily Pricing ISO Source is used to retrieve Real-Time and Day-Ahead hourly data from the PJM API. Real-Time will return data for T - 3 to T days and Day-Ahead will return data for T - 3 to T + 1 days.

API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)

Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition

Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition

Example

from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMDailyPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "real_time"
    }
)

pjm_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (See Attributes table below). Required.

Attributes:

    api_key (str): Must be a valid key from PJM, see the API URL above.
    load_type (str): Must be one of `real_time` or `day_ahead`.

Please check the BaseISOSource for available methods.

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_daily_pricing_iso.py
class PJMDailyPricingISOSource(BaseISOSource):
    """
    The PJM Daily Pricing ISO Source is used to retrieve Real-Time and Day-Ahead hourly data from PJM API.
    Real-Time will return data for T - 3 to T days and Day-Ahead will return T - 3 to T + 1 days data.

    API:             <a href="https://api.pjm.com/api/v1/">https://api.pjm.com/api/v1/</a>  (requires a valid API key from PJM)

    Real-Time doc:    <a href="https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition">https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition</a>

    Day-Ahead doc:    <a href="https://dataminer2.pjm.com/feed/da_hrl_lmps/definition">https://dataminer2.pjm.com/feed/da_hrl_lmps/definition</a>

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMDailyPricingISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "real_time"
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
       spark (SparkSession): Spark Session instance
       options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        api_key (str): Must be a valid key from PJM, see api url
        load_type (str): Must be one of `real_time` or `day_ahead`

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    spark_schema = PJM_PRICING_SCHEMA
    options: dict
    iso_url: str = "https://api.pjm.com/api/v1/"
    query_datetime_format: str = "%Y-%m-%d %H:%M"
    required_options = ["api_key", "load_type"]
    default_query_timezone = "US/Eastern"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.load_type: str = self.options.get("load_type", "").strip()
        self.api_key: str = self.options.get("api_key", "").strip()
        self.days: int = self.options.get("days", 3)

    def _fetch_paginated_data(
        self, url_suffix: str, start_date: str, end_date: str
    ) -> bytes:
        """
        Fetches data from the PJM API with pagination support.

        Args:
            url_suffix: String to be used as suffix to ISO URL.
            start_date: Start date for the data retrieval.
            end_date: End date for the data retrieval.

        Returns:
            Raw content of the data received.
        """
        headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        items = []
        query = {
            "startRow": "1",
            "rowCount": "5",
            "datetime_beginning_ept": f"{start_date}to{end_date}",
        }
        query_s = "&".join(["=".join([k, v]) for k, v in query.items()])
        base_url = f"{self.iso_url}{url_suffix}?{query_s}"

        next_page = base_url

        logging.info(
            f"Requesting URL - {base_url}, start_date={start_date}, end_date={end_date}, load_type={self.load_type}"
        )

        while next_page:
            now = datetime.now()
            logging.info(f"Timestamp: {now}")
            response = requests.get(next_page, headers=headers)
            code = response.status_code

            if code != 200:
                raise requests.HTTPError(
                    f"Unable to access URL `{next_page}`."
                    f" Received status code {code} with message {response.content}"
                )

            data = response.json()

            logging.info(f"Data for page {next_page}:")
            items.extend(data["items"])
            next_urls = list(filter(lambda item: item["rel"] == "next", data["links"]))
            next_page = next_urls[0]["href"] if next_urls else None
            time.sleep(10)

        return items

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the PJM API and parses the return.

        Returns:
            Raw form of data.
        """
        start_date = self.current_date - timedelta(self.days)
        start_date = start_date.replace(hour=0, minute=0)
        end_date = (start_date + timedelta(days=self.days)).replace(hour=23)
        start_date_str = start_date.strftime(self.query_datetime_format)
        end_date_str = end_date.strftime(self.query_datetime_format)

        if self.load_type == "day_ahead":
            url_suffix = "da_hrl_lmps"
        else:
            url_suffix = "rt_hrl_lmps"

        data = self._fetch_paginated_data(url_suffix, start_date_str, end_date_str)

        df = pd.DataFrame(data)
        logging.info(f"Data fetched successfully: {len(df)} rows")

        return df

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Creates a new datetime column, removes null values and renames columns.

        Args:
            df: Raw form of data received from the API.

        Returns:
            Data after basic transformations.

        """

        if self.load_type == "day_ahead":
            df = df.rename(
                columns={
                    "datetime_beginning_utc": "StartTime",
                    "pnode_id": "PnodeId",
                    "pnode_name": "PnodeName",
                    "voltage": "Voltage",
                    "equipment": "Equipment",
                    "type": "Type",
                    "zone": "Zone",
                    "system_energy_price_da": "SystemEnergyPrice",
                    "total_lmp_da": "TotalLmp",
                    "congestion_price_da": "CongestionPrice",
                    "marginal_loss_price_da": "MarginalLossPrice",
                    "version_nbr": "VersionNbr",
                }
            )
        else:
            df = df.rename(
                columns={
                    "datetime_beginning_utc": "StartTime",
                    "pnode_id": "PnodeId",
                    "pnode_name": "PnodeName",
                    "voltage": "Voltage",
                    "equipment": "Equipment",
                    "type": "Type",
                    "zone": "Zone",
                    "system_energy_price_rt": "SystemEnergyPrice",
                    "total_lmp_rt": "TotalLmp",
                    "congestion_price_rt": "CongestionPrice",
                    "marginal_loss_price_rt": "MarginalLossPrice",
                    "version_nbr": "VersionNbr",
                }
            )

        df = df[
            [
                "StartTime",
                "PnodeId",
                "PnodeName",
                "Voltage",
                "Equipment",
                "Type",
                "Zone",
                "SystemEnergyPrice",
                "TotalLmp",
                "CongestionPrice",
                "MarginalLossPrice",
                "VersionNbr",
            ]
        ]

        df = df.replace({np.nan: None, "": None})

        df["StartTime"] = pd.to_datetime(df["StartTime"])
        df = df.replace({np.nan: None, "": None})

        df.reset_index(inplace=True, drop=True)

        return df

    def _validate_options(self) -> bool:
        """
        Validates the following options:
            - `load_type` must be valid.

        Returns:
            True if all looks good otherwise raises Exception.
        """

        valid_load_types = ["real_time", "day_ahead"]

        if self.load_type not in valid_load_types:
            raise ValueError(
                f"Invalid load_type `{self.load_type}` given. Supported values are {valid_load_types}."
            )

        return True
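
As a complementary sketch (illustrative only; it assumes a valid PJM API key and a writable storage path), the Day-Ahead variant with one possible way to persist the returned Spark DataFrame:

```python
from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pricing_source = PJMDailyPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "day_ahead",
    },
)

pricing_df = pricing_source.read_batch()

# Persist the hourly LMPs; the format and destination path are placeholders.
pricing_df.write.mode("append").format("delta").save("/mnt/pjm/day_ahead_lmps")
```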

PJMHistoricalPricingISOSource

Bases: PJMDailyPricingISOSource

The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.

API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)

Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition

Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition

The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the start_date and end_date attributes.

Example

from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMHistoricalPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "start_date": "2023-05-10",
        "end_date": "2023-05-20",
    }
)

pjm_source.read_batch()

Parameters:

    spark (SparkSession): The Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations. Required.

Attributes:

    api_key (str): A valid key from PJM required for authentication.
    load_type (str): The type of data to retrieve, either `real_time` or `day_ahead`.
    start_date (str): Must be in YYYY-MM-DD format.
    end_date (str): Must be in YYYY-MM-DD format.

Please refer to the BaseISOSource for available methods and further details.

BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_pricing_iso.py
class PJMHistoricalPricingISOSource(PJMDailyPricingISOSource):
    """
    The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.

    API:             <a href="https://api.pjm.com/api/v1/">https://api.pjm.com/api/v1/</a>  (requires a valid API key from PJM)

    Real-Time doc:    <a href="https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition">https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition</a>

    Day-Ahead doc:    <a href="https://dataminer2.pjm.com/feed/da_hrl_lmps/definition">https://dataminer2.pjm.com/feed/da_hrl_lmps/definition</a>

    The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the `start_date` and `end_date` attributes.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMHistoricalPricingISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "start_date": "2023-05-10",
            "end_date": "2023-05-20",
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): The Spark Session instance.
        options (dict): A dictionary of ISO Source specific configurations.

    Attributes:
        api_key (str): A valid key from PJM required for authentication.
        load_type (str): The type of data to retrieve, either `real_time` or `day_ahead`.
        start_date (str): Must be in `YYYY-MM-DD` format.
        end_date (str): Must be in `YYYY-MM-DD` format.

    Please refer to the BaseISOSource for available methods and further details.

    BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso"""

    spark: SparkSession
    options: dict
    required_options = ["api_key", "load_type", "start_date", "end_date"]

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.start_date: str = self.options.get("start_date", "")
        self.end_date: str = self.options.get("end_date", "")
        self.user_datetime_format = "%Y-%m-%d"

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls historical pricing data from the PJM API within the specified date range.

        Returns:
            pd.DataFrame: A DataFrame containing the raw historical pricing data retrieved from the PJM API.
        """

        logging.info(
            f"Historical data requested from {self.start_date} to {self.end_date}"
        )

        start_date_str = datetime.strptime(
            self.start_date, self.user_datetime_format
        ).replace(hour=0, minute=0)
        end_date_str = datetime.strptime(
            self.end_date, self.user_datetime_format
        ).replace(hour=23)

        if self.load_type == "day_ahead":
            url_suffix = "da_hrl_lmps"
        else:
            url_suffix = "rt_hrl_lmps"

        data = self._fetch_paginated_data(url_suffix, start_date_str, end_date_str)

        df = pd.DataFrame(data)
        logging.info(f"Data fetched successfully: {len(df)} rows")

        return df

    def _validate_options(self) -> bool:
        """
        Validates all parameters, including the following checks:
            - `start_date` & `end_date` must be in the correct format.
            - `start_date` must not be after `end_date`.
            - `start_date` must not be in the future (UTC).

        Returns:
            True if all looks good otherwise raises Exception.

        """
        super()._validate_options()
        try:
            start_date = datetime.strptime(self.start_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse Start date. Please specify in {self.user_datetime_format} format."
            )

        try:
            end_date = datetime.strptime(self.end_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse End date. Please specify in {self.user_datetime_format} format."
            )

        if start_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("Start date can't be in future.")

        if start_date > end_date:
            raise ValueError("Start date can't be ahead of End date.")

        if end_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("End date can't be in future.")

        return True
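
A short sketch (not from the original documentation) of how the validation above reacts to an inverted date range; it assumes a Spark session created as in the earlier examples:

```python
from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

source = PJMHistoricalPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "real_time",
        "start_date": "2023-05-20",  # deliberately after end_date
        "end_date": "2023-05-10",
    },
)

try:
    source.read_batch()
except ValueError as error:
    print(error)  # Start date can't be ahead of End date.
```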

PJMHistoricalLoadISOSource

Bases: PJMDailyLoadISOSource

The PJM Historical Load ISO Source is used to read historical load data from PJM API.

To read more about the reports, visit the following URLs -
Actual doc: ops_sum_prev_period
Forecast doc: load_frcstd_7_day

Historical is the same PJM endpoint as Actual, but is called repeatedly within a range established by the start_date & end_date attributes

Example

from rtdip_sdk.pipelines.sources import PJMHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMHistoricalLoadISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "start_date": "20230510",
        "end_date": "20230520",
    }
)

pjm_source.read_batch()

Parameters:

    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (See Attributes table below). Required.

Attributes:

    api_key (str): Must be a valid key from PJM, see PJM documentation.
    start_date (str): Must be in YYYY-MM-DD format.
    end_date (str): Must be in YYYY-MM-DD format.
    query_batch_days (int): (optional) Number of days must be < 160 as per PJM & is defaulted to 120.
    sleep_duration (int): (optional) Number of seconds to sleep between requests, defaulted to 5 seconds, used to manage requests to the PJM endpoint.
    request_count (int): (optional) Number of requests made to the PJM endpoint before sleep_duration, currently defaulted to 1.

Please check the BaseISOSource for available methods.
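
The optional batching attributes above control how the historical range is split into PJM requests. A hedged sketch of passing them explicitly (the values shown are illustrative and mirror the defaults in the source below):

```python
from rtdip_sdk.pipelines.sources import PJMHistoricalLoadISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMHistoricalLoadISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "start_date": "2023-01-01",
        "end_date": "2023-05-20",
        "query_batch_days": 120,  # optional, must be < 160 as per PJM
        "sleep_duration": 5,      # optional, seconds to wait between request batches
        "request_count": 1,       # optional, requests made before each sleep
    },
)

load_df = pjm_source.read_batch()
```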

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_load_iso.py
class PJMHistoricalLoadISOSource(PJMDailyLoadISOSource):
    """
    The PJM Historical Load ISO Source is used to read historical load data from PJM API.

    To read more about the reports, visit the following URLs -
    <br>
    Actual doc:    [ops_sum_prev_period](https://dataminer2.pjm.com/feed/ops_sum_prev_period/definition)
    <br>
    Forecast doc:  [load_frcstd_7_day](https://dataminer2.pjm.com/feed/load_frcstd_7_day/definition)

    Historical is the same PJM endpoint as Actual, but is called repeatedly within a range established by the
    start_date & end_date attributes

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMHistoricalLoadISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMHistoricalLoadISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "start_date": "20230510",
            "end_date": "20230520",
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        api_key (str): Must be a valid key from PJM, see PJM documentation
        start_date (str): Must be in `YYYY-MM-DD` format.
        end_date (str): Must be in `YYYY-MM-DD` format.

        query_batch_days (int): (optional) Number of days must be < 160 as per PJM & is defaulted to `120`
        sleep_duration (int): (optional) Number of seconds to sleep between requests, defaulted to `5` seconds, used to manage requests to the PJM endpoint
        request_count (int): (optional) Number of requests made to PJM endpoint before sleep_duration, currently defaulted to `1`

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso"""

    spark: SparkSession
    options: dict
    required_options = ["api_key", "start_date", "end_date"]

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.api_key: str = self.options.get("api_key", "").strip()
        self.start_date: str = self.options.get("start_date", "")
        self.end_date: str = self.options.get("end_date", "")
        self.query_batch_days: int = self.options.get("query_batch_days", 120)
        self.sleep_duration: int = self.options.get("sleep_duration", 5)
        self.request_count: int = self.options.get("request_count", 1)
        self.load_type: str = "actual"
        self.user_datetime_format = "%Y-%m-%d"

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the PJM API and parses the return including date ranges.

        Returns:
            Raw form of data.
        """

        logging.info(
            f"Historical load requested from {self.start_date} to {self.end_date}"
        )
        start_date = datetime.strptime(self.start_date, self.user_datetime_format)
        end_date = datetime.strptime(self.end_date, self.user_datetime_format).replace(
            hour=23
        )

        days_diff = (end_date - start_date).days
        logging.info(f"Expected hours for a single zone = {(days_diff + 1) * 24}")
        generated_days_ranges = []
        dates = pd.date_range(
            start_date, end_date, freq=pd.DateOffset(days=self.query_batch_days)
        )

        for date in dates:
            py_date = date.to_pydatetime()
            date_last = (py_date + timedelta(days=self.query_batch_days - 1)).replace(
                hour=23
            )