Write to Rest API

SparkRestAPIDestination

Bases: DestinationInterface

The Spark Rest API Destination is used to write data to a Rest API.

The payload sent to the API is constructed by converting each row in the DataFrame to JSON.

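For illustration, the row-to-JSON conversion can be reproduced directly with PySpark functions. A minimal sketch, assuming an existing SparkSession named spark; the column names are hypothetical:

from pyspark.sql.functions import col, struct, to_json

# Each row is serialized to a JSON string, e.g. {"id":1,"value":"a"};
# rows in the same batch are then joined with ",|" into one payload string.
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.withColumn("content", to_json(struct(col("*")))).show(truncate=False)
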
Note

While it is possible to use the write_batch method, it is easy to overwhelm a Rest API with large volumes of data. Consider reducing data volumes when writing to a Rest API in batch mode to prevent API errors, including throttling.

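For instance, capping the DataFrame before a batch write (a sketch; df, url, headers and the 10,000-row limit are hypothetical placeholders):

capped_df = df.limit(10000)  # bound the rows handed to a single batch write
SparkRestAPIDestination(
    data=capped_df, options={}, url=url, headers=headers, batch_size=100
).write_batch()
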
Parameters:

Name Type Description Default
data DataFrame

DataFrame to be written to the Rest API

required
options dict

A dictionary of options for streaming writes

required
url str

The Rest API URL

required
headers dict

A dictionary of headers to be provided to the Rest API

required
batch_size int

The number of DataFrame rows to be used in each Rest API call

required
method str

The method to be used when calling the Rest API. Allowed values are POST, PATCH and PUT

'POST'
parallelism int

The number of concurrent calls to be made to the Rest API

8
trigger str

Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"

'1 minutes'
query_name str

Unique name for the query in associated SparkSession

'DeltaRestAPIDestination'

Attributes:

Name Type Description
checkpointLocation str

Path to checkpoint files. (Streaming)

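A minimal batch usage sketch. The import path follows the RTDIP SDK package layout; the DataFrame df, endpoint URL, headers and token are hypothetical placeholders:

from rtdip_sdk.pipelines.destinations import SparkRestAPIDestination

destination = SparkRestAPIDestination(
    data=df,  # an existing Spark DataFrame (assumed)
    options={},
    url="https://api.example.com/v1/events",  # hypothetical endpoint
    headers={
        "Authorization": "Bearer <token>",  # hypothetical credentials
        "Content-Type": "application/json",
    },
    batch_size=100,
    method="POST",
    parallelism=8,
)
destination.write_batch()
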
Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
class SparkRestAPIDestination(DestinationInterface):
    """
    The Spark Rest API Destination is used to write data to a Rest API.

    The payload sent to the API is constructed by converting each row in the DataFrame to JSON.

    !!! Note
        While it is possible to use the `write_batch` method, it is easy to overwhelm a Rest API with large volumes of data.
        Consider reducing data volumes when writing to a Rest API in batch mode to prevent API errors, including throttling.

    Args:
        data (DataFrame): DataFrame to be written to the Rest API
        options (dict): A dictionary of options for streaming writes
        url (str): The Rest API URL
        headers (dict): A dictionary of headers to be provided to the Rest API
        batch_size (int): The number of DataFrame rows to be used in each Rest API call
        method (str): The method to be used when calling the Rest API. Allowed values are POST, PATCH and PUT
        parallelism (int): The number of concurrent calls to be made to the Rest API
        trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
        query_name (str): Unique name for the query in associated SparkSession

    Attributes:
        checkpointLocation (str): Path to checkpoint files. (Streaming)
    """

    data: DataFrame
    options: dict
    url: str
    headers: dict
    batch_size: int
    method: str
    parallelism: int
    trigger: str
    query_name: str

    def __init__(
        self,
        data: DataFrame,
        options: dict,
        url: str,
        headers: dict,
        batch_size: int,
        method: str = "POST",
        parallelism: int = 8,
        trigger="1 minutes",
        query_name: str = "DeltaRestAPIDestination",
    ) -> None:
        self.data = data
        self.options = options
        self.url = url
        self.headers = headers
        self.batch_size = batch_size
        self.method = method
        self.parallelism = parallelism
        self.trigger = trigger
        self.query_name = query_name

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("api_requests"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def pre_write_validation(self):
        return True

    def post_write_validation(self):
        return True

    def _pre_batch_records_for_api_call(self, micro_batch_df: DataFrame):
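        # Serialize each row to a JSON string, assign rows to batch ids so
        # that each batch holds roughly `batch_size` rows, then join each
        # batch's JSON strings with ",|" into a single `payload` column.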
        batch_count = math.ceil(micro_batch_df.count() / self.batch_size)
        micro_batch_df = (
            micro_batch_df.withColumn("content", to_json(struct(col("*"))))
            .withColumn("row_number", row_number().over(Window().orderBy(lit("A"))))
            .withColumn("batch_id", col("row_number") % batch_count)
        )
        return micro_batch_df.groupBy("batch_id").agg(
            concat_ws(",|", collect_list("content")).alias("payload")
        )

    def _api_micro_batch(self, micro_batch_df: DataFrame, epoch_id=None):  # NOSONAR
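        # Capture the connection settings as local variables so the UDF
        # below closes over plain values rather than `self` when Spark
        # serializes it to the executors.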
        url = self.url
        method = self.method
        headers = self.headers

        @udf("string")
        def _rest_api_execute(data):
            session = requests.Session()
            adapter = HTTPAdapter(max_retries=3)
            session.mount("http://", adapter)  # NOSONAR
            session.mount("https://", adapter)

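            # verify=False below disables TLS certificate verification.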
            if method == "POST":
                response = session.post(url, headers=headers, data=data, verify=False)
            elif method == "PATCH":
                response = session.patch(url, headers=headers, data=data, verify=False)
            elif method == "PUT":
                response = session.put(url, headers=headers, data=data, verify=False)
            else:
                raise Exception("Method {} is not supported".format(method))  # NOSONAR

            if not (response.status_code == 200 or response.status_code == 201):
                raise Exception(
                    "Response status : {} .Response message : {}".format(
                        str(response.status_code), response.text
                    )
                )  # NOSONAR

            return str(response.status_code)

        micro_batch_df.persist()
        micro_batch_df = self._pre_batch_records_for_api_call(micro_batch_df)

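        # Repartitioning to `parallelism` partitions bounds how many
        # Rest API calls run concurrently.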
        micro_batch_df = micro_batch_df.repartition(self.parallelism)

        (
            micro_batch_df.withColumn(
                "rest_api_response_code", _rest_api_execute(micro_batch_df["payload"])
            ).collect()
        )
        micro_batch_df.unpersist()

    def write_batch(self):
        """
        Writes batch data to a Rest API
        """
        try:
            return self._api_micro_batch(self.data)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

    def write_stream(self):
        """
        Writes streaming data to a Rest API
        """
        try:
            TRIGGER_OPTION = (
                {"availableNow": True}
                if self.trigger == "availableNow"
                else {"processingTime": self.trigger}
            )
            query = (
                self.data.writeStream.trigger(**TRIGGER_OPTION)
                .foreachBatch(self._api_micro_batch)
                .queryName(self.query_name)
                .outputMode("update")
                .options(**self.options)
                .start()
            )

            while query.isActive:
                if query.lastProgress:
                    logging.info(query.lastProgress)
                time.sleep(10)

        except Py4JJavaError as e:
            logging.exception(e.errmsg)
            raise e
        except Exception as e:
            logging.exception(str(e))
            raise e

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

write_batch()

Writes batch data to a Rest API

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
def write_batch(self):
    """
    Writes batch data to a Rest API
    """
    try:
        return self._api_micro_batch(self.data)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e

write_stream()

Writes streaming data to a Rest API

Source code in src/sdk/python/rtdip_sdk/pipelines/destinations/spark/rest_api.py
def write_stream(self):
    """
    Writes streaming data to a Rest API
    """
    try:
        TRIGGER_OPTION = (
            {"availableNow": True}
            if self.trigger == "availableNow"
            else {"processingTime": self.trigger}
        )
        query = (
            self.data.writeStream.trigger(**TRIGGER_OPTION)
            .foreachBatch(self._api_micro_batch)
            .queryName(self.query_name)
            .outputMode("update")
            .options(**self.options)
            .start()
        )

        while query.isActive:
            if query.lastProgress:
                logging.info(query.lastProgress)
            time.sleep(10)

    except Py4JJavaError as e:
        logging.exception(e.errmsg)
        raise e
    except Exception as e:
        logging.exception(str(e))
        raise e