AWS Storage Blog
Implementing AWS DataSync with hundreds of millions of objects
Moving large volumes of data across your hybrid cloud environments can seem like a daunting task, especially when dealing with a litany of requirements that arise when working within the technical limits of network, storage, compute, and operating system layers, both on-premises and in the cloud. Users face additional challenges when balancing their Recovery Time Objectives (RTO) and Recovery Point Objectives (RPO) while optimizing task schedules and transfer times to meet their compliance needs. These challenges can take on many forms, including technical constraints such as latency and bandwidth quotas between hybrid environments, as well as the size and quantity of the data sets to be migrated.
On February 7th, 2024, AWS DataSync announced support for manifests, a new feature that enables you to provide a definite list of source files or objects to be transferred by your DataSync tasks. Using manifests, you can decrease your task execution times by specifying only the files or objects that need to be processed by your task.
In this post, we walk through how DataSync works with over 100 million source objects in Amazon Simple Storage Service (Amazon S3), and copies and syncs these objects to self-managed storage. We share recommendations that help you move your data more efficiently and prevent issues that can hinder your ability to scale, such as prolonged data transfer wait times. Additionally, this post walks you through how to use the new manifest file feature that has been released by DataSync.
DataSync overview
DataSync simplifies and accelerates data migrations to AWS and helps you move data quickly and securely between on-premises storage, edge locations, other clouds, and AWS Storage.
Users who leverage DataSync to move large amounts of data (such as over 100 million S3 objects) should think strategically about how to best optimize their data transfer tasks, while working within the inherent thresholds of DataSync.
Manifest file overview
A manifest is a list of files or objects that you want DataSync to transfer. For example, instead of copying every object in your S3 bucket, DataSync can copy only the objects you include in a manifest. This feature provides users with more precise control over what DataSync transfers. The manifest file is preferred over using include filters in the case that you want to target large sets of specific files.
Review of relevant quotas
The DataSync quotas documentation outlines the following:
The maximum number of files or objects per task is 50 million. This is the first limitation to overcome if the goal is to reach > 100 million objects. The preceding maximum filter characters and execution history quota are also relevant as we work through creating a solution.
Solution overview
This solution involves the following:
- Restructure the Data: Segment the source data to keep it within the DataSync maximum objects per task execution quota.
- Event Driven with Manifest File: Configure a task to only process the data that we know is new or changed.
- Large Batch with Includes Filter: Programmatically segment the source data into large batches to be queued as DataSync task executions.
Prerequisites
For this solution, you need:
- An AWS account
- A DataSync Agent
- Moderate Python programming skill
Solution walkthrough
In the following sections, we walk you through the various approaches.
Restructure the data
When configuring your DataSync tasks, structure your source data to define tasks with an object count under 50 million to avoid exceeding task quotas. The following examples provide options for performing this restructuring:
- Splitting the objects by path into separate buckets based on timestamp windows.
- Using a new path or bucket when the number of objects reaches a threshold.
However, depending on the specific workloads and other external factors related to the source data, these may not be practical.
Event-driven with manifest file
Using an event-driven approach makes sure that only the files that must be transferred by DataSync are in scope for a given task execution. We do this by calling start-task-execution
with a manifest-config
that contains a list of in scope files/objects for the DataSync task execution. Details on the application programming interface (API) code can be found in this DataSync documentation.
The manifest file restricts scanning and processing activities to only the files/objects identified in the manifest file. This reduces the overall DataSync execution time and minimizes Amazon S3 scan costs.
The creation of new objects in Amazon S3 generates an s3:ObjectCreated event in Amazon EventBridge. For each new object uploaded to Amazon S3, EventBridge sends a JSON payload describing the object to an Amazon Simple Queue Service (Amazon SQS) queue. Using Amazon SQS as the event destination captures these events persistently for processing.
AWS Lambda provides the compute resources needed to process the event payloads through direct synchronous integration with Amazon SQS. This integration provides simple buffering for records, which are configurable for up to five minutes with a batch window.
Depending on the frequency of object uploads on your source bucket, you can buffer for longer than the maximum five minutes. The advantage is that you have less invocations of your DataSync task by passing a larger number of objects. However, this introduces a delay in the transfer. You should evaluate the impact of transferring larger data sets versus any impact on RPO.
To implement a buffer time greater than five minutes, you can use an Amazon EventBridge scheduled rule to invoke Lambda.
Retrieving messages from Amazon SQS
To consume messages from the Amazon SQS queue, the Lambda function uses the boto3 Amazon SQS client library’s get_queue_attributes and receive_messages
API calls.
The Lambda function gets the queue’s length to understand the number of messages that are available. This value can be compared to the count of remaining items in the queue during execution. This makes sure that all messages are retrieved. Alternatively, you may want to cap the number of objects in a single task execution. This can be done by leaving remaining messages in the queue (which may affect RPO), or spawning multiple tasks (which may aid operationally if you need to halt or resume single tasks as part of the overall job after they are queued in DataSync).
<pre><code class="lang-json"></code></pre><pre><code class="lang-json">import boto3
import os
sqs_client = boto3.client('sqs')
queue_url = os.environ['QUEUE_URL']
# Get the SQS queue length
resp = sqs_client.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=[
'ApproximateNumberOfMessages',
'ApproximateNumberOfMessagesNotVisible'
]
)
approximate_queue_length = resp['Attributes']['ApproximateNumberOfMessages']</code></pre>
Once the number of messages in the Amazon SQS queue is known, we consider the total number of messages to send to each DataSync task for optimal performance. Overloading a single DataSync task with messages can result in delays. This affects the overall processing time of new messages that land on the Amazon SQS queue, as DataSync processes tasks executions sequentially and tasks queue until the active DataSync task complete.
To help alleviate this, we batch Amazon SQS messages to reduce overhead on a DataSync task. For example, batching 1000 messages over five minutes would result in a single DataSync task execution supporting 3.3 new objects per second. Alongside choosing the number of messages to batch at one time, you should also drain the Amazon SQS queue at a set cadence, such as every five minutes to make sure you meet a required RPO.
When it comes time to retrieve the Amazon SQS messages, the receive_message
API can receive up to a maximum of 10 messages from an Amazon SQS queue in a single API call. In the following code example, we use a while loop to retrieve messages from the queue until the message count reaches 1000, or another amount you define.
Given the high volume, yield statements are used in the function to optimize performance, whereby the list of values retrieved from the queue are not stored in memory.
QUEUE_URL = os.environ[“queue_url”] MAX_RECEIVE = 20000
def process_chunks(queue_length: int):
processed_msg_count = 0
max_receive = MAX_RECEIVE
receive_count = 0
# receive from SQS queue & build list of messages
while receive_count < (max_receive / 10) and processed_msg_count < queue_length:
receive_count += 1
try:
resp = sqs_client.receive_message(
QueueUrl=QUEUE_URL, AttributeNames=["All"], MaxNumberOfMessages=10
)
except Exception as e:
logging.error("SQS Retrieve Error")
logging.error(traceback.format_exc())
try:
msgs = []
for message in resp["Messages"]:
msgs.append(message)
processed_msg_count += 1
yield msgs # Messages to be sent to DataSync
except KeyError:
return
Building the manifest
Once the messages are retrieved from the Amazon SQS queue, we must perform a number of actions on the messages prior to passing these to DataSync.
Steps to build the manifest file:
- Extract the S3 object’s ‘Key’ value from each Amazon SQS message.
- Write the manifest file to Amazon S3.
- Make sure there are no duplicates.
- Make sure there are no folders or prefixes (note that only objects and files can be included in a manifest).
When writing the manifest file to Amazon S3 the following rules must be followed:
- Specify the full object path, you cannot specify just a folder/path/directory. You can specify the specific version of an object (comma-delimited) to transfer.
- Separate each object with a new line character.
- Save the manifest file as a csv file (file-name.csv).
Required IAM Permissions:
- Lambda: The Lambda function’s execution role must include the s3:PutObject permission on the path (bucket/folder) containing the manifest file.
- DataSync: The DataSync Task AWS Identity and Access Management (IAM) role must include the s3:GetObject permission on the path (bucket/folder) where the manifest file is stored and s3:GetObjectVersion if specifying an object version.
Another consideration is the name of the manifest file. The manifest file must not be deleted or amended during task execution or after task execution if we want to re-run the same task. One strategy is to use the execution-ID from the Lambda invocation to name the file. This prevents overlap and gives the name to the execution API. This technique needs some cleaning up of the manifest bucket over time, as Lambda would continually generate new files.
Extract the S3 object’s ‘Key’ value from each Amazon SQS message:
- The list of messages from the Amazon SQS Queue can now be loaded into a separate function to extract the S3 object’s Key value from each message. This function also handles any directories that are part of this list, and excludes them from the manifest file.
def process_chunk_data(messages: list):
manifest_data = []
processed_msgs = []
for msg in messages:
msg_body = msg["Body"]
payload = json.loads(msg_body)
msg_key = payload["detail"]["object"]["key"]
if msg_key[-1] != "/": # Prevents Dirs from manifest
manifest_data.append(msg_key)
else:
logger.info(f"Directory found. Skipping from manifest: {msg_key}")
processed_msgs.extend(
[{"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}]
)
return (manifest_data, processed_msgs)
To make sure we have a list of unique elements, without duplicates, that is saved to our manifest file, we use the Python set method on the list variable (such as manifest_data
) that contains our list of object keys.
Write the manifest file to Amazon S3:
- Once the manifest file has been loaded with objects to be transferred, the file should now be uploaded to the manifest bucket.
To keep the file name unique, this example uses context.aws_request_id
from the Lambda invocation, as previously mentioned.
def publish_manifest(manifest_data: set, aws_request_id: str, idx: int):
filename = f"{S3_MANIFEST_FOLDER}{aws_request_id}_{idx}.csv"
result = s3_resource.Object(S3_BUCKET_NAME, filename).put(
Body=','.join(manifest_data)[1:]
)
if result.get("ResponseMetadata").get("HTTPStatusCode") == 200:
logger.info(f"Manifest Upload: Successful: Size: {len(manifest_data)} File: {S3_BUCKET_NAME} {filename}")
else:
sys.exit(1)
return filename
Invoking DataSync
Once the manifest file has been created and uploaded to Amazon S3, the DataSync task is executed. The Lambda function now calls the StartTaskExecution API, referring to the path of the manifest file.
ds_client = boto3.client('datasync')
DS_TASK_ARN = os.environ[ds_task_arn']
ds_client.start_task_execution(
TaskArn=DS_TASK_ARN,
OverrideOptions={
"VerifyMode": "ONLY_FILES_TRANSFERRED",
"TransferMode": "ALL",
},
ManifestConfig={
"Action": "TRANSFER",
"Format": "CSV",
"Source": {
"S3": {
"ManifestObjectPath": manifest_file,
"S3BucketArn": S3_BUCKET_MANIFEST_ARN,
"BucketAccessRoleArn": DS_S3_ROLE_ARN
}
}
}
)
Deleting successfully transferred messages from the Amazon SQS Queue
Once the DataSync task has successfully run, the messages in the Amazon SQS Queue can be safely removed. The previous process_chunk_data
function captured which messages should be deleted upon a successful task execution in the variable processed_msgs
. This value is passed into the following delete_messages
function.
QUEUE_URL = os.environ["queue_url"]
def delete_messages(processed_msgs: list):
total_msgs = len(processed_msgs)
processed_msg_buffer = processed_msgs
deleted_msg_count = 0
while len(processed_msg_buffer) > 0:
delete_msg_list = processed_msg_buffer[0:10]
processed_msg_buffer = processed_msg_buffer[10:]
try:
resp = sqs_client.delete_message_batch(
QueueUrl=QUEUE_URL, Entries=delete_msg_list
)
deleted_msg_count += len(resp["Successful"])
except Exception as e:
logging.error(traceback.format_exc())
if deleted_msg_count != total_msgs:
raise RuntimeError(
f"Failed to delete messages: total messages={total_msgs!r} resp={resp!r}"
)
else:
logger.info(f"Deleted {deleted_msg_count} from the queue")
return total_msgs
Large batch with includes filter
Using DataSync to move large batches is suitable if you need to initially seed a large set of data. An offline alternative could be AWS Snowball depending on how often you wish to repeat the task and the timelines you are working toward. DataSync is also suitable when running an infrequent validation of your full dataset in conjunction with an event-driven approach.
With DataSync, you can use StartTaskExecution with the VerifyMode option set to POINT_IN_TIME_CONSISTENT
to check and validate the full data set. In this case, as the intent is to transfer and validate the full data set, the use of manifests is not needed. If VerifyMode is used with manifests for other use cases, then ONLY_FILES_TRANSFERRED
is required to target only the files identified and transferred as per the operation.
Object quotas
When using a batched approach, there are some further theoretical maximums to discuss to understand where the upper quotas are.
- 50 million is the maximum number of files or objects per task execution.
- 50 executions is the maximum number of task executions that can be concurrently queued for a single task.
- 2.5 billion is the maximum objects queued per task, if the maximum number of tasks are queued.
First, evaluate if the task is large enough for our needs. If it is not, then consider having multiple tasks defined with multiple target agent sets.
Filter quotas
To include objects for the task, we use an includes filter, and a maximum length for the filter is 102,400 characters. When and how this quota is reached is dependent on the path structure used in the object keys. A longer path prefix means that the concatenation of the filters reaches the maximum length faster.
Creating a filter string
When creating a filter, include objects without going over the task maximum as an efficient use of task invocations.
The filter object looks like this:
includes = [{
"FilterType": "SIMPLE_PATTERN",
"Value": str(datasync_filter)
}]
Batching (UUID-based)
Filter length is predictable when a large flat Key Prefix uses universally unique identifier (UUID). A UUID is constructed through a hexadecimal base16 (Hex) that makes sure each prefix is unique. The following table provides further details.
The number of objects included in a filter decreases as the number of digits included increases. An inclusion of too many digits increases the number of filters prefix needed and may result in an overall filter length that is too long. If we don’t have enough digits, then we have too many objects. Investigation is needed to find the optimal number of included digits for the prefix filter.
For example, if we have a path prefix of 21 characters in the object key, then a single filter with 1 digit would be 21+1=22 characters.
With one digit this would result in 16 prefix groups. We also use a delimiter of | so the length of the filter string in this case would be (22*16)+15=367 characters.
At 3 digits, the filter is 64+3=67 characters, with 4096 prefix groups. This results in a filter string length (24*4096)+21=102,399 characters, which is within the maximum length. Adding additional digits would make the string too long and would exceed the maximum.
These examples are all based on the fact the UUID has a large enough sample set to have enough entropy to evenly distribute objects over the digit match prefixes.
Code sample (large batch-UUID)
The following sample code illustrates how to implement the preceding optimal features.
Use Python to generate a full set of UUID Prefixes:
datasync_filter = ''
number_of_digits = 3
prefix = 'path/to/objects/'
hex_range = range(0, 16**number_of_digits)
uuid_prefixes = [f"{i:x}".zfill(number_of_digits) for i in hex_range]
# These can then be joined with the path prefix to create filter.
for uuid_prefix in uuid_prefixes:
# Append the Prefix to the Filter, adding delimiter for all but the first
if datasync_filter:
datasync_filter += '|'
datasync_filter += f'{prefix}{uuid_prefix}*'
datasync_filter
can then be used in the includes
filter.
Cleaning up
Delete example resources if you no longer need them to avoid incurring future costs. This should include the Lambda functions, Amazon SQS queues, and any sample data used in Amazon S3.
Conclusion
In this post, we discussed techniques that can be used when looking to batch, process, transfer, and validate even the largest data sets with AWS DataSync, even when these data sets may increase to over 100 million objects. We explored manifests, and how you can use this new feature of DataSync as part of an event-driven architecture to decrease task execution times by specifying only the files or objects that must be processed by your task. Additionally, we demonstrated how to use DataSync with an include filter which can be useful for customers who need to perform an initial large bulk transfer of their data. We also outlined some of the quotas of the AWS DataSync service and provided recommendations on how you can move your data more efficiently to work around these thresholds.
To learn more about AWS DataSync and manifests, check out the following links: