AWS Database Blog

Implement serverless FIFO queues with filtering capabilities using Amazon DynamoDB transactions

Message queues allow different parts of a system to communicate and process operations asynchronously. Amazon Simple Queue Service (Amazon SQS) is a fully managed message queueing service that offers two types of message queues: standard queues and first-in-first-out (FIFO) queues. For some applications, such as a call center application, you need message filtering along with FIFO ordering of messages. This additional filtering capability can be implemented using a combination of Amazon Simple Notification Service (Amazon SNS) FIFO topics and Amazon SQS FIFO queues. However, this option can result in a proliferation of topics and queues, and in additional overhead to implement transactions across these different queues.

In this post, we walk you through a use case and an Amazon DynamoDB transactions-based queueing implementation that demonstrates both message filtering and ordered message processing capabilities. DynamoDB is a fully managed, serverless database that supports key-value and document data models. We chose a DynamoDB-based implementation for three reasons. First, DynamoDB is serverless. Second, DynamoDB has an on-demand pricing model, which offers simple pay-per-request pricing so users pay for actual usage instead of provisioning capacity in advance. Third, DynamoDB is highly available in a single AWS Region and has a global tables option for multi-Region fault tolerance.

Solution overview

We use a call center use case to demonstrate a message queue processing solution. A call center queuing system must match customer support requests in the order they’re received to the best available agents based on rules such as the requestor’s preferred language and the type of request. For our use case, we apply a rule that the language and gender attributes of the support agent align with preferences in the caller’s profile, as shown in Figure 1 that follows. We implement an agent queue that provides filtering capabilities so that a caller requesting a Spanish-speaking female agent is matched with an agent with a corresponding profile and who has been added the earliest (FIFO) to the queue.

Figure 1: Agent’s language and gender attributes match the preferences from the caller’s profile

An Amazon SQS FIFO queue is ideal for the call queue. However, Amazon SQS doesn’t offer filtering capabilities for the agent queue. Figure 2 that follows shows an approach that uses DynamoDB for the agent queue. DynamoDB supports two kinds of primary keys: a simple primary key, composed of one attribute known as the partition key, and a composite primary key, composed of two attributes. In a composite primary key, the first attribute is the partition key and the second attribute is the sort key. The agent queue table in this implementation uses a composite primary key because it gives additional flexibility when querying data, such as the ability to get the results for a single partition key sorted by sort key value.

The partition key for the agent queue table is a bitmask with each bit in the mask representing some agent capability, such as Spanish language ability. The sort key is the agent availability timestamp. These two attributes together ensure uniqueness of the item. When a new call enters the queue, the matcher constructs the bitmask and asks for the first record matching the bitmask. Because DynamoDB sorts a partition based on the sort key, the query returns the first available agent.

Figure 2: Agent queue implementation using DynamoDB
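
To make the bitmask idea concrete, here is a minimal sketch. The bit assignments and the capability_mask helper are assumptions for illustration only; the post does not list the actual bit layout, and the table examples later in this post use readable keys such as Q#Spanish#F instead.

```python
# Hypothetical capability bits; the actual bit assignments are not given in the post.
CAPABILITY_BITS = {
    "English": 1 << 0,
    "French": 1 << 1,
    "Spanish": 1 << 2,
    "F": 1 << 3,
    "M": 1 << 4,
}


def capability_mask(capabilities):
    """Combine capability names into a single integer bitmask."""
    mask = 0
    for name in capabilities:
        mask |= CAPABILITY_BITS[name]
    return mask


# A caller asking for a Spanish-speaking female agent and an agent queued
# under the same capabilities arrive at the same partition key value.
caller_key = capability_mask(["Spanish", "F"])
agent_key = capability_mask(["F", "Spanish"])
print(caller_key == agent_key)
```

Because the mask depends only on the set of capabilities, the matcher and the process that queues agents can compute the key independently and still land on the same partition.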

When we use the DynamoDB approach for the agent queue, we make two important assumptions:

  • First, we assume that there might be multiple agent matching requests running in parallel. To prevent a second thread from reading an item that another thread is working on, we use a combination of DynamoDB transactions and optimistic locking to enforce the equivalent of item visibility.
  • Second, we require that the process that inserts agent availability records in the table makes duplicate entries for agents with multiple possible combinations of skills, such as agents who speak both Spanish and English.

DynamoDB table design

We use a single-table design for this implementation of the agent queue. The table represents three types of items: agents, queue information, and agent queue.

The following example illustrates the agent table design.

pk (partition key) | sk (sort key)   | AgentName | AgentID      | Gender | Languages                        | AgentStatus
AllAgents          | Agent#Andrew    | Andrew    | 929bb98b85fe | M      | ["English", "French"]            | attendingCall
AllAgents          | Agent#Billy     | Billy     | 836c66055b77 | M      | ["English", "French", "Spanish"] | queued
AllAgents          | Agent#Christine | Christine | bd14b59ac8e4 | F      | ["Spanish"]                      | queued
AllAgents          | Agent#Courtney  | Courtney  | 43156c44832a | F      | ["English", "Spanish"]           | queued
AllAgents          | Agent#Ellen     | Ellen     | 60f6a95b747c | F      | ["English", "French", "Spanish"] | queued
AllAgents          | Agent#Remy      | Remy      | 56c4c66 bb98 | T      | ["English"]                      | queued

Each agent item represents an agent, their gender, a string set of languages that they speak, and their current status (available, queued, or attendingCall). For the purpose of explaining the solution, the sort key is based on the AgentName attribute rather than on AgentID, even though AgentID is the attribute that guarantees uniqueness.

The following table illustrates our design for queue information.

pk (partition key) | sk (sort key) | QueueVersionId | QueueDepth
AllQueues          | Q#English#F   | 12             | 2
AllQueues          | Q#English#M   | 14             | 1
AllQueues          | Q#English#T   | 13             | 1
AllQueues          | Q#French#F    | 44             | 3
AllQueues          | Q#French#M    | 35             | 1
AllQueues          | Q#Spanish#F   | 46             | 1
AllQueues          | Q#Spanish#M   | 15             | 2

The queue information items represent the current depth and version ID of the queue for each gender and language combination. The QueueVersionId attribute is used to implement optimistic locking. The QueueDepth attribute isn’t required, but is included to demonstrate the overall solution.

The following example shows the agent queue table design.

pk (partition key) | sk (sort key)              | AgentName | AgentID      | Gender | Languages
Q#English#F        | 2022/01/05-14:01:37.370825 | Courtney  | 43156c44832a | F      | ["English", "Spanish"]
Q#English#F        | 2022/01/05-14:01:39.416341 | Ellen     | 60f6a95b747c | F      | ["English", "French", "Spanish"]
Q#English#T        | 2022/01/05-14:01:19.416341 | Remy      | 56c4c66 bb98 | T      | ["English"]
Q#English#M        | 2022/01/05-14:01:29.257873 | Billy     | 836c66055b77 | M      | ["English", "French", "Spanish"]
Q#French#F         | 2022/01/05-14:01:39.416341 | Ellen     | 60f6a95b747c | F      | ["English", "French", "Spanish"]
Q#French#M         | 2022/01/05-14:01:29.257873 | Billy     | 836c66055b77 | M      | ["English", "French", "Spanish"]
Q#Spanish#F        | 2022/01/05-14:01:34.330782 | Christine | bd14b59ac8e4 | F      | ["Spanish"]
Q#Spanish#F        | 2022/01/05-14:01:37.370825 | Courtney  | 43156c44832a | F      | ["English", "Spanish"]
Q#Spanish#F        | 2022/01/05-14:01:39.416341 | Ellen     | 60f6a95b747c | F      | ["English", "French", "Spanish"]
Q#Spanish#M        | 2022/01/05-14:01:29.257873 | Billy     | 836c66055b77 | M      | ["English", "French", "Spanish"]

The agent queue items represent messages in the queues. A female agent who speaks English and Spanish will have two Message items in the table. The partition key for one Message item is Q#English#F. The partition key for the other Message item is Q#Spanish#F. Both items have the same sort key, which is the timestamp of when this agent was added to the agent queue.
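
The fan-out from one agent to several Message items can be sketched as follows. This is an illustration rather than the repository's code: the build_message_items helper is hypothetical, and the timestamp format is inferred from the sort key values shown in the table above.

```python
from datetime import datetime


def build_message_items(agent, queued_at):
    """Return one queue Message item per language the agent speaks."""
    # Same layout as the sort keys shown above, e.g. 2022/01/05-14:01:37.370825
    sort_key = queued_at.strftime("%Y/%m/%d-%H:%M:%S.%f")
    return [
        {
            "pk": f"Q#{language}#{agent['Gender']}",
            "sk": sort_key,
            "AgentName": agent["AgentName"],
            "AgentID": agent["AgentID"],
            "Gender": agent["Gender"],
            "Languages": set(agent["Languages"]),
        }
        for language in sorted(agent["Languages"])
    ]


courtney = {
    "AgentName": "Courtney",
    "AgentID": "43156c44832a",
    "Gender": "F",
    "Languages": ["English", "Spanish"],
}
items = build_message_items(courtney, datetime(2022, 1, 5, 14, 1, 37, 370825))
for item in items:
    print(item["pk"], item["sk"])
```

Sharing one sort key across all of an agent's Message items is what later allows every copy to be located and deleted from a single matched item.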

The AgentName, Gender, and Languages attributes in these Message items are used when dequeuing messages. Dequeuing a message for an agent requires the following corresponding changes to other items in the table:

  • Other Message items for the same agent must be deleted.
  • Queue property items must be updated to reflect changes in QueueVersionId and QueueDepth attributes.
  • Agent items must be updated to reflect changes to AgentStatus.

We walk through these details in subsequent sections.

In this example, we use Python scripts that use the AWS SDK to create the table and populate the various item types. The DynamoDB table and item types represent the agent queue, which provides FIFO and filtering capabilities. We then query this agent queue for a specific criterion to demonstrate the solution in action.

Prerequisites

For this solution, you must first complete the following prerequisites:

  1. Install Python version 3.7 or higher.
  2. Configure Amazon Web Services (AWS) credentials and AWS Region.
  3. Make sure the AWS Identity and Access Management (IAM) principal has access to perform operations on the DynamoDB table. A sample identity policy is provided in the code repository.
  4. Clone the GitHub repository:
    git clone git@github.com:aws-samples/aws-dynamodb-fifo.git
  5. Change the directory:
    cd aws-dynamodb-fifo
  6. Install dependent packages:
    pip install -r requirements.txt

Now you’re ready to set up the agent queue functionality, which you do using three different scripts.

Create a table

The first script creates a DynamoDB table with a composite primary key and on-demand capacity mode.

Run the following command:

python createTable.py

Figure 3 that follows shows the output verifying that the table has been created.

Creating AgentQueueFIFO table
Figure 3: DynamoDB table AgentQueueFIFO created
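
The script itself lives in the repository. As a rough sketch, a table definition matching the description above (composite primary key, on-demand capacity mode) could look like the following. The agent_queue_table_definition helper is hypothetical, and the string type for both key attributes is an assumption based on the key values shown in this post.

```python
def agent_queue_table_definition(table_name="AgentQueueFIFO"):
    """Keyword arguments for the DynamoDB CreateTable API."""
    return {
        "TableName": table_name,
        # Assumption: both key attributes are strings, as in the examples.
        "AttributeDefinitions": [
            {"AttributeName": "pk", "AttributeType": "S"},
            {"AttributeName": "sk", "AttributeType": "S"},
        ],
        # HASH is the partition key, RANGE is the sort key.
        "KeySchema": [
            {"AttributeName": "pk", "KeyType": "HASH"},
            {"AttributeName": "sk", "KeyType": "RANGE"},
        ],
        # On-demand capacity mode.
        "BillingMode": "PAY_PER_REQUEST",
    }


params = agent_queue_table_definition()
# With credentials configured, the call would look like:
#   import boto3
#   boto3.client("dynamodb").create_table(**params)
```

The definition is built as plain data so that it can be inspected or tested without calling AWS.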

Initialize the table

The second script initializes the table with:

  • A pool of agents – The script uses the Faker package to generate random agents. Each agent has a random name, AgentID, gender, and a list of up to three languages that they speak. The script creates five agents by default.
  • Queue properties – Six queue property items are created. Each item represents a combination of language and gender.

Run the following command:

python initializeAgentPoolAndQueueMetaData.py

Figure 4 that follows shows five agent items and six queue properties listed on the DynamoDB console.

Figure 4: Agents and their properties

Assign available agents to the queue

For each agent with an AgentStatus attribute value of available, the script creates new items and updates existing items.

Run the third script with the following command:

python addAvailAgentsToQueue.py

Figure 5 that follows shows the steps involved for each available agent that is about to be queued.

Figure 5: Steps for each agent

The script completes the following steps:

  1. Creates a message item type for each language spoken by the agent.

For instance, agent Alicia speaks two languages, English and Spanish. Two new items are added to the table, as shown in Figure 6 that follows. The sort key for both these items is the timestamp when the agent is added to the availability queue.
Figure 6: Two items for agent Alicia, who speaks both English and Spanish

  2. Updates the Agent item’s AgentStatus attribute value to queued.

Figure 7: AgentStatus updated to queued

  3. Increments QueueDepth and QueueVersionId of the corresponding queue metadata item.

Figure 8: QueueDepth and QueueVersionId updated

All three actions are part of a DynamoDB TransactWriteItems operation. This ensures that all actions succeed together or fail together for the single logical business operation of adding available agents to queues.
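
A transaction of this shape could be assembled as in the following sketch. It is not the repository's code: the build_enqueue_transaction helper, the example AgentID, and the condition that the agent is still available are assumptions added for illustration. The dictionaries follow the low-level TransactWriteItems request format.

```python
TABLE = "AgentQueueFIFO"


def build_enqueue_transaction(agent_name, agent_id, gender, languages, timestamp):
    """Build the TransactItems list that queues one available agent."""
    actions = []
    for language in sorted(languages):
        queue = f"Q#{language}#{gender}"
        # Step 1: one Message item per language, all sharing the same sort key.
        actions.append({"Put": {
            "TableName": TABLE,
            "Item": {
                "pk": {"S": queue},
                "sk": {"S": timestamp},
                "AgentName": {"S": agent_name},
                "AgentID": {"S": agent_id},
                "Gender": {"S": gender},
                "Languages": {"SS": sorted(languages)},
            },
        }})
        # Step 3: increment depth and version of the matching queue metadata item.
        actions.append({"Update": {
            "TableName": TABLE,
            "Key": {"pk": {"S": "AllQueues"}, "sk": {"S": queue}},
            "UpdateExpression": "SET QueueDepth = QueueDepth + :one, "
                                "QueueVersionId = QueueVersionId + :one",
            "ExpressionAttributeValues": {":one": {"N": "1"}},
        }})
    # Step 2: mark the agent as queued. The condition is an extra guard
    # (an assumption, not stated in the post) against queuing an agent twice.
    actions.append({"Update": {
        "TableName": TABLE,
        "Key": {"pk": {"S": "AllAgents"}, "sk": {"S": f"Agent#{agent_name}"}},
        "UpdateExpression": "SET AgentStatus = :queued",
        "ConditionExpression": "AgentStatus = :available",
        "ExpressionAttributeValues": {
            ":queued": {"S": "queued"},
            ":available": {"S": "available"},
        },
    }})
    return actions


# Hypothetical AgentID and timestamp for agent Alicia from the example above.
actions = build_enqueue_transaction(
    "Alicia", "0123456789ab", "F", ["English", "Spanish"],
    "2022/01/05-14:01:37.370825",
)
print(len(actions))
# With credentials configured:
#   boto3.client("dynamodb").transact_write_items(TransactItems=actions)
```

For an agent who speaks two languages, the list holds two Put actions, two queue metadata updates, and one agent update, all of which succeed or fail as a unit.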

The script loops through all agent items that have an AgentStatus attribute value of available and creates the other corresponding items. Figure 9 that follows is a view of the DynamoDB table with all five agents queued.

Figure 9: DynamoDB table showing all agents queued

The solution is now ready.

Filter messages with FIFO ordering

To demonstrate the filtering capability along with FIFO capability, let’s assume a caller wants to speak to an agent who speaks Spanish and is male. The solution should return the first message that matches those criteria.

For the item with a composite key of AllQueues+Q#Spanish#M, there are four agents that match those criteria, as shown in Figure 10 that follows.

Figure 10: Four agents that match the composite key AllQueues+Q#Spanish#M

The use case requires that the caller be matched to the agent that matches the criteria (Spanish-speaking male) and is the first that was added to the queue (FIFO ordering). This requirement is met by the item for agent Chad, because the item matches the filter criteria (Spanish-speaking male) and is the first one added to the queue based on the timestamp in the sort key. This is shown in Figure 11 that follows.

Figure 11: Agent Chad matches all criteria and was first added to the queue

When agent Chad is assigned to the caller, Chad must be dequeued so that no other caller is assigned the same agent. For instance, if a second caller is looking for a French-speaking male agent at the same time, those criteria also match Chad.

This race condition is possible when requests for agents originate from multiple caller threads in parallel. We solve this problem by combining DynamoDB transactions with optimistic locking based on the QueueVersionId attribute.
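
The idea behind the version check can be shown with a small in-memory model. This is a conceptual sketch only, with no DynamoDB calls: the dictionaries stand in for the queue metadata items, the VersionConflict exception and assign function are hypothetical, and Chad is placed in only two queues to keep the example short.

```python
class VersionConflict(Exception):
    """Raised when a queue changed after the caller took its snapshot."""


# In-memory stand-ins for the queue metadata and one queued agent.
queue_versions = {"Q#Spanish#M": 15, "Q#French#M": 35}
waiting_agent = {"name": "Chad", "queues": ["Q#Spanish#M", "Q#French#M"]}


def assign(agent, snapshot):
    """Commit an assignment only if no queue the agent is in has changed."""
    for queue in agent["queues"]:
        if queue_versions[queue] != snapshot[queue]:
            raise VersionConflict(queue)
    # All checks passed: bump every version so later stale snapshots fail.
    for queue in agent["queues"]:
        queue_versions[queue] += 1


# Two callers snapshot the versions before either one commits.
spanish_caller = dict(queue_versions)
french_caller = dict(queue_versions)

assign(waiting_agent, spanish_caller)      # first caller wins
try:
    assign(waiting_agent, french_caller)   # second caller holds stale versions
    outcome = "assigned twice"
except VersionConflict:
    outcome = "second caller must retry"
print(outcome)
```

In the real table, the check and the version bump happen atomically inside one transaction, so there is no window between them in which another caller could slip through.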

Let’s examine how this is achieved using a sample caller-matching flow. A caller is requesting a Spanish-speaking male agent (Q#Spanish#M). The flowchart in Figure 12 that follows shows the steps that take place.

Figure 12: Agent matching request flowchart

The steps are as follows:

  1. A DynamoDB TransactGetItems operation captures a snapshot of the current metadata for all queues. This transaction operation ensures no other conflicting updates are in process while the items are being read. The QueueVersionId retrieved in this step is used to implement optimistic locking in subsequent steps.
  2. A query operation using strongly consistent reads retrieves a matching agent. Strongly consistent reads ensure that the query returns the most up-to-date data, reflecting the updates from all prior write operations. To find an agent who speaks Spanish and is male, we query using a partition key of Q#Spanish#M. The returned results are always sorted by the sort key value, which in this case is the timestamp of when an agent’s status is set as queued.
  3. The first item in the query result set is the agent that should be assigned to the caller, as shown in Figure 13 that follows.

Figure 13: First item that matches the query
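
The read side of the flow, steps 1 through 3, could be expressed with request parameters like these. The helper names are hypothetical, and the Limit of 1 is an assumption that follows from needing only the first item. The dictionaries follow the low-level TransactGetItems and Query request formats.

```python
TABLE = "AgentQueueFIFO"


def build_snapshot_request(queue_names):
    """Step 1: TransactGetItems entries for the queue metadata items."""
    return [
        {"Get": {
            "TableName": TABLE,
            "Key": {"pk": {"S": "AllQueues"}, "sk": {"S": name}},
        }}
        for name in queue_names
    ]


def build_first_agent_query(queue_name):
    """Steps 2 and 3: strongly consistent query for the oldest Message item."""
    return {
        "TableName": TABLE,
        "KeyConditionExpression": "pk = :queue",
        "ExpressionAttributeValues": {":queue": {"S": queue_name}},
        "ConsistentRead": True,    # strongly consistent read
        "ScanIndexForward": True,  # ascending sort key, oldest timestamp first
        "Limit": 1,                # assumption: only the head of the queue is needed
    }


snapshot = build_snapshot_request(["Q#English#M", "Q#French#M", "Q#Spanish#M"])
query = build_first_agent_query("Q#Spanish#M")
# With credentials configured:
#   client = boto3.client("dynamodb")
#   client.transact_get_items(TransactItems=snapshot)
#   client.query(**query)
```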

To complete the step of identifying and assigning an agent, we use the Gender, Languages, sk (agent availability timestamp), and AgentName attribute values from this item to perform deletes and updates on other corresponding items in the table.

    1. Delete all queue items for the matched agent, as shown in Figure 14 that follows. For agent Chad, delete three items (one item for each language). The partition key for each of the three items to delete is calculated from the Languages attribute value {'English', 'French', 'Spanish'} and the Gender attribute. The sort key for the delete operation is derived from the sk value.
      Figure 14: Items marked for removal
    2. Update the queue information items as shown in Figure 15 that follows. For agent Chad, the information must be updated for three queues: Q#English#M, Q#French#M, and Q#Spanish#M. The QueueVersionId is incremented by 1, and the QueueDepth is decremented by 1. ConditionExpression is used to validate that the current QueueVersionId of the item is identical to the corresponding QueueVersionId captured in the snapshot taken before the agent matching process began (Step 1). Combined with the transaction, this optimistic locking check makes a request that is based on stale queue state fail instead of assigning the same agent twice.
      Figure 15: Update queue information
    3. Update the AgentStatus for the AgentItem from queued to attendingCall, as shown in Figure 16 that follows.

Figure 16: Update agent status
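
The three write steps above can be sketched as a single TransactWriteItems request. The build_dequeue_transaction helper and Chad's timestamp are hypothetical; the version numbers are taken from the queue information example earlier in this post. The matched item is represented as a plain Python dictionary for readability.

```python
TABLE = "AgentQueueFIFO"


def build_dequeue_transaction(message, snapshot_versions):
    """Build TransactItems that remove one matched agent from every queue."""
    actions = []
    for language in sorted(message["Languages"]):
        queue = f"Q#{language}#{message['Gender']}"
        # 1. Delete the agent's Message item in this queue.
        actions.append({"Delete": {
            "TableName": TABLE,
            "Key": {"pk": {"S": queue}, "sk": {"S": message["sk"]}},
        }})
        # 2. Update queue metadata, guarded by the version from the snapshot.
        actions.append({"Update": {
            "TableName": TABLE,
            "Key": {"pk": {"S": "AllQueues"}, "sk": {"S": queue}},
            "UpdateExpression": "SET QueueVersionId = QueueVersionId + :one, "
                                "QueueDepth = QueueDepth - :one",
            "ConditionExpression": "QueueVersionId = :expected",
            "ExpressionAttributeValues": {
                ":one": {"N": "1"},
                ":expected": {"N": str(snapshot_versions[queue])},
            },
        }})
    # 3. Move the agent from queued to attendingCall.
    actions.append({"Update": {
        "TableName": TABLE,
        "Key": {"pk": {"S": "AllAgents"},
                "sk": {"S": f"Agent#{message['AgentName']}"}},
        "UpdateExpression": "SET AgentStatus = :busy",
        "ExpressionAttributeValues": {":busy": {"S": "attendingCall"}},
    }})
    return actions


# Hypothetical sort key for Chad; versions as in the queue information example.
chad = {"AgentName": "Chad", "Gender": "M", "sk": "2022/01/05-14:01:29.257873",
        "Languages": {"English", "French", "Spanish"}}
versions = {"Q#English#M": 14, "Q#French#M": 35, "Q#Spanish#M": 15}
actions = build_dequeue_transaction(chad, versions)
print(len(actions))
```

If any QueueVersionId no longer matches the snapshot, DynamoDB cancels the whole transaction (reported as TransactionCanceledException), and the caller can take a fresh snapshot and retry.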

To see this in action, run the following command:

python assignAgentToCallRequest.py Q#Spanish#M

The script finds a matching agent and performs the deletes and updates discussed in the previous section. Figure 17 that follows shows the changes to the items after running the script:

  • The number of items in the table has changed from 23 to 20. This reflects the three AgentQueue items that were deleted.
  • The queue metadata items for Q#English#M, Q#French#M, and Q#Spanish#M have been updated. The QueueDepth has decreased by 1, and the QueueVersionId has been incremented by 1.
  • The AgentStatus of the agent has been updated to attendingCall.

Figure 17: Changes following deletes and updates

Clean up

To delete the DynamoDB table, run the following command:

python deleteTable.py

Considerations

You can extend this solution for last-in-first-out (LIFO) queues with filtering capabilities by setting the ScanIndexForward parameter to false when performing the query operation. This sorts the results of the query operation in descending order instead of the default ascending order behavior.
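
As a small illustration of that switch, the query parameters differ only in the ScanIndexForward flag. The build_queue_head_query helper and its lifo argument are hypothetical names, and the Limit of 1 is an assumption.

```python
def build_queue_head_query(queue_name, lifo=False):
    """Query arguments for the head of a queue; lifo=True reverses the order."""
    return {
        "TableName": "AgentQueueFIFO",
        "KeyConditionExpression": "pk = :queue",
        "ExpressionAttributeValues": {":queue": {"S": queue_name}},
        "ConsistentRead": True,
        # True: ascending sort key (oldest first, FIFO).
        # False: descending sort key (newest first, LIFO).
        "ScanIndexForward": not lifo,
        "Limit": 1,
    }


fifo_query = build_queue_head_query("Q#Spanish#M")
lifo_query = build_queue_head_query("Q#Spanish#M", lifo=True)
```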

The solution in this post uses a single-table design, but because DynamoDB transactions operations can span multiple tables within the same AWS account and Region, you can create a similar solution using multiple tables.

A TransactWriteItems or TransactGetItems operation can currently include at most 100 unique items. With the three languages (n) and three genders (m) demonstrated in this post, a write transaction can have up to 10 items (n*m + 1). For other use cases, you must consider the number of attributes and the possible values of each attribute to make sure that a single transaction never operates on more than 100 items.

This solution uses DynamoDB transactional read and write requests and strongly consistent read requests. Transactional read and write requests are priced at twice the standard read and write requests. Strongly consistent read requests are priced at twice the eventually consistent read requests. These additional costs should be considered when using this solution at scale.
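
To get a feel for these multipliers, the following back-of-the-envelope sketch estimates the on-demand request units for one agent-matching request. It assumes every item is at most 1 KB and that the query reads a single item. The dequeue_request_units function is hypothetical, and the operation counts are taken from the dequeue steps described earlier (one delete and one queue update per language, plus one agent update).

```python
def dequeue_request_units(languages_spoken, queues_in_snapshot):
    """Estimate on-demand request units for one agent-matching request."""
    # Transactional read: 2 read request units per item of up to 4 KB.
    snapshot_read_units = 2 * queues_in_snapshot
    # Strongly consistent read: 1 read request unit per 4 KB read.
    query_read_units = 1
    # One delete and one queue update per language, plus the agent update.
    writes = 2 * languages_spoken + 1
    # Transactional write: 2 write request units per item of up to 1 KB.
    write_units = 2 * writes
    return {
        "read_units": snapshot_read_units + query_read_units,
        "write_units": write_units,
    }


# An agent who speaks three languages, with six queue metadata items read.
estimate = dequeue_request_units(languages_spoken=3, queues_in_snapshot=6)
print(estimate)
```

The estimate grows with the number of languages an agent speaks, because each additional language adds one delete and one guarded update to the write transaction.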

Summary

In this post, we showed you an example of how to implement serverless FIFO queues with filtering capabilities using DynamoDB. You also learned that the standard characteristics of a queueing system such as message ordering, visibility, enqueuing, and dequeuing are met when multiple clients are interacting with this queue implementation. You can use the code samples in this post as a starting point for implementation of your use case.

Try this solution and share your feedback in the comments section.


About the Authors

Nikhil Penmetsa is a Senior Solutions Architect at Amazon Web Services. He helps organizations understand best practices around advanced cloud-based solutions. He is passionate about diving deep with customers to create solutions that are cost effective, secure and performant.

Randy DeFauw is a Senior Principal Solutions Architect at AWS. He holds an MSEE from the University of Michigan, where he worked on computer vision for autonomous vehicles. He also holds an MBA from Colorado State University. Randy has held a variety of positions in the technology space, ranging from software engineering to product management. He entered the Big Data space in 2013 and continues to explore that area. He is actively working on projects in the ML space and has presented at numerous conferences including Strata and GlueCon.