AWS Database Blog
Choosing the right number of shards for your large-scale Amazon DynamoDB table
As a general design best practice, you can optimize your use of Amazon DynamoDB throughput capacity by designing your application for uniform read and write activity across all logical partition keys in the table and its indexes. In doing so, you can prevent creating hot partitions that could consume a disproportionate amount of your table’s capacity.
One way to distribute write access evenly across partitions is to expand the partition keyspace. The strategy is to append additional suffixes to partition keys so that items spread across more of the underlying partitions, a technique called write sharding.
However, this approach leads to some interesting design questions. When should you consider sharding? How many shards should you create for each partition key? When do you create the shards? How do you scale the number of shards? How does sharding affect your read and write patterns?
This post describes a dynamic write sharding mechanism for DynamoDB tables with composite primary keys (partition key and sort key). This mechanism enables you to optimize the write capacity of a DynamoDB table by adding new shards for partition keys on the fly, based on increased demand for write throughput.
Partitions, keys, and write sharding
DynamoDB excels at horizontally distributed workloads. When you create a table, DynamoDB allocates sufficient partitions to the table to handle your provisioned throughput requirements. At creation, DynamoDB evenly distributes the table’s capacity across the underlying partitions. DynamoDB may allocate additional partitions to the table after table creation. For more information, see Best Practices for Designing and Using Partition Keys Effectively and Partitions and Data Distribution.
DynamoDB uses a consistent internal hash function to distribute items to partitions, and an item’s partition key determines which partition DynamoDB stores it on. A group of items sharing an identical partition key (called a collection) maps to the same partition, unless the collection exceeds the partition’s storage capacity.
Additionally, a single partition may hold items associated with more than one collection. Disproportionately high write activity on one or more collections mapped to the same partition can lead to a hot partition: concentrated read and write activity that exceeds either the provisioned capacity of the partition or the maximum partition capacity, and manifests as capacity errors.
These capacity errors surface as ProvisionedThroughputExceededException in the provisioned capacity model and InternalServerError in the on-demand capacity model. Capacity errors are more likely with large collections, because items in the same collection map to the same underlying partition. For more information, see Error Handling and Read/Write Capacity Mode.
Write sharding is a mechanism to distribute a collection effectively across a DynamoDB table’s partitions. It increases write throughput per partition key by distributing the write operations for that key across multiple partitions. Write throughput for an individual partition key can therefore exceed the capacity of a single underlying partition, which minimizes capacity errors at the DynamoDB partition level.
Additionally, spreading write operations across DynamoDB’s partitions uses partition capacity more evenly, which makes more efficient use of the table capacity and lowers costs.
You can implement write sharding from the client side by suffixing simple values to your partition key. Write sharding through suffixing is effective because even a one-byte change to the partition key produces a different output in the internal hash function, and places the item on a different partition. For more information, see Using Write Sharding to Distribute Workloads Evenly.
DynamoDB provides native features of burst capacity, which can mitigate temporary demand bursts, and adaptive capacity, which can repurpose capacity between partitions to match unbalanced access patterns. Write sharding is a complementary mechanism to distribute traffic evenly across DynamoDB’s partitions.
Write sharding example with a pre-determined number of shards
An example of a table that you would want to shard is an audit log for a file system, where the partition key is a file path and the sort key is the access timestamp. If a particular file becomes extremely popular and is shared frequently, the single DynamoDB partition holding its collection receives a disproportionate number of write requests.
The following table shows the same item appearing in three different sharding schemes:
- In the first method, the data sits inside one shard.
- In the second method, the writer chooses a random number between 1 and 10 for ten shards, and suffixes it onto the partition key before updating the item. You need to make subsequent reads for the partition key against each of the 10 shards.
- In the third method, to determine the shard number, the writer hashes the partition key concatenated with the sort key, and performs modulo of the hash against the pre-determined number of shards (10).
This method is useful when you want to query for an audit item by file path and access time. Instead of querying all 10 shards to find the item, you calculate the hash of the file path and timestamp to identify the single shard that holds it.
| Method | Method Description | Partition Key | Sort Key | # of Shards | Shard Algorithm | Sample Partition Key | Sample Sort Key (Access Timestamp) |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | Original item | <File Path> | <timestamp> | 1 | None | /shared/firetvGen2.txt | 123456789101 |
| 2 | Random write shard | <File Path>_<Shard> | <timestamp> | 10 | RANDOM(1, 10) | /shared/firetvGen2.txt_3 | 123456789101 |
| 3 | Calculated write shard | <File Path>_<Shard> | <timestamp> | 10 | md5(<File Path> + timestamp) % 10 + 1 | /shared/firetvGen2.txt_6 | 123456789101 |
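As a minimal sketch of methods 2 and 3, the following Python snippet computes the suffixed partition key for an audit item. The function names and the epoch-millisecond timestamp format are illustrative assumptions; only the suffixing scheme comes from the table above.

```python
import hashlib
import random

NUM_SHARDS = 10  # pre-determined shard count

def random_shard_key(file_path):
    """Method 2: append a random shard number between 1 and NUM_SHARDS."""
    shard = random.randint(1, NUM_SHARDS)
    return f"{file_path}_{shard}"

def calculated_shard_key(file_path, timestamp):
    """Method 3: derive the shard number as md5(<File Path> + timestamp) % 10 + 1."""
    digest = hashlib.md5(f"{file_path}{timestamp}".encode("utf-8")).hexdigest()
    shard = int(digest, 16) % NUM_SHARDS + 1
    return f"{file_path}_{shard}"

# Example: both keys refer to the same logical item
print(random_shard_key("/shared/firetvGen2.txt"))                    # e.g. /shared/firetvGen2.txt_3
print(calculated_shard_key("/shared/firetvGen2.txt", 123456789101))  # deterministic shard choice
```

Because method 3 is deterministic, a reader that knows both the file path and the access timestamp can recompute the shard number and query a single shard.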
Determining the number of shards for a partition key
The preceding example uses a pre-determined number of shards for its partition keys. This strategy is effective when the read and write capacity needs for each partition key and future growth are well understood. However, this is not always the case; some use cases have variable and non-deterministic read and write patterns for each partition key.
A dynamic sharding mechanism, which can automatically add new shards for partition keys based on feedback from DynamoDB, can prevent under-sizing the number of shards and lets you scale to virtually any limit. For more information, see Amazon DynamoDB auto scaling: Performance and cost optimization at any scale.
Dynamic sharding mechanism
For the dynamic sharding mechanism, create a separate DynamoDB table to track shard metadata related to each partition key (subsequently referred to as a partition key metadata table). DynamoDB readers and writers in your application determine the current number of shards for a given partition key by querying the metadata table.
The metadata table has the following schema for each item.
| Type | Description | Data Type |
| --- | --- | --- |
| Partition key: file_path | Value same as the partition key of the main table | Type same as the partition key of the main table |
| Attribute 1: number_of_shards | Number of shards | Number |
| Attribute 2: last_updated | Timestamp (epoch time) when the number of shards was last updated | Number |
| Attribute 3: shard_history | Shard history | StringSet |
While the main DynamoDB table uses a composite primary key, the partition key metadata table uses only the partition key from the main table as its primary key. This also translates to a smaller metadata table compared to the main table, because each item in the partition key metadata table represents a collection in the main table.
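As a rough sketch of that schema, the following boto3 call creates such a metadata table with only a partition key and on-demand capacity; the table name shard_metadata is illustrative.

```python
import boto3

dynamodb = boto3.client("dynamodb")

# Hypothetical metadata table: a partition-key-only schema that mirrors the
# main table's partition key (file_path), using on-demand capacity.
dynamodb.create_table(
    TableName="shard_metadata",  # illustrative name
    AttributeDefinitions=[
        {"AttributeName": "file_path", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "file_path", "KeyType": "HASH"},  # no sort key
    ],
    BillingMode="PAY_PER_REQUEST",
)
```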
The following table describes a sample item in the partition key metadata table, using the audit log example from the previous section.
| file_path | number_of_shards | last_updated | shard_history |
| --- | --- | --- | --- |
| /shared/firetvGen2.txt | 2 | 1562858912 | { “1561758912:1”, “1562858912:2” } |
Here the partition key /shared/firetvGen2.txt has two shards, as noted in the number_of_shards attribute.
The last_updated attribute holds the timestamp (represented as epoch time) of when the number of shards was last updated. last_updated helps enforce a cooldown period for updating the number of shards for a partition key. Before increasing the number of shards, a writer needs to check when the number of shards was last updated. If the time elapsed since the last update (the difference between the current time and the timestamp from the metadata table) is less than the pre-determined cooldown period, the writer retries the write operation on the main table instead of updating the metadata table, because the number of shards was updated recently.
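A minimal sketch of that cooldown check, assuming a metadata item shaped like the sample above; the 60-second cooldown and the helper name are illustrative:

```python
import time

COOLDOWN_SECONDS = 60  # illustrative pre-determined cooldown period

def cooldown_elapsed(metadata_item, now=None):
    """Return True if enough time has passed since the shard count last changed."""
    now = int(time.time()) if now is None else now
    last_updated = int(metadata_item["last_updated"])
    return (now - last_updated) >= COOLDOWN_SECONDS

# If this returns False, retry the write against the existing shards
# instead of updating the metadata table.
```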
The shard_history attribute holds the timestamp and shard count for a reasonable number of shard updates. The data type of this attribute is a DynamoDB StringSet, which holds a set of strings. The value of an entry looks similar to { “1561758912:1”, “1562858912:2” }, with each string in the set representing an epoch time and a shard count separated by a colon.
The benefits of holding the shard history are twofold. First, you can make your logic to add shards more advanced by querying for the recent history of shards to see if you’re ramping up too fast or too slow. Second, you may want to decrease the number of shards in the future, especially if the throughput demand for the partition key reduces over time, and you need to optimize the cost of reads.
Writer behavior
Before writing to the main table, writers first query the partition key metadata table to retrieve the number of shards for the partition key (the number_of_shards attribute). If the partition key metadata table does not return an item, indicating the writer is attempting to insert items into the main table with a new partition key, the writer inserts a new item into the partition key metadata table with number_of_shards set to 1 and the other attributes as per the schema described in the previous table.
If an item is present in the partition key metadata table, the writer determines the number of shards, randomly selects a shard to write from the shard space (for example, shard space is 1–5 for five shards), and appends the shard number to the partition key before writing.
If the writer experiences a partition capacity error while writing to the main table, the writer updates the item in the metadata table by incrementing the number of shards (number_of_shards), setting the current timestamp (last_updated), and updating shard_history, as described in the previous section. It then retries the write to the main table, appending the new shard number to the partition key. This spreads all new writes by writers across the larger sharded space for the partition key. For more information about partition capacity errors, see the section “Detecting a partition vs. table capacity constraint” later in this post.
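The following sketch outlines this writer flow with boto3, under several illustrative assumptions: a main table named audit_log whose partition key attribute is file_path and sort key attribute is timestamp, a metadata table named shard_metadata, and ProvisionedThroughputExceededException as the capacity signal for a provisioned-capacity table. The cooldown and conditional-update safeguards described later are omitted here for brevity.

```python
import random
import time
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
main_table = dynamodb.Table("audit_log")       # illustrative names
meta_table = dynamodb.Table("shard_metadata")

def get_or_create_shard_count(file_path):
    """Fetch number_of_shards, initializing the metadata item for a new partition key."""
    resp = meta_table.get_item(Key={"file_path": file_path})
    if "Item" in resp:
        return int(resp["Item"]["number_of_shards"])
    now = int(time.time())
    meta_table.put_item(Item={
        "file_path": file_path,
        "number_of_shards": 1,
        "last_updated": now,
        "shard_history": {f"{now}:1"},
    })
    return 1

def write_audit_item(file_path, timestamp, attributes):
    shards = get_or_create_shard_count(file_path)
    shard = random.randint(1, shards)  # pick a random shard from the shard space
    try:
        main_table.put_item(Item={
            "file_path": f"{file_path}_{shard}",
            "timestamp": timestamp,
            **attributes,
        })
    except ClientError as err:
        if err.response["Error"]["Code"] != "ProvisionedThroughputExceededException":
            raise
        # Partition capacity error: add a shard (subject to the cooldown and
        # conditional update described later), then retry on the new shard.
        new_shards = shards + 1
        now = int(time.time())
        meta_table.update_item(
            Key={"file_path": file_path},
            UpdateExpression="SET number_of_shards = :n, last_updated = :t "
                             "ADD shard_history :h",
            ExpressionAttributeValues={
                ":n": new_shards,
                ":t": now,
                ":h": {f"{now}:{new_shards}"},
            },
        )
        main_table.put_item(Item={
            "file_path": f"{file_path}_{new_shards}",
            "timestamp": timestamp,
            **attributes,
        })
```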
For provisioned capacity tables, if the error is related to the table capacity, you can use the UpdateTable operation to increase the provisioned capacity of the table as a response to the throttle. For more information, see UpdateTable in the DynamoDB Developer Guide. If configured, DynamoDB auto scaling updates the write capacity of the table automatically. For more information, see Managing Throughput Capacity Automatically with DynamoDB Auto Scaling.
Reader behavior
Any query by a reader to the main DynamoDB table needs to cover all shards for a given partition key. The reader first retrieves the number of shards (number_of_shards) from the partition key metadata table for the partition key, and then directs a query to the main table for each of the shards.
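A sketch of the corresponding reader, reusing the illustrative audit_log and shard_metadata table and attribute names assumed in the writer example:

```python
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
main_table = dynamodb.Table("audit_log")       # illustrative names
meta_table = dynamodb.Table("shard_metadata")

def read_audit_items(file_path):
    """Query every shard for a partition key and merge the results."""
    resp = meta_table.get_item(Key={"file_path": file_path})
    shards = int(resp["Item"]["number_of_shards"]) if "Item" in resp else 1
    items = []
    for shard in range(1, shards + 1):
        page = main_table.query(
            KeyConditionExpression=Key("file_path").eq(f"{file_path}_{shard}")
        )
        items.extend(page["Items"])
        # Real code should also follow LastEvaluatedKey to paginate each query.
    return items
```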
One or more capacity errors occurring in the read operations on the main table may be related to constrained table or partition read capacity. For a provisioned capacity table, if the table capacity is constrained, you can update the read capacity units on the table using the UpdateTable operation as a response to the throttle. If enabled, DynamoDB auto scaling updates the read capacity of the table automatically.
However, if one or more partition capacities are constrained and the table has enough read capacity, increasing the table read capacity is effective only up to the maximum read capacity of a partition. In this case, global secondary indexes (GSIs) and caching are effective strategies to scale the reads on the main table. For more information, see Global Secondary Indexes.
To scale read throughput, you can effectively use GSIs with the same schema as the main table. You can multiply the available read throughput by creating one or more GSIs and balancing the read operations across the main table and the GSIs. Additionally, consider exploring the use of Amazon DynamoDB Accelerator (DAX), Amazon ElastiCache, or a first-tier local cache to hold a copy of the partition key metadata table.
Detecting a partition vs. table capacity constraint
A partition capacity error occurs when the capacity requests on a partition exceed either its provisioned capacity or the partition’s maximum capacity. When such an error occurs, a leading indicator that it is a partition capacity error is that the Amazon CloudWatch metrics for consumed capacity on the table (ConsumedReadCapacityUnits or ConsumedWriteCapacityUnits) are lower than the provisioned capacity of the table (ProvisionedReadCapacityUnits or ProvisionedWriteCapacityUnits). An application experiencing a write or read error can compare these metrics by querying CloudWatch. If there is sufficient table capacity, there is likely a hot partition driving requests against one or more partition keys mapped to the same partition. For more information, see DynamoDB Metrics and Dimensions.
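One way to make this comparison programmatically is to pull both metrics from CloudWatch and check whether consumed capacity is well below provisioned capacity at the time of the error. The following is a rough sketch, assuming a provisioned-capacity table named audit_log; the one-minute period and five-minute window are illustrative choices.

```python
from datetime import datetime, timedelta, timezone
import boto3

cloudwatch = boto3.client("cloudwatch")

def table_has_spare_write_capacity(table_name, window_minutes=5):
    """Heuristic: if consumed write capacity stays below provisioned capacity
    while throttles occur, the constraint is likely a hot partition."""
    end = datetime.now(timezone.utc)
    start = end - timedelta(minutes=window_minutes)

    def latest(metric, stat):
        resp = cloudwatch.get_metric_statistics(
            Namespace="AWS/DynamoDB",
            MetricName=metric,
            Dimensions=[{"Name": "TableName", "Value": table_name}],
            StartTime=start,
            EndTime=end,
            Period=60,
            Statistics=[stat],
        )
        points = sorted(resp["Datapoints"], key=lambda p: p["Timestamp"])
        return points[-1][stat] if points else None

    consumed = latest("ConsumedWriteCapacityUnits", "Sum")
    provisioned = latest("ProvisionedWriteCapacityUnits", "Maximum")
    if consumed is None or provisioned is None:
        return False
    # ConsumedWriteCapacityUnits is reported as a sum over the period, so
    # convert it to an average per-second rate before comparing.
    return (consumed / 60.0) < provisioned

# Example: if this returns True while writes are throttled, suspect a hot partition.
print(table_has_spare_write_capacity("audit_log"))
```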
Scaling reads for the partition key metadata table
Introducing a separate partition key metadata table into the request flow adds an additional dependency to the application that you should consider. In particular, both the readers and writers to the main table need to query the partition key metadata table to determine the number of shards for a partition key. Therefore, it is important to consider scaling the reads on the partition key metadata table.
The partition key metadata table should ideally use on-demand capacity, so that you don’t need to manage its capacity yourself, which also lowers the operational burden. Additionally, you should consider the techniques discussed previously for scaling the read capacity of the main table, including the use of GSIs and caching mechanisms.
Updating the partition key metadata table
If you have a system with high write contention, where multiple writers may attempt to update the partition key metadata table, give additional consideration to how you implement the cooldown logic. To prevent a situation in which multiple writers attempt to update the metadata table immediately after the cooldown period has expired, you can implement a random back-off before the writers attempt to update the metadata table.
For example, each writer picks a random delay between 1 and 10 seconds and counts down to 0 after the cooldown timer expires, before attempting to update the metadata table. This lowers the frequency of attempted shard count updates.
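A one-line sketch of that back-off, with the 1-10 second range taken from the example above:

```python
import random
import time

def backoff_before_metadata_update():
    """Wait a random 1-10 seconds after the cooldown expires so that competing
    writers spread out their shard-count update attempts."""
    time.sleep(random.uniform(1, 10))
```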
Additionally, DynamoDB provides a built-in mechanism for optimistic locking to prevent multiple writers from updating the same item simultaneously. Before updating the number of shards for a partition key, the writer queries the metadata table with the partition key and notes the last_updated value. To increase the number of shards, the writer should use a conditional UpdateItem API call. For more information, see Conditional Updates in the DynamoDB Developer Guide.
The condition checks the last_updated attribute for equivalence against the timestamp value noted previously. This makes sure multiple writers don’t update the item in the metadata table inconsistently.
For example, to update the item described in the example item table, including the last_updated and shard_history attributes, a writer could use the UpdateItem API with a condition expression that checks the equivalence of last_updated to the timestamp value of 1562858912.
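A sketch of that conditional update with boto3, again assuming the illustrative shard_metadata table; if another writer has already changed last_updated, the condition fails and this writer re-reads the metadata item and retries its write against the new shard count.

```python
import time
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource("dynamodb")
meta_table = dynamodb.Table("shard_metadata")  # illustrative name

def try_add_shard(file_path, observed_shards, observed_last_updated):
    """Optimistically increment number_of_shards only if last_updated is unchanged."""
    now = int(time.time())
    new_shards = observed_shards + 1
    try:
        meta_table.update_item(
            Key={"file_path": file_path},
            UpdateExpression="SET number_of_shards = :n, last_updated = :now "
                             "ADD shard_history :h",
            ConditionExpression="last_updated = :observed",
            ExpressionAttributeValues={
                ":n": new_shards,
                ":now": now,
                ":observed": observed_last_updated,  # e.g. 1562858912
                ":h": {f"{now}:{new_shards}"},
            },
        )
        return new_shards
    except ClientError as err:
        if err.response["Error"]["Code"] == "ConditionalCheckFailedException":
            return None  # another writer won; re-read the metadata item and retry
        raise
```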
Conclusion
This post described a sharding solution and design considerations to optimize the write capacity of a DynamoDB table for applications with non-deterministic or disproportionate write distribution across their keyspace. The dynamic sharding mechanism also enables write throughput for partition keys to grow over time and scale to virtually any limit, while making sure to distribute the items across the DynamoDB partitions.
Like any distributed system designed for scale, the parameters you choose for your dynamic sharding implementation vary based on your application access patterns. With that in mind, for the benefit of the larger DynamoDB community, please share your learnings from implementations in the comments section.
About the Authors
Anuj Dewangan is a Sr. Solution Architect with Amazon Web Services. Anuj helps some of AWS’s largest strategic customers architect, deploy and operate distributed systems and applications at web scale on AWS.
Sean Shriver is a Sr. NoSQL Solutions Architect with Amazon Web Services. Sean helps high profile and strategic customers with migrations, design reviews, AWS SDK optimizations and Proof-of-Concept testing for Amazon DynamoDB.