AWS Big Data Blog
Optimize write throughput for Amazon Kinesis Data Streams
Amazon Kinesis Data Streams is used by many customers to capture, process, and store data streams at any scale. This scale is made possible by dividing each data stream into multiple shards. Each shard in a stream has a write throughput limit of 1 MB per second or 1,000 records per second. Whether your data streaming application is collecting clickstream data from a web application or recording telemetry data from billions of Internet of Things (IoT) devices, streaming applications are highly susceptible to varying amounts of data ingestion. Sometimes a large and unexpected volume of data arrives from a source we least expect. For instance, consider application logic with a retry mechanism when writing records to a Kinesis data stream. In case of a network failure, it’s common to buffer data locally and write it when connectivity is restored. Depending on the rate at which data is buffered and the duration of the connectivity issue, the local buffer can accumulate enough data to saturate the available write throughput quota of a Kinesis data stream.
When an application attempts to write more data than what is allowed, it will receive write throughput exceeded errors. In some instances, not being able to address these errors in a timely manner can result in data loss, unhappy customers, and other undesirable outcomes. In this post, we explore the typical reasons behind write throughput exceeded errors, along with methods to identify them. We then guide you on swift responses to these events and provide several solutions for mitigation. Lastly, we delve into how on-demand capacity mode can be valuable in addressing these errors.
Why do we get write throughput exceeded errors?
Write throughput exceeded errors are generally caused by three different scenarios:
- The simplest is the case where the producer application is generating more data than the throughput available in the Kinesis data stream (the sum of all shards).
- Next, we have the case where data distribution is not even across all shards, known as a hot shard issue.
- Write throughput errors can also be caused by an application choosing a partition key to write records at a rate exceeding the throughput offered by a single shard. This situation is somewhat similar to a hot shard issue, but as we see later in this post, unlike a hot shard issue, you can’t solve this problem by adding more shards to the data stream. This behavior is commonly known as a hot key issue.
Before we discuss how to diagnose these issues, let’s look at how Kinesis data streams organize data and its relationship to write throughput exceeded errors.
A Kinesis data stream has one or more shards to store data. Each shard is assigned a key range in 128-bit integer space. If you view the details of a data stream using the describe-stream operation in the AWS Command Line Interface (AWS CLI), you can actually see this key range assignment:
When a producer application invokes the PutRecord or PutRecords API, the service calculates an MD5 hash for the PartitionKey specified in the record. The resulting hash is used to determine which shard stores that record. You can take more control over this process by setting the ExplicitHashKey property in the PutRecord request to a hash key that falls within a specific shard’s key range. For instance, setting ExplicitHashKey to 0 will guarantee that the record is written to shard ID shardId-0 in the stream described in the preceding code snippet.
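The mapping from partition key to shard can be sketched in a few lines of Python. This is a minimal illustration, not the service's implementation: it assumes a two-shard stream whose key ranges split the 128-bit space evenly, and it assumes the MD5 digest is read as a big-endian integer. The URLs and the helper names (`even_key_ranges`, `hash_key`, `shard_index`) are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # hash keys are 128-bit unsigned integers


def even_key_ranges(shard_count):
    """Split the hash space into contiguous, equally sized ranges (assumed layout)."""
    size = HASH_SPACE // shard_count
    ranges = []
    for i in range(shard_count):
        start = i * size
        # The last range absorbs any remainder so the whole space is covered.
        end = HASH_SPACE - 1 if i == shard_count - 1 else start + size - 1
        ranges.append((start, end))
    return ranges


def hash_key(partition_key):
    """MD5 of the UTF-8 partition key, read as a 128-bit integer."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(key, ranges):
    """Return the position of the range that contains the given hash key."""
    for index, (start, end) in enumerate(ranges):
        if start <= key <= end:
            return index
    raise ValueError("hash key is outside the 128-bit space")


ranges = even_key_ranges(2)
for url in ["/home", "/cart", "/checkout"]:
    print(url, "-> shard", shard_index(hash_key(url), ranges))

# An explicit hash key of 0 always falls in the first range.
print("ExplicitHashKey 0 -> shard", shard_index(0, ranges))
```

Because the shard is chosen purely from the hash value, every record with the same partition key lands in the same shard, which is what makes skewed keys a throughput concern.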
How partition keys are distributed across available shards plays a vital role in maximizing the available throughput in a Kinesis data stream. When some partition keys are used far more frequently than others, the shards storing those records are utilized more. We get the same net effect if we use ExplicitHashKey and our logic for choosing the hash key is biased towards a subset of shards.
Imagine you have a fleet of web servers logging performance metrics for each web request served into a Kinesis data stream with two shards, and you use the request URL as the partition key. Each time a request is served, the application makes a call to the PutRecord API carrying a 10-byte record. Let’s say that you have a total of 10 URLs and each receives 10 requests per second. Under these circumstances, total throughput required for the workload is 1,000 bytes per second and 100 requests per second. If we assume perfect distribution of the 10 URLs across the two shards, each shard will receive 500 bytes per second and 50 requests per second.
Now imagine one of these URLs went viral and started receiving 1,000 requests per second. Although the situation is positive from a business point of view, you’re now on the brink of making users unhappy. After the page gained popularity, the shard storing the popular URL receives 1,040 requests per second (1,000 + 10 * 4). At this point, you’ll receive write throughput exceeded errors from that shard. You’re throttled based on the records per second quota, because even with the increased requests you’re still generating only approximately 11 KB of data per second.
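The arithmetic in this example can be checked with a short sketch. It assumes five URLs per shard, as in the perfect-distribution case above, and treats 1 MB as 2**20 bytes; the function name is hypothetical.

```python
RECORD_SIZE_BYTES = 10
RECORDS_PER_SECOND_LIMIT = 1_000       # per-shard quota
BYTES_PER_SECOND_LIMIT = 1024 * 1024   # per-shard quota, 1 MB taken as 2**20 bytes


def shard_load(requests_per_url):
    """Return (records per second, bytes per second) for the URLs mapped to one shard."""
    records = sum(requests_per_url)
    return records, records * RECORD_SIZE_BYTES


normal_shard = shard_load([10] * 5)            # five URLs at 10 requests per second
hot_shard = shard_load([1_000] + [10] * 4)     # one viral URL plus four normal ones
stream_bytes = hot_shard[1] + normal_shard[1]  # both shards together

print("normal shard:", normal_shard)
print("hot shard:", hot_shard)
print("stream bytes per second:", stream_bytes)
print("record quota exceeded:", hot_shard[0] > RECORDS_PER_SECOND_LIMIT)
print("byte quota exceeded:", hot_shard[1] > BYTES_PER_SECOND_LIMIT)
```

The hot shard is over the records quota by 40 records per second while using about 1% of its byte quota, which is why the throttling shows up against records rather than bytes.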
You can solve this problem either by using a UUID for each request as the partition key so that you share the total load across both shards, or by adding more shards to the Kinesis data stream. The method you choose depends on how you want to consume data. Changing the partition key to a UUID would be problematic if you want performance metrics from a given URL to be always processed by the same consumer instance or if you want to maintain the order of records on a per-URL basis.
Knowing the exact cause of write throughput exceeded errors is an important step in remediating them. In the next sections, we discuss how to identify the root cause and remediate this problem.
Identifying the cause of write throughput exceeded errors
The first step in solving a problem is knowing that it exists. You can use the WriteProvisionedThroughputExceeded metric in Amazon CloudWatch in this case. You can correlate the spikes in the WriteProvisionedThroughputExceeded metric to the IncomingBytes and IncomingRecords metrics to identify whether an application is getting throttled due to the size of data or the number of records written.
Let’s look at a few tests we performed in a stream with two shards to illustrate various scenarios. In this instance, with two shards in our stream, total throughput available to our producer application is either 2 MB per second or 2,000 records per second.
In the first test, we ran a producer to write batches of 30 records, each being 100 KB, using the PutRecords API. As you can see in the graph on the left of the following figure, our WriteProvisionedThroughputExceeded errors count went up. The graph on the right shows that we are reaching the 2 MB per second limit, but our incoming records rate is much lower than the 2,000 records per second limit (Kinesis metrics are published at 1-minute intervals, so the per-minute upper limits are approximately 125.8 million bytes and 120,000 records).
The following figures show how the same three metrics changed when we changed the producer to write batches of 500 records, each being 50 bytes, in the second test. This time, we exceeded the 2,000 records per second throughput limit, but our incoming bytes rate is well under the limit.
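The per-minute ceilings quoted for these graphs, and the quota each test runs into first, follow from the per-shard limits. The sketch below assumes a steady producer writing fixed-size records and takes 1 MB as 2**20 bytes and 100 KB as 100 * 1,024 bytes; the function names are hypothetical.

```python
BYTES_PER_SHARD_PER_SECOND = 1024 * 1024
RECORDS_PER_SHARD_PER_SECOND = 1_000
METRIC_PERIOD_SECONDS = 60  # metrics are aggregated at 1-minute intervals


def per_minute_limits(shards):
    """Upper bounds for the per-minute sums of IncomingBytes and IncomingRecords."""
    return (
        shards * BYTES_PER_SHARD_PER_SECOND * METRIC_PERIOD_SECONDS,
        shards * RECORDS_PER_SHARD_PER_SECOND * METRIC_PERIOD_SECONDS,
    )


def binding_quota(record_size_bytes, shards):
    """Return which quota a steady producer of fixed-size records reaches first."""
    records_at_byte_limit = shards * BYTES_PER_SHARD_PER_SECOND / record_size_bytes
    record_limit = shards * RECORDS_PER_SHARD_PER_SECOND
    return "bytes" if records_at_byte_limit < record_limit else "records"


print(per_minute_limits(2))
print("first test (100 KB records):", binding_quota(100 * 1024, 2))
print("second test (50-byte records):", binding_quota(50, 2))
```

With 100 KB records, the byte quota is reached at roughly 20 records per second, far below the record quota. With 50-byte records, the record quota is reached while the stream carries only about 100 KB per second.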
Now that we know that the problem exists, we should look for clues to see if we’re exceeding the overall throughput available in the stream or if we’re having a hot shard issue due to an imbalanced partition key distribution as discussed earlier. One approach to this is to use enhanced shard-level metrics. Prior to our tests, we enabled enhanced shard-level metrics, and we can see in the following figure that both shards equally reached their quota in our first test.
We have seen Kinesis data streams containing thousands of shards. However, plotting enhanced shard-level metrics on such a large stream may not provide an easy way to find out which shards are over-utilized. In that instance, it’s better to use CloudWatch Metrics Insights to run queries to view top-n items, as shown in the following code (adjust the LIMIT 5 clause accordingly):
Enhanced shard-level metrics are not enabled by default. If you didn’t enable them and you want to perform root cause analysis after an incident, this option isn’t very helpful. In addition, you can only query the most recent 3 hours of data. Enhanced shard-level metrics also incur additional costs for CloudWatch metrics, and it may be cost prohibitive to keep them always on in data streams with a lot of shards.
One interesting scenario is when the workload is bursty, which can make the resulting CloudWatch metrics graphs rather baffling. This is because Kinesis publishes CloudWatch metric data aggregated at 1-minute intervals. Consequently, although you can see write throughput exceeded errors, your incoming bytes/records graphs may still be within the limits. To illustrate this scenario, we changed our test to create a burst of writes exceeding the limits and then sleep for a few seconds. Then we repeated this cycle for several minutes to yield the graphs in the following figure, which show write throughput exceeded errors on the left, but the IncomingBytes and IncomingRecords graphs on the right seem fine.
To enhance the process of identifying write throughput exceeded errors, we developed a CLI tool called Kinesis Hot Shard Advisor (KHS). With KHS, you can view shard utilization when shard-level metrics are not enabled. This is particularly useful for investigating an issue retrospectively. It can also show the most frequently written keys to a particular shard. KHS reports shard utilization by reading records and aggregating them at per-second intervals based on the ApproximateArrivalTimestamp in the record. Because of this, you can also understand shard utilization drivers during bursty write workloads.
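As an illustration of what such a top-n query might look like, the following Python sketch builds a Metrics Insights query string and wraps it in the parameter structure accepted by the CloudWatch GetMetricData API. The stream name `my-stream`, the helper names, and the choice of SUM over IncomingBytes are assumptions made for this example; it requires enhanced shard-level metrics to be enabled so that the ShardId dimension exists.

```python
from datetime import datetime, timedelta, timezone


def top_shards_query(stream_name, metric="IncomingBytes", limit=5):
    """Build a Metrics Insights query ranking shards of one stream by a shard-level metric."""
    return (
        f"SELECT SUM({metric}) "
        f'FROM SCHEMA("AWS/Kinesis", StreamName, ShardId) '
        f"WHERE StreamName = '{stream_name}' "
        f"GROUP BY ShardId "
        f"ORDER BY SUM() DESC "
        f"LIMIT {limit}"
    )


def metric_data_request(query, start, end, period_seconds=60):
    """Parameters for GetMetricData, which accepts a Metrics Insights query as an expression."""
    return {
        "MetricDataQueries": [
            {"Id": "topShards", "Expression": query, "Period": period_seconds}
        ],
        "StartTime": start,
        "EndTime": end,
    }


end_time = datetime.now(timezone.utc)
request = metric_data_request(
    top_shards_query("my-stream"),  # hypothetical stream name
    start=end_time - timedelta(hours=1),
    end=end_time,
)
print(request["MetricDataQueries"][0]["Expression"])
```

With the AWS SDK for Python, a dictionary like this could be passed as keyword arguments to the CloudWatch client's `get_metric_data` call. The same query text can also be pasted into the query editor in the CloudWatch console.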
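A small simulation shows why the 1-minute aggregate can hide bursts. It is a deliberately simplified model, not how the service meters traffic: the producer attempts one burst in a single second, sleeps, and repeats, and anything above the per-second stream limit in a given second is counted as throttled. The burst size and sleep duration are made-up values.

```python
MIB = 1024 * 1024
STREAM_BYTES_PER_SECOND = 2 * MIB                    # two shards at 1 MB per second each
PER_MINUTE_CEILING = STREAM_BYTES_PER_SECOND * 60    # what a 1-minute graph can show at most


def simulate(burst_bytes, sleep_seconds, duration_seconds=60):
    """Return (accepted bytes, throttled bytes) over the duration for a burst-then-sleep cycle."""
    accepted = throttled = 0
    for second in range(duration_seconds):
        # One burst at the start of every cycle, nothing while sleeping.
        attempted = burst_bytes if second % (sleep_seconds + 1) == 0 else 0
        ok = min(attempted, STREAM_BYTES_PER_SECOND)
        accepted += ok
        throttled += attempted - ok
    return accepted, throttled


accepted, throttled = simulate(burst_bytes=3 * MIB, sleep_seconds=2)
print("accepted bytes in the minute:", accepted)
print("throttled bytes in the minute:", throttled)
print("share of per-minute ceiling used:", accepted / PER_MINUTE_CEILING)
```

In this model the minute total stays at one third of the ceiling even though every burst is throttled, which matches the pattern of errors on one graph and seemingly healthy throughput on the other.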
By running the following command, we can get KHS to inspect the data that arrived in 1 minute during our first test and generate a report:
For the given time window, the summary section in the generated report shows the maximum bytes per second rate observed, total bytes ingested, maximum records per second observed, and the total number of records ingested for each shard.
Choosing a shard ID in the first column will display a graph of incoming bytes and records for that shard. This is similar to the graph you get in CloudWatch metrics, except the KHS graph reports on a per-second basis. For instance, in the following figure, we can see how the producer was going through a series of bursty writes followed by a throttling event during our test case.
Running the same command with the -aggregate-key option enables partition key distribution analysis. It generates an additional graph for each shard showing the key distribution, as shown in the following figure. For our test scenario, we can only see each key being used one time because we used a new UUID for each record.
Because KHS reports based on data stored in streams, it creates an enhanced fan-out consumer at startup to prevent using the read throughput quota available for other consumers. When the analysis is complete, it deletes that enhanced fan-out consumer.
Due to its nature of reading data streams, KHS can transfer a lot of data during analysis. For instance, assume you have a stream with 100 shards. If all of them are fully utilized during a 1-minute window specified using the -from and -to arguments, the host running KHS will receive at least 1 MB * 100 * 60 = 6,000 MB, or approximately 6 GB of data. To avoid this kind of excessive data transfer and speed up the analysis process, we recommend first using the WriteProvisionedThroughputExceeded CloudWatch metric to identify a time period when you experienced throttling and then using a small window (such as 10 seconds) with KHS. You can also run KHS in an Amazon Elastic Compute Cloud (Amazon EC2) instance in the same AWS Region as your Kinesis data stream to minimize network latency during reads.
KHS is designed to run on a single machine to diagnose large-scale workloads. Using a naive in-memory counting algorithm (such as a hash map storing the partition key and count) for partition key distribution analysis could easily exhaust the available memory in the host system. Therefore, we use a probabilistic data structure called count-min sketch to estimate the number of times a key has been used. As a result, the number you see in the report should be taken as an approximate value rather than an absolute value. After all, with this report, we just want to find out if there’s an imbalance in the keys written to a shard.
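The transfer estimate is a simple product, sketched below so you can try other window sizes. It assumes every shard is written at its full 1 MB per second quota for the entire window; the function name is hypothetical.

```python
def khs_transfer_mb(shards, window_seconds, mb_per_shard_per_second=1):
    """Lower bound, in MB, on data read when every shard is fully utilized for the window."""
    return shards * window_seconds * mb_per_shard_per_second


print("100 shards, 60-second window:", khs_transfer_mb(100, 60), "MB")
print("100 shards, 10-second window:", khs_transfer_mb(100, 10), "MB")
```

Shrinking the window from 60 to 10 seconds cuts the estimate from 6,000 MB to 1,000 MB.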
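To illustrate the idea, here is a minimal count-min sketch in Python. It is not the KHS implementation: the table dimensions, the use of salted BLAKE2b hashes, and the sample keys are all assumptions chosen for the example.

```python
import hashlib


class CountMinSketch:
    """Fixed-size frequency estimator; estimates never fall below the true count."""

    def __init__(self, width=2048, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _cells(self, key):
        # One independent hash per row, derived by salting with the row number.
        for row in range(self.depth):
            digest = hashlib.blake2b(
                key.encode("utf-8"), digest_size=8, salt=row.to_bytes(8, "big")
            ).digest()
            yield row, int.from_bytes(digest, "big") % self.width

    def add(self, key, count=1):
        for row, col in self._cells(key):
            self.table[row][col] += count

    def estimate(self, key):
        # Collisions can only inflate a cell, so the smallest cell is the best estimate.
        return min(self.table[row][col] for row, col in self._cells(key))


sketch = CountMinSketch()
sketch.add("/viral-page", 1_000)
for i in range(50):
    sketch.add(f"/page-{i}")

print("estimate for /viral-page:", sketch.estimate("/viral-page"))
print("estimate for /page-7:", sketch.estimate("/page-7"))
```

Memory use is fixed at width * depth counters regardless of how many distinct keys are seen. The trade-off is that hash collisions can push an estimate above the true count, which is acceptable when the goal is to spot a skewed key rather than to count exactly.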
Now that we understand what causes hot shards and how to identify them, let’s look at how to deal with this in producer applications and remediation steps.
Remediation steps
Having producers retry writes is a step towards making our producers resilient to write throughput exceeded errors. Consider our earlier sample application logging performance metrics data for each web request served by a fleet of web servers. When implementing this retry mechanism, you should remember that records that are not yet written to the Kinesis data stream are held in the host system’s memory. The first issue with this is that if the host crashes before the records can be written, you’ll experience data loss. Scenarios such as tracking web request performance data might be more forgiving of this type of data loss than scenarios like financial transactions. You should evaluate the durability guarantees required for your application and employ techniques to achieve them.
The second issue is that records waiting to be written to the Kinesis data stream are going to consume the host system’s memory. When you start getting throttled and have some retry logic in place, you should notice that your memory utilization is going up. A retry mechanism should have a way to avoid exhausting the host system’s memory.
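One way to keep retries from exhausting memory is to cap the number of buffered records and back off between attempts. The following sketch is only one possible policy, under stated assumptions: it drops the oldest record when the buffer is full (which may be unacceptable for workloads that cannot tolerate loss), and `ThroughputExceeded`, `BoundedRetryBuffer`, and `flaky_send` are hypothetical stand-ins for your own error type and send function.

```python
import random
import time
from collections import deque


class ThroughputExceeded(Exception):
    """Stand-in for a write throughput exceeded error raised by the send function."""


class BoundedRetryBuffer:
    def __init__(self, send, max_records=1000, max_attempts=5, base_delay=0.1, sleep=time.sleep):
        self.send = send
        self.pending = deque()
        self.max_records = max_records
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.sleep = sleep
        self.dropped = 0

    def put(self, record):
        # Cap memory use: when full, drop the oldest record and count the loss.
        if len(self.pending) >= self.max_records:
            self.pending.popleft()
            self.dropped += 1
        self.pending.append(record)

    def flush(self):
        """Send pending records in order; return True when the buffer is empty."""
        while self.pending:
            for attempt in range(self.max_attempts):
                try:
                    self.send(self.pending[0])
                    self.pending.popleft()
                    break
                except ThroughputExceeded:
                    # Exponential backoff with jitter before the next attempt.
                    self.sleep(random.uniform(0, self.base_delay * 2 ** attempt))
            else:
                return False  # still throttled; keep the records for a later flush
        return True


calls = {"count": 0}
sent = []


def flaky_send(record):
    """Simulated sender that is throttled on its first two calls."""
    calls["count"] += 1
    if calls["count"] <= 2:
        raise ThroughputExceeded()
    sent.append(record)


buffer = BoundedRetryBuffer(flaky_send, max_records=3, sleep=lambda seconds: None)
for i in range(5):
    buffer.put(f"record-{i}")
flushed = buffer.flush()
print(flushed, sent, buffer.dropped)
```

Tracking the `dropped` counter as a metric makes the durability trade-off visible. If dropping is not an option, the same structure could spill to local disk or apply backpressure to callers instead.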
With the appropriate retry logic in place, if you receive write throughput exceeded errors, you can use the methods we discussed earlier to identify the cause. After you identify the root cause, you can choose the appropriate remediation step:
- If the producer application is exceeding the overall stream’s throughput, you can add more shards to the stream to increase its write throughput capacity. When adding shards, the Kinesis data stream makes the new shards available incrementally, minimizing the time that producers experience write throughput exceeded errors. To add shards to a stream, you can use the Kinesis console, the update-shard-count operation in the AWS CLI, the UpdateShardCount API through the AWS SDK, or the ShardCount property in the AWS CloudFormation template used to create the stream.
- If the producer application is exceeding the throughput limit of some shards (hot shard issue), pick one of the following options based on consumer requirements:
  - If locality of data is required (records with the same partition key are always processed by the same consumer) or an order based on partition key is required, use the split-shard operation in the AWS CLI or the SplitShard API in the AWS SDK to split those shards.
  - If locality or order based on the current partition key is not required, change the partition key scheme to increase its distribution.
- If the producer application is exceeding the throughput limit of a shard due to a single partition key (hot key issue), change the partition key scheme to increase its distribution.
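When splitting a hot shard, you need to supply the hash key at which the second child shard starts. A common choice is the midpoint of the parent's key range, sketched below. The stream name and shard ID are hypothetical placeholders, the helper names are not part of any SDK, and the hash keys are decimal strings as reported by describe-stream.

```python
def split_point(starting_hash_key, ending_hash_key):
    """Midpoint of a shard's hash key range, returned as a decimal string."""
    start, end = int(starting_hash_key), int(ending_hash_key)
    midpoint = (start + end) // 2
    if midpoint <= start:
        raise ValueError("hash key range is too small to split")
    return str(midpoint)


def split_shard_request(stream_name, shard_id, starting_hash_key, ending_hash_key):
    """Parameters for the SplitShard API for an even split of one shard."""
    return {
        "StreamName": stream_name,
        "ShardToSplit": shard_id,
        "NewStartingHashKey": split_point(starting_hash_key, ending_hash_key),
    }


# Hypothetical single shard covering the whole 128-bit space.
request = split_shard_request(
    "my-stream", "shardId-000000000000", "0", str(2 ** 128 - 1)
)
print(request["NewStartingHashKey"])
```

An even split only helps when the load is spread over many keys within the range. If a single key is responsible for the traffic, both the key and its load end up together in one of the child shards, which is why the hot key case calls for a different partition key scheme.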
Kinesis Data Streams also has an on-demand capacity mode. In on-demand capacity mode, Kinesis Data Streams automatically scales streams when needed. Additionally, you can switch between on-demand and provisioned capacity modes without causing an outage. This could be particularly useful when you’re experiencing write throughput exceeded errors but require immediate reaction to keep your application available to your users. In such instances, you can switch a provisioned capacity mode data stream to an on-demand data stream and let Kinesis Data Streams handle the required scale appropriately. You can then perform root cause analysis in the background and take corrective actions. Finally, if necessary, you can change the capacity mode back to provisioned.
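Switching capacity mode is a single API call. The sketch below assumes a Kinesis client from the AWS SDK for Python (boto3), whose `update_stream_mode` method maps to the UpdateStreamMode API. To stay runnable without AWS credentials, the example exercises the function against a small recording stub; the stub class and the stream ARN are hypothetical.

```python
def switch_stream_mode(kinesis_client, stream_arn, mode="ON_DEMAND"):
    """Switch a stream between capacity modes ("ON_DEMAND" or "PROVISIONED")."""
    if mode not in ("ON_DEMAND", "PROVISIONED"):
        raise ValueError(f"unsupported stream mode: {mode}")
    kinesis_client.update_stream_mode(
        StreamARN=stream_arn,
        StreamModeDetails={"StreamMode": mode},
    )


class RecordingClient:
    """Dry-run stand-in that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def update_stream_mode(self, **kwargs):
        self.calls.append(kwargs)


client = RecordingClient()
switch_stream_mode(
    client, "arn:aws:kinesis:us-east-1:111122223333:stream/my-stream"  # hypothetical ARN
)
print(client.calls)
```

In a real incident you would pass `boto3.client("kinesis")` instead of the stub, and later call the same function with `mode="PROVISIONED"` if you decide to switch back.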
Conclusion
You should now have a solid understanding of the common causes of write throughput exceeded errors in Kinesis data streams, how to diagnose them, and what actions to take to appropriately deal with them. We hope that this post will help you make your Kinesis Data Streams applications more robust. If you are just starting with Kinesis Data Streams, we recommend referring to the Developer Guide.
If you have any questions or feedback, please leave them in the comments section.
About the Authors
Buddhike de Silva is a Senior Specialist Solutions Architect at Amazon Web Services. Buddhike helps customers run large scale streaming analytics workloads on AWS and make the best out of their cloud journey.
Nihar Sheth is a Senior Product Manager at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals.