
PJM Historical Pricing Load

PJMHistoricalPricingISOSource

Bases: PJMDailyPricingISOSource

The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.

API: https://api.pjm.com/api/v1/ (requires a valid API key from PJM)

Real-Time doc: https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition

Day-Ahead doc: https://dataminer2.pjm.com/feed/da_hrl_lmps/definition

The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the start_date and end_date attributes.
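As a rough illustration of how the two load types relate to the feeds documented above, the sketch below maps a `load_type` value to its feed name. The helper name `feed_url` is hypothetical, and the idea that the feed name is appended directly to the base URL is an assumption made for illustration, not confirmed SDK behaviour.

```python
# Base URL and feed names are taken from the links above.
PJM_API_BASE = "https://api.pjm.com/api/v1/"

FEEDS = {
    "real_time": "rt_hrl_lmps",
    "day_ahead": "da_hrl_lmps",
}


def feed_url(load_type: str) -> str:
    """Hypothetical helper: return the feed URL for a load type."""
    # Any value other than "day_ahead" falls back to the real-time feed.
    suffix = FEEDS.get(load_type, FEEDS["real_time"])
    # Assumption: the feed name is appended directly to the base URL.
    return f"{PJM_API_BASE}{suffix}"


print(feed_url("day_ahead"))  # https://api.pjm.com/api/v1/da_hrl_lmps
```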

Example

from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
from rtdip_sdk.pipelines.utilities import SparkSessionUtility

# Not required if using Databricks
spark = SparkSessionUtility(config={}).execute()

pjm_source = PJMHistoricalPricingISOSource(
    spark=spark,
    options={
        "api_key": "{api_key}",
        "load_type": "real_time",  # or "day_ahead"; listed in required_options
        "start_date": "2023-05-10",
        "end_date": "2023-05-20",
    }
)

pjm_source.read_batch()

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| spark | SparkSession | The Spark Session instance. | required |
| options | dict | A dictionary of ISO Source specific configurations. | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| api_key | str | A valid key from PJM required for authentication. |
| load_type | str | The type of data to retrieve, either real_time or day_ahead. |
| start_date | str | Must be in YYYY-MM-DD format. |
| end_date | str | Must be in YYYY-MM-DD format. |
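
To make the date attributes concrete, here is a minimal sketch of how two `YYYY-MM-DD` strings could be expanded into an hourly query window, following the same `strptime` and `replace` calls that appear in the source listing for this class. The function name `query_window` is hypothetical and is not part of the SDK.

```python
from datetime import datetime
from typing import Tuple

USER_DATETIME_FORMAT = "%Y-%m-%d"


def query_window(start_date: str, end_date: str) -> Tuple[datetime, datetime]:
    """Hypothetical helper: expand two dates into an hourly query window."""
    # The window opens at 00:00 on the start date...
    start = datetime.strptime(start_date, USER_DATETIME_FORMAT).replace(hour=0, minute=0)
    # ...and closes at 23:00 on the end date, so the final day is fully covered.
    end = datetime.strptime(end_date, USER_DATETIME_FORMAT).replace(hour=23)
    return start, end


start, end = query_window("2023-05-10", "2023-05-20")
print(start)  # 2023-05-10 00:00:00
print(end)    # 2023-05-20 23:00:00
```

A string in any other format, such as `10/05/2023`, makes `strptime` raise a `ValueError`.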

Please refer to the BaseISOSource for available methods and further details.

BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso

Source code in src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/pjm_historical_pricing_iso.py
class PJMHistoricalPricingISOSource(PJMDailyPricingISOSource):
    """
    The PJM Historical Pricing ISO Source is used to retrieve historical Real-Time and Day-Ahead hourly data from the PJM API.

    API:             <a href="https://api.pjm.com/api/v1/">https://api.pjm.com/api/v1/</a>  (requires a valid API key from PJM)

    Real-Time doc:    <a href="https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition">https://dataminer2.pjm.com/feed/rt_hrl_lmps/definition</a>

    Day-Ahead doc:    <a href="https://dataminer2.pjm.com/feed/da_hrl_lmps/definition">https://dataminer2.pjm.com/feed/da_hrl_lmps/definition</a>

    The PJM Historical Pricing ISO Source accesses the same PJM endpoints as the daily pricing source but is tailored for retrieving data within a specified historical range defined by the `start_date` and `end_date` attributes.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.sources import PJMHistoricalPricingISOSource
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    pjm_source = PJMHistoricalPricingISOSource(
        spark=spark,
        options={
            "api_key": "{api_key}",
            "load_type": "real_time",  # or "day_ahead"; listed in required_options
            "start_date": "2023-05-10",
            "end_date": "2023-05-20",
        }
    )

    pjm_source.read_batch()
    ```

    Parameters:
        spark (SparkSession): The Spark Session instance.
        options (dict): A dictionary of ISO Source specific configurations.

    Attributes:
        api_key (str): A valid key from PJM required for authentication.
        load_type (str): The type of data to retrieve, either `real_time` or `day_ahead`.
        start_date (str): Must be in `YYYY-MM-DD` format.
        end_date (str): Must be in `YYYY-MM-DD` format.

    Please refer to the BaseISOSource for available methods and further details.

    BaseISOSource: ::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso"""

    spark: SparkSession
    options: dict
    required_options = ["api_key", "load_type", "start_date", "end_date"]

    def __init__(self, spark: SparkSession, options: dict) -> None:
        super().__init__(spark, options)
        self.spark: SparkSession = spark
        self.options: dict = options
        self.start_date: str = self.options.get("start_date", "")
        self.end_date: str = self.options.get("end_date", "")
        self.user_datetime_format = "%Y-%m-%d"

    def _pull_data(self) -> pd.DataFrame:
        """
        Pulls historical pricing data from the PJM API within the specified date range.

        Returns:
            pd.DataFrame: A DataFrame containing the raw historical pricing data retrieved from the PJM API.
        """

        logging.info(
            f"Historical data requested from {self.start_date} to {self.end_date}"
        )

        start_date_str = datetime.strptime(
            self.start_date, self.user_datetime_format
        ).replace(hour=0, minute=0)
        end_date_str = datetime.strptime(
            self.end_date, self.user_datetime_format
        ).replace(hour=23)

        if self.load_type == "day_ahead":
            url_suffix = "da_hrl_lmps"
        else:
            url_suffix = "rt_hrl_lmps"

        data = self._fetch_paginated_data(url_suffix, start_date_str, end_date_str)

        df = pd.DataFrame(data)
        logging.info(f"Data fetched successfully: {len(df)} rows")

        return df

    def _validate_options(self) -> bool:
        """
        Validates all parameters, including the following checks:
            - `start_date` and `end_date` must be in `YYYY-MM-DD` format.
            - `start_date` must not be later than `end_date`.
            - `start_date` and `end_date` must be at least one day in the past (UTC).

        Returns:
            True if all options are valid; otherwise an exception is raised.

        """
        super()._validate_options()
        try:
            start_date = datetime.strptime(self.start_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse Start date. Please specify in {self.user_datetime_format} format."
            )

        try:
            end_date = datetime.strptime(self.end_date, self.user_datetime_format)
        except ValueError:
            raise ValueError(
                f"Unable to parse End date. Please specify in {self.user_datetime_format} format."
            )

        if start_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("Start date can't be in future.")

        if start_date > end_date:
            raise ValueError("Start date can't be ahead of End date.")

        if end_date > datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
            days=1
        ):
            raise ValueError("End date can't be in future.")

        return True
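
The date checks in `_validate_options` can be tried without Spark or an API key. The following is a minimal stand-alone sketch of the same rules. The function `validate_range` and its `now` parameter are hypothetical additions; `now` exists only so the example produces the same result whenever it is run.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

USER_DATETIME_FORMAT = "%Y-%m-%d"


def validate_range(start_date: str, end_date: str, now: Optional[datetime] = None) -> bool:
    """Hypothetical stand-alone version of the date checks."""
    if now is None:
        # Naive UTC timestamp, matching the comparison used in the class.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
    latest_allowed = now - timedelta(days=1)

    # strptime raises ValueError if a string is not in YYYY-MM-DD format.
    start = datetime.strptime(start_date, USER_DATETIME_FORMAT)
    end = datetime.strptime(end_date, USER_DATETIME_FORMAT)

    if start > latest_allowed:
        raise ValueError("Start date can't be in future.")
    if start > end:
        raise ValueError("Start date can't be ahead of End date.")
    if end > latest_allowed:
        raise ValueError("End date can't be in future.")
    return True


fixed_now = datetime(2023, 6, 1, 12, 0)
print(validate_range("2023-05-10", "2023-05-20", now=fixed_now))  # True
```

Because the cutoff is the current time minus one day, a range ending on today's UTC date is rejected, while one ending yesterday is accepted.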