Skip to content

S3 Copy Utility

S3CopyUtility

Bases: UtilitiesInterface

Copies an object from S3 to S3, from Local to S3 and S3 to local depending on the source and destination uri.

Example

from rtdip_sdk.pipelines.utilities import S3CopyUtility

s3_copy_utility = S3CopyUtility(
    source_uri="YOUR-SOURCE-URI",
    destination_uri="YOUR-DESTINATION-URI",
    source_version_id="YOUR-VERSION-ID",
    extra_args={},
    callback="YOUD-SID",
    source_client="PRINCIPAL",
    transfer_config=["ACTIONS"]
)

result = s3_bucket_policy_utility.execute()

Parameters:

Name Type Description Default
source_uri str

URI of the source object

required
destination_uri str

URI of the destination object

required
source_version_id optional str

Version ID of the source bucket

None
extra_args optional dict

Extra arguments that can be passed to the client operation. See here for a list of download arguments

None
callback optional function

Takes a UDF used for tracking the progress of the copy operation

None
source_client optional botocore or boto3 client

A different S3 client to use for the source bucket during the copy operation

None
transfer_config optional class

The transfer configuration used during the copy. See here for all parameters

None
Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/aws/s3_copy_utility.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class S3CopyUtility(UtilitiesInterface):
    """
    Copies an object from S3 to S3, from Local to S3 and S3 to local depending on the source and destination uri.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.utilities import S3CopyUtility

    s3_copy_utility = S3CopyUtility(
        source_uri="YOUR-SOURCE-URI",
        destination_uri="YOUR-DESTINATION-URI",
        source_version_id="YOUR-VERSION-ID",
        extra_args={},
        callback="YOUD-SID",
        source_client="PRINCIPAL",
        transfer_config=["ACTIONS"]
    )

    result = s3_bucket_policy_utility.execute()
    ```

    Parameters:
        source_uri (str): URI of the source object
        destination_uri (str): URI of the destination object
        source_version_id (optional str): Version ID of the source bucket
        extra_args (optional dict): Extra arguments that can be passed to the client operation. See [here](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS){ target="_blank" } for a list of download arguments
        callback (optional function): Takes a UDF used for tracking the progress of the copy operation
        source_client (optional botocore or boto3 client): A different S3 client to use for the source bucket during the copy operation
        transfer_config (optional class): The transfer configuration used during the copy. See [here](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#boto3.s3.transfer.TransferConfig){ target="_blank" } for all parameters

    """

    source_uri: str
    destination_uri: str
    destination_key: str
    extra_args: dict
    callback: str
    source_client: S3Transfer
    transfer_config: Config

    def __init__(
        self,
        source_uri: str,
        destination_uri: str,
        source_version_id: str = None,
        extra_args: dict = None,
        callback=None,
        source_client: S3Transfer = None,
        transfer_config: Config = None,
    ):
        self.source_uri = source_uri
        self.destination_uri = destination_uri
        self.source_version_id = source_version_id
        self.extra_args = extra_args
        self.callback = callback
        self.source_client = source_client
        self.transfer_config = transfer_config

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYTHON
        """
        return SystemType.PYTHON

    @staticmethod
    def libraries():
        libraries = Libraries()
        libraries.add_pypi_library(get_default_package("aws_boto3"))
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def execute(self) -> bool:
        # S3 to S3 Copy
        if self.source_uri.startswith(
            storage_objects_utils.S3_SCHEME
        ) and self.destination_uri.startswith(storage_objects_utils.S3_SCHEME):
            schema, source_domain, source_key = storage_objects_utils.validate_uri(
                self.source_uri
            )
            (
                schema,
                destination_domain,
                destination_key,
            ) = storage_objects_utils.validate_uri(self.destination_uri)

            s3 = boto3.resource(schema)
            copy_source = {"Bucket": source_domain, "Key": source_key}
            if self.source_version_id is not None:
                copy_source["VersionId"] = self.source_version_id

            try:
                s3.meta.client.copy(
                    copy_source,
                    destination_domain,
                    destination_key,
                    self.extra_args,
                    self.callback,
                    self.source_client,
                    self.transfer_config,
                )

            except Exception as ex:
                logging.error(ex)
                return False
        # Local File to S3 Copy (Upload)
        elif (os.path.isfile(self.source_uri)) and self.destination_uri.startswith(
            storage_objects_utils.S3_SCHEME
        ):
            (
                schema,
                destination_domain,
                destination_key,
            ) = storage_objects_utils.validate_uri(self.destination_uri)

            s3_client = boto3.client(schema)

            try:
                s3_client.upload_file(
                    self.source_uri, destination_domain, destination_key
                )
            except Exception as ex:
                logging.error(ex)
                return False
        # S3 to Local File Copy (Download)
        elif self.source_uri.startswith(
            storage_objects_utils.S3_SCHEME
        ) and not self.destination_uri.startswith(storage_objects_utils.S3_SCHEME):
            try:
                schema, source_domain, source_key = storage_objects_utils.validate_uri(
                    self.source_uri
                )
                s3 = boto3.client(schema)
                s3.download_file(source_domain, source_key, self.destination_uri)
            except Exception as ex:
                logging.error(ex)
                return False
        else:
            logging.error(
                "Not Implemented. From: %s \n\t to: %s",
                self.source_uri,
                self.destination_uri,
            )
        return True

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYTHON

Source code in src/sdk/python/rtdip_sdk/pipelines/utilities/aws/s3_copy_utility.py
85
86
87
88
89
90
91
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYTHON
    """
    return SystemType.PYTHON