PJM Daily Pricing Load

PJMDailyPricingISOSource

Bases: BaseISOSource

The PJM Daily Pricing ISO Source is used to retrieve Real-Time and Day-Ahead hourly pricing data from the PJM API. Real-Time returns data for days T - 3 to T, and Day-Ahead returns data for days T - 3 to T + 1.

API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)

Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition

Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition

Example

```python
from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMDailyPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "real_time"
    }
)

pjm_source.read_batch()
```
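The same source can be pointed at Day-Ahead prices by setting load_type to day_ahead; a minimal sketch (the {api_key} placeholder and the pjm_da_source name are illustrative):

```python
from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

# Day-Ahead hourly LMPs cover days T - 3 to T + 1
pjm_da_source = PJMDailyPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "day_ahead"
    }
)

df = pjm_da_source.read_batch()
```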

Parameters:

    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations (see the Attributes table below). Required.

Attributes:

    api_key (str): Must be a valid API key from PJM (see the API URL above).
    load_type (str): Must be one of `real_time` or `day_ahead`.

Please check the BaseISOSource for available methods.

BaseISOSource

BaseISOSource

Bases: SourceInterface

Base class for all ISO sources. It provides common functionality and helps reduce code redundancy.

Parameters:

    spark (SparkSession): Spark Session instance. Required.
    options (dict): A dictionary of ISO Source specific configurations. Required.
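The sketch below illustrates how the base class is typically extended: a child source sets its schema and required options, then overrides _pull_data and _prepare_data. The class, endpoint, column names and import path are hypothetical (the import path is inferred from the source file location shown below):

```python
from io import BytesIO

import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Assumed import path, based on src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
from rtdip_sdk.pipelines.sources.spark.iso.base_iso import BaseISOSource


class ExampleISOSource(BaseISOSource):
    """Hypothetical ISO source, shown only to illustrate the extension points."""

    iso_url: str = "https://example-iso.invalid/api/"  # placeholder endpoint
    required_options: list = ["feed"]
    spark_schema = StructType(
        [
            StructField("StartTime", StringType(), True),
            StructField("Price", DoubleType(), True),
        ]
    )

    def _pull_data(self) -> pd.DataFrame:
        # Children call _fetch_from_url with a feed-specific suffix.
        feed = self.options["feed"]
        return pd.read_csv(BytesIO(self._fetch_from_url(f"{feed}.csv")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # Rename raw columns so they match spark_schema before read_batch
        # converts the Pandas DataFrame into a Spark DataFrame.
        return df.rename(columns={"start_time": "StartTime", "price": "Price"})
```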
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
class BaseISOSource(SourceInterface):
    """
    Base class for all the ISO Sources. It provides common functionality and helps in reducing the code redundancy.

    Parameters:
        spark (SparkSession): Spark Session instance
        options (dict): A dictionary of ISO Source specific configurations
    """

    spark: SparkSession
    options: dict
    iso_url: str = "https://"
    query_datetime_format: str = "%Y%m%d"
    required_options: list = []
    spark_schema = StructType([StructField("id", IntegerType(), True)])
    default_query_timezone: str = "UTC"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        self.spark = spark
        self.options = options
        self.query_timezone = pytz.timezone(
            self.options.get("query_timezone", self.default_query_timezone)
        )
        self.current_date = datetime.now(timezone.utc).astimezone(self.query_timezone)

    def _fetch_from_url(self, url_suffix: str) -> bytes:
        """
        Gets data from external ISO API.

        Args:
            url_suffix: String to be used as suffix to iso url.

        Returns:
            Raw content of the data received.

        """
        url = f"{self.iso_url}{url_suffix}"
        logging.info(f"Requesting URL - {url}")

        response = requests.get(url)
        code = response.status_code

        if code != 200:
            raise HTTPError(
                f"Unable to access URL `{url}`."
                f" Received status code {code} with message {response.content}"
            )

        return response.content

    def _get_localized_datetime(self, datetime_str: str) -> datetime:
        """
        Converts string datetime into Python datetime object with configured format and timezone.
        Args:
            datetime_str: String to be converted into datetime.

        Returns: Timezone aware datetime object.

        """
        parsed_dt = datetime.strptime(datetime_str, self.query_datetime_format)
        parsed_dt = parsed_dt.replace(tzinfo=self.query_timezone)
        return parsed_dt

    def _pull_data(self) -> pd.DataFrame:
        """
        Hits the fetch_from_url method with certain parameters to get raw data from API.

        All the children ISO classes must override this method and call the fetch_url method
        in it.

        Returns:
             Raw DataFrame from API.
        """

        return pd.read_csv(BytesIO(self._fetch_from_url("")))

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Performs all the basic transformations to prepare data for further processing.
        All the children ISO classes must override this method.

        Args:
            df: Raw DataFrame, received from the API.

        Returns:
             Modified DataFrame, ready for basic use.

        """
        return df

    def _sanitize_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Another data transformation helper method to be called after prepare data.
        Used for advance data processing such as cleaning, filtering, restructuring.
        All the children ISO classes must override this method if there is any post-processing required.

        Args:
            df: Initial modified version of DataFrame, received after preparing the data.

        Returns:
             Final version of data after all the fixes and modifications.

        """
        return df

    def _get_data(self) -> pd.DataFrame:
        """
        Entrypoint method to return the final version of DataFrame.

        Returns:
            Modified form of data for specific use case.

        """
        df = self._pull_data()
        df = self._prepare_data(df)
        df = self._sanitize_data(df)

        # Reorder columns to keep the data consistent
        df = df[self.spark_schema.names]

        return df

    @staticmethod
    def system_type():
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def _validate_options(self) -> bool:
        """
        Performs all the options checks. Raises exception in case of any invalid value.
        Returns:
             True if all checks are passed.

        """
        return True

    def pre_read_validation(self) -> bool:
        """
        Ensures all the required options are provided and performs other validations.
        Returns:
             True if all checks are passed.

        """
        for key in self.required_options:
            if key not in self.options:
                raise ValueError(f"Required option `{key}` is missing.")

        return self._validate_options()

    def post_read_validation(self) -> bool:
        return True

    def read_batch(self) -> DataFrame:
        """
        Spark entrypoint, It executes the entire process of pulling, transforming & fixing data.
        Returns:
             Final Spark DataFrame converted from Pandas DataFrame post-execution.

        """

        try:
            self.pre_read_validation()
            pdf = self._get_data()
            pdf = _prepare_pandas_to_convert_to_spark(pdf)

            # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
            pd.DataFrame.iteritems = pd.DataFrame.items
            df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
            return df

        except Exception as e:
            logging.exception(str(e))
            raise e

    def read_stream(self) -> DataFrame:
        """
        By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

        Returns:
             Final Spark DataFrame after all the processing.

        """

        raise NotImplementedError(
            f"{self.__class__.__name__} connector doesn't support stream operation."
        )

pre_read_validation()

Ensures that all required options are provided and performs other validations. Returns True if all checks pass.
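A short sketch of the failure mode, reusing the Spark session from the example above (the variable name is illustrative): omitting a required option such as load_type makes pre_read_validation, which read_batch calls internally, raise a ValueError.

```python
incomplete_source = PJMDailyPricingISOSource(
    spark=spark,
    options={"api_key": "{api_key}"}  # `load_type` intentionally omitted
)

try:
    incomplete_source.read_batch()
except ValueError as err:
    print(err)  # Required option `load_type` is missing.
```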

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def pre_read_validation(self) -> bool:
    """
    Ensures all the required options are provided and performs other validations.
    Returns:
         True if all checks are passed.

    """
    for key in self.required_options:
        if key not in self.options:
            raise ValueError(f"Required option `{key}` is missing.")

    return self._validate_options()

read_batch()

Spark entrypoint that executes the entire process of pulling, transforming and fixing data. Returns: Final Spark DataFrame converted from the Pandas DataFrame post-execution.
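A minimal usage sketch, assuming pjm_source was constructed as in the example above; read_batch returns a regular Spark DataFrame, so standard DataFrame operations apply afterwards.

```python
df = pjm_source.read_batch()

df.printSchema()
df.show(5, truncate=False)
print(f"Rows retrieved: {df.count()}")
```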

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_batch(self) -> DataFrame:
    """
    Spark entrypoint, It executes the entire process of pulling, transforming & fixing data.
    Returns:
         Final Spark DataFrame converted from Pandas DataFrame post-execution.

    """

    try:
        self.pre_read_validation()
        pdf = self._get_data()
        pdf = _prepare_pandas_to_convert_to_spark(pdf)

        # The below is to fix the compatibility issues between Pandas 2.0 and PySpark.
        pd.DataFrame.iteritems = pd.DataFrame.items
        df = self.spark.createDataFrame(data=pdf, schema=self.spark_schema)
        return df

    except Exception as e:
        logging.exception(str(e))
        raise e

read_stream()

By default, the streaming operation is not supported, but child classes can override this method if the ISO supports streaming.

Returns:

    DataFrame: Final Spark DataFrame after all the processing.
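Since streaming is not implemented for this source, calling read_stream raises a NotImplementedError; a small sketch of that behaviour, reusing pjm_source from the example above:

```python
try:
    pjm_source.read_stream()
except NotImplementedError as err:
    print(err)  # PJMDailyPricingISOSource connector doesn't support stream operation.
```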

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
def read_stream(self) -> DataFrame:
    """
    By default, the streaming operation is not supported but child classes can override if ISO supports streaming.

    Returns:
         Final Spark DataFrame after all the processing.

    """

    raise NotImplementedError(
        f"{self.__class__.__name__} connector doesn't support stream operation."
    )
Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_daily_pricing_iso.py
class PJMDailyPricingISOSource(BaseISOSource):
    """
    The PJM Daily Pricing ISO Source is used to retrieve Real-Time and Day-Ahead hourly data from PJM API.
    Real-Time will return data for T - 3 to T days and Day-Ahead will return T - 3 to T + 1 days data.

    API:             <a href="https://api.pjm.com/api/v1/">https://api.pjm.com/api/v1/</a>  (requires a valid API key from PJM)

    Real-Time doc:    <a href="https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition">https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition</a>

    Day-Ahead doc:    <a href="https://dataminer2.pjm.com/feed/da_hrl_lmps/definition">https://dataminer2.pjm.com/feed/da_hrl_lmps/definition</a>

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMDailyPricingISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMDailyPricingISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "real_time"
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
       spark (SparkSession): Spark Session instance
       options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)

    Attributes:
        api_key (str): Must be a valid key from PJM, see api url
        load_type (str): Must be one of `real_time` or `day_ahead`

    Please check the BaseISOSource for available methods.

    BaseISOSource:
        ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
    """

    spark: SparkSession
    spark_schema = PJM_PRICING_SCHEMA
    options: dict
    iso_url: str = "https://api.pjm.com/api/v1/"
    query_datetime_format: str = "%Y-%m-%d %H:%M"
    required_options = ["api_key", "load_type"]
    default_query_timezone = "US/Eastern"

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.load_type: str = self.options.get("load_type", "").strip()
        self.api_key: str = self.options.get("api_key", "").strip()
        self.days: int = self.options.get("days", 3)

    def _fetch_paginated_data(
        self, url_suffix: str, start_date: str, end_date: str
    ) -> list:
        """
        Fetches data from the PJM API with pagination support.

        Args:
            url_suffix: String to be used as suffix to ISO URL.
            start_date: Start date for the data retrieval.
            end_date: End date for the data retrieval.

        Returns:
            List of records retrieved across all pages.
        """
        headers = {"Ocp-Apim-Subscription-Key": self.api_key}
        items = []
        query = {
            "startRow": "1",
            "rowCount": "5",
            "datetime_beginning_ept": f"{start_date}to{end_date}",
        }
        query_s = "&".join(["=".join([k, v]) for k, v in query.items()])
        base_url = f"{self.iso_url}{url_suffix}?{query_s}"

        next_page = base_url

        logging.info(
            f"Requesting URL - {base_url}, start_date={start_date}, end_date={end_date}, load_type={self.load_type}"
        )

        while next_page:
            now = datetime.now()
            logging.info(f"Timestamp: {now}")
            response = requests.get(next_page, headers=headers)
            code = response.status_code

            if code != 200:
                raise requests.HTTPError(
                    f"Unable to access URL `{next_page}`."
                    f" Received status code {code} with message {response.content}"
                )

            data = response.json()

            logging.info(f"Data for page {next_page}:")
            items.extend(data["items"])
            next_urls = list(filter(lambda item: item["rel"] == "next", data["links"]))
            next_page = next_urls[0]["href"] if next_urls else None
            time.sleep(10)

        return items

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls data from the PJM API and parses the return.

        Returns:
            Raw form of data.
        """
        start_date = self.current_date - timedelta(self.days)
        start_date = start_date.replace(hour=0, minute=0)
        end_date = (start_date + timedelta(days=self.days)).replace(hour=23)
        start_date_str = start_date.strftime(self.query_datetime_format)
        end_date_str = end_date.strftime(self.query_datetime_format)

        if self.load_type == "day_ahead":
            url_suffix = "da_hrl_lmps"
        else:
            url_suffix = "rt_hrl_lmps"

        data = self._fetch_paginated_data(url_suffix, start_date_str, end_date_str)

        df = pd.DataFrame(data)
        logging.info(f"Data fetched successfully: {len(df)} rows")

        return df

    def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Creates a new date time column and removes null values. Renames columns

        Args:
            df: Raw form of data received from the API.

        Returns:
            Data after basic transformations.

        """

        if self.load_type == "day_ahead":
            df = df.rename(
                columns={
                    "datetime_beginning_utc": "StartTime",
                    "pnode_id": "PnodeId",
                    "pnode_name": "PnodeName",
                    "voltage": "Voltage",
                    "equipment": "Equipment",
                    "type": "Type",
                    "zone": "Zone",
                    "system_energy_price_da": "SystemEnergyPrice",
                    "total_lmp_da": "TotalLmp",
                    "congestion_price_da": "CongestionPrice",
                    "marginal_loss_price_da": "MarginalLossPrice",
                    "version_nbr": "VersionNbr",
                }
            )
        else:
            df = df.rename(
                columns={
                    "datetime_beginning_utc": "StartTime",
                    "pnode_id": "PnodeId",
                    "pnode_name": "PnodeName",
                    "voltage": "Voltage",
                    "equipment": "Equipment",
                    "type": "Type",
                    "zone": "Zone",
                    "system_energy_price_rt": "SystemEnergyPrice",
                    "total_lmp_rt": "TotalLmp",
                    "congestion_price_rt": "CongestionPrice",
                    "marginal_loss_price_rt": "MarginalLossPrice",
                    "version_nbr": "VersionNbr",
                }
            )

        df = df[
            [
                "StartTime",
                "PnodeId",
                "PnodeName",
                "Voltage",
                "Equipment",
                "Type",
                "Zone",
                "SystemEnergyPrice",
                "TotalLmp",
                "CongestionPrice",
                "MarginalLossPrice",
                "VersionNbr",
            ]
        ]

        df = df.replace({np.nan: None, "": None})

        df["StartTime"] = pd.to_datetime(df["StartTime"])
        df = df.replace({np.nan: None, "": None})

        df.reset_index(inplace=True, drop=True)

        return df

    def _validate_options(self) -> bool:
        """
        Validates the following options:
            - `load_type` must be valid.

        Returns:
            True if all looks good otherwise raises Exception.
        """

        valid_load_types = ["real_time", "day_ahead"]

        if self.load_type not in valid_load_types:
            raise ValueError(
                f"Invalid load_type `{self.load_type}` given. Supported values are {valid_load_types}."
            )

        return True
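To make the query window computed in _pull_data concrete, the sketch below reproduces the same date arithmetic standalone, assuming the default days=3 and the US/Eastern query timezone (the dates in the comment are only illustrative):

```python
from datetime import datetime, timedelta, timezone

import pytz

query_timezone = pytz.timezone("US/Eastern")
days = 3

# Mirrors BaseISOSource.current_date and PJMDailyPricingISOSource._pull_data
current_date = datetime.now(timezone.utc).astimezone(query_timezone)
start_date = (current_date - timedelta(days)).replace(hour=0, minute=0)
end_date = (start_date + timedelta(days=days)).replace(hour=23)

# e.g. "2023-10-02 00:00" to "2023-10-05 23:00" when run on 2023-10-05 (US/Eastern)
print(start_date.strftime("%Y-%m-%d %H:%M"), "to", end_date.strftime("%Y-%m-%d %H:%M"))
```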