
Write to Rest API

SparkRestAPIDestination

Bases: DestinationInterface

The Spark Rest API Destination is used to write data to a Rest API.

The payload sent to the API is constructed by converting each row in the DataFrame to Json.
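
For illustration, the snippet below sketches this row-to-Json conversion using the same to_json(struct(col("*"))) expression the destination applies internally (see the source code further down). The SparkSession setup, column names and values are placeholder assumptions.

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, struct, col

spark = SparkSession.builder.getOrCreate()

# Placeholder data purely for illustration
df = spark.createDataFrame(
    [("Sensor01", 23.4), ("Sensor02", 19.1)],
    ["TagName", "Value"]
)

# Each row becomes a Json string, e.g. {"TagName":"Sensor01","Value":23.4}
payload_df = df.withColumn("content", to_json(struct(col("*"))))
payload_df.show(truncate=False)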

Note

While it is possible to use the write_batch method, it is easy to overwhelm a Rest API with large volumes of data. Consider reducing data volumes when writing to a Rest API in Batch mode to prevent API errors, including throttling. A minimal sketch of this approach follows.
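
As a sketch of that advice, the example below limits the number of rows and uses a small batch_size before calling write_batch. The 1000-row limit and parameter values are arbitrary, and df is assumed to be an existing DataFrame.

from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

# Illustrative only: cap the row count to keep batch writes small
limited_df = df.limit(1000)

rest_api_destination = SparkRestAPIDestination(
    data=limited_df,
    options={},
    url="{REST-API-URL}",
    headers={'Authorization': 'Bearer {}'.format("{TOKEN}")},
    batch_size=10,
    parallelism=2
)

rest_api_destination.write_batch()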

Example

#Rest API Destination for Streaming Queries

from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

rest_api_destination = SparkRestAPIDestination(
    data=df,
    options={
        "checkpointLocation": "{/CHECKPOINT-LOCATION/}"
    },
    url="{REST-API-URL}",
    headers = {
        'Authorization': 'Bearer {}'.format("{TOKEN}")
    },
    batch_size=100,
    method="POST",
    parallelism=8,
    trigger="1 minute",
    query_name="DeltaRestAPIDestination",
    query_wait_interval=None
)

rest_api_destination.write_stream()
#Rest API Destination for Batch Queries

from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

rest_api_destination = SparkRestAPIDestination(
    data=df,
    options={},
    url="{REST-API-URL}",
    headers = {
        'Authorization': 'Bearer {}'.format("{TOKEN}")
    },
    batch_size=10,
    method="POST",
    parallelism=4,
    trigger="1 minute",
    query_name="DeltaRestAPIDestination",
    query_wait_interval=None
)

rest_api_destination.write_batch()

Parameters:

Name Type Description Default
data DataFrame

Dataframe to be written to the Rest API

required
options dict

A dictionary of options for streaming writes

required
url str

The Rest API Url

required
headers dict

A dictionary of headers to be provided to the Rest API

required
batch_size int

The number of DataFrame rows to be used in each Rest API call

required
method str

The method to be used when calling the Rest API. Allowed values are POST, PATCH and PUT

'POST'
parallelism int

The number of concurrent calls to be made to the Rest API

8
trigger optional str

Frequency of the write operation. Specify "availableNow" to execute the trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. A one-time "availableNow" write is sketched after this parameter list. (stream) Default is 1 minute

'1 minutes'
query_name str

Unique name for the query in associated SparkSession

'DeltaRestAPIDestination'
query_wait_interval optional int

If set, waits for the streaming query to complete before returning. (stream) Default is None

None
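
As referenced in the trigger description above, the following sketch runs a one-time streaming write with the "availableNow" trigger; df, the URL, token and checkpoint location are placeholders.

from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

# Sketch: "availableNow" processes the currently available data once and then stops,
# instead of firing on a recurring processing-time interval
one_shot_destination = SparkRestAPIDestination(
    data=df,
    options={
        "checkpointLocation": "{/CHECKPOINT-LOCATION/}"
    },
    url="{REST-API-URL}",
    headers={'Authorization': 'Bearer {}'.format("{TOKEN}")},
    batch_size=100,
    trigger="availableNow"
)

one_shot_destination.write_stream()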

Attributes:

Name Type Description
checkpointLocation str

Path to checkpoint files. (Streaming)

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
class SparkRestAPIDestination(DestinationInterface):
    """
    The Spark Rest API Destination is used to write data to a Rest API.

    The payload sent to the API is constructed by converting each row in the DataFrame to Json.

    !!! Note
        While it is possible to use the `write_batch` method, it is easy to overwhelm a Rest API with large volumes of data.
        Consider reducing data volumes when writing to a Rest API in Batch mode to prevent API errors, including throttling.

    Example
    --------
    ```python
    #Rest API Destination for Streaming Queries

    from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

    rest_api_destination = SparkRestAPIDestination(
        data=df,
        options={
            "checkpointLocation": "{/CHECKPOINT-LOCATION/}"
        },
        url="{REST-API-URL}",
        headers = {
            'Authorization': 'Bearer {}'.format("{TOKEN}")
        },
        batch_size=100,
        method="POST",
        parallelism=8,
        trigger="1 minute",
        query_name="DeltaRestAPIDestination",
        query_wait_interval=None
    )

    rest_api_destination.write_stream()
    ```
    ```python
    #Rest API Destination for Batch Queries

    from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

    rest_api_destination = SparkRestAPIDestination(
        data=df,
        options={},
        url="{REST-API-URL}",
        headers = {
            'Authorization': 'Bearer {}'.format("{TOKEN}")
        },
        batch_size=10,
        method="POST",
        parallelism=4,
        trigger="1 minute",
        query_name="DeltaRestAPIDestination",
        query_wait_interval=None
    )

    rest_api_destination.write_batch()
    ```

    Parameters:
        data (DataFrame): Dataframe to be written to the Rest API
        options (dict): A dictionary of options for streaming writes
        url (str): The Rest API Url
        headers (dict): A dictionary of headers to be provided to the Rest API
        batch_size (int): The number of DataFrame rows to be used in each Rest API call
        method (str): The method to be used when calling the Rest API. Allowed values are POST, PATCH and PUT
        parallelism (int): The number of concurrent calls to be made to the Rest API
        trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute the trigger once, otherwise specify a time period such as "30 seconds" or "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 1 minute
        query_name (str): Unique name for the query in associated SparkSession
        query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None

    Attributes:
        checkpointLocation (str): Path to checkpoint files. (Streaming)
    """

    data: DataFrame
    options: dict
    url: str
    headers: dict
    batch_size: int
    method: str
    parallelism: int
    trigger: str
    query_name: str
    query_wait_interval: int

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        url: str,
        headers: dict,
        batch_size: int,
        method: str = "POST",
        parallelism: int = 8,
        trigger="1 minutes",
        query_name: str = "DeltaRestAPIDestination",
        query_wait_interval: int = None,
    ) -> None:
        self.data = data
        self.options = options
        self.url = url
        self.headers = headers
        self.batch_size = batch_size
        self.method = method
        self.parallelism = parallelism
        self.trigger = trigger
        self.query_name = query_name
        self.query_wait_interval = query_wait_interval

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("api_requests"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self):
        return True

    def post_write_validation(self):
        return True

    def _pre_batch_records_for_api_call(self, micro_batch_df: DataFrame):
        batch_count = math.ceil(micro_batch_df.count() / self.batch_size)
        micro_batch_df = (
            micro_batch_df.withColumn("content", to_json(struct(col("*"))))
            .withColumn("row_number", row_number().over(Window().orderBy(lit("A"))))
            .withColumn("batch_id", col("row_number") % batch_count)
        )
        return micro_batch_df.groupBy("batch_id").agg(
            concat_ws(",|", collect_list("content")).alias("payload")
        )

    def _api_micro_batch(self, micro_batch_df: DataFrame, epoch_id=None):  # NOSONAR
        url = self.url
        method = self.method
        headers = self.headers

        @udf("string")
        def _rest_api_execute(data):
            session = requests.Session()
            adapter = HTTPAdapter(max_retries=3)
            session.mount("http://", adapter)  # NOSONAR
            session.mount("https://", adapter)

            if method == "POST":
                response = session.post(url, headers=headers, data=data, verify=False)
            elif method == "PATCH":
                response = session.patch(url, headers=headers, data=data, verify=False)
            elif method == "PUT":
                response = session.put(url, headers=headers, data=data, verify=False)
            else:
                raise Exception("Method {} is not supported".format(method))  # NOSONAR

            if not (response.status_code == 200 or response.status_code == 201):
                raise Exception(
                    "Response status : {} .Response message : {}".format(
                        str(response.status_code), response.text
                    )
                )  # NOSONAR

            return str(response.status_code)

        micro_batch_df.persist()
        micro_batch_df = self._pre_batch_records_for_api_call(micro_batch_df)

        micro_batch_df = micro_batch_df.repartition(self.parallelism)

        (
            micro_batch_df.withColumn(
                "rest_api_response_code", _rest_api_execute(micro_batch_df["payload"])
            ).collect()
        )
        micro_batch_df.unpersist()

    def write_batch(self):
        """
        Writes batch data to a Rest API
        """
        try:
            return self._api_micro_batch(self.data)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming data to a Rest API
        """
        try:
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                self.data.writeStream.trigger(**TRIGGER_OPTION)
                .foreachBatch(self._api_micro_batch)
                .queryName(self.query_name)
                .outputMode("update")
                .options(**self.options)
                .start()
            )

            if self.query_wait_interval:
                while query.isActive:
                    if query.lastProgress:
                        logging.info(query.lastProgress)
                    time.sleep(self.query_wait_interval)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

write_batch()

Writes batch data to a Rest API

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
def write_batch(self):
    """
    Writes batch data to a Rest API
    """
    try:
        return self._api_micro_batch(self.data)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

write_stream()

Writes streaming data to a Rest API

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
def write_stream(self):
    """
    Writes streaming data to a Rest API
    """
    try:
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            self.data.writeStream.trigger(**TRIGGER_OPTION)
            .foreachBatch(self._api_micro_batch)
            .queryName(self.query_name)
            .outputMode("update")
            .options(**self.options)
            .start()
        )

        if self.query_wait_interval:
            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(self.query_wait_interval)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e