O blog da AWS

CQRS na AWS: Sincronizando os Serviços de Command e Query Com o Padrão Transactional Outbox, a Técnica Transaction Log Tailing e o Amazon Database Migration Service

Esta é a quarta parte da série de blog posts que abordam as diferentes formas de se implementar CQRS com diferentes serviços da AWS. A primeira parte foi uma introdução ao assunto, bem como a análise de um caso de uso em que temos um Amazon Aurora for PostgreSQL como banco de dados do serviço de comandos e um Amazon Elasticache for Redis como banco de dados do serviço de consultas. Para trocar informações, um evento é colocado em uma fila do Amazon SQS, que é lida por um componente de computação (uma função AWS Lambda, no exemplo apresentado) que atualiza o Redis, com informações pré-computadas que estão prontas para serem recuperadas.

Na segunda parte, exploramos o mesmo caso de uso, mas começamos a usar o padrão Transactional Outbox. Lá, exploramos a técnica Polling Publisher, na qual pesquisamos de tempos em tempos a tabela outbox e publicamos os eventos que ainda não foram publicados. Para que isso funcione, depois de publicarmos novos eventos, limpamos a tabela outbox, para que o mesmo evento não seja publicado duas vezes (e mesmo assim, precisamos estar preparados para lidar com eventos publicados mais de uma vez de forma idempotente).

Na terceira parte, exploramos a técnica Transaction Log Tailing, que é outra forma de lidar com o padrão Transactional Outbox. Persistimos junto com o aggregate um evento que representa a situação que acabou de acontecer. A diferença é que não lemos mais eventos da tabela outbox para publicá-los. Em vez disso, teremos um componente que observará o log de transações da tabela outbox no banco de dados para publicar as alterações.

Esta quarta parte também explora a técnica Transaction Log Tailing, mas agora o componente que observará o log de transações da tabela outbox será o Amazon Database Migration Service (Amazon DMS) e publicará os eventos no Amazon Kinesis Data Streams. O caso de uso é praticamente o mesmo, com os mesmos bancos de dados e o mesmo domínio.

Introdução

A maioria das aplicações que criamos precisa de um banco de dados para armazenar dados. Armazenamos o estado de nossos objetos de domínio e os usamos para várias finalidades, como processar e gerar relatórios. Às vezes, nosso banco de dados pode não ser ideal para recuperação de dados, seja por sua natureza ou devido a um modelo de domínio complexo, por exemplo. Para esses casos, podemos usar o padrão arquitetural CQRS, que sugere que, para um determinado bounded context (em português, algo como “contexto delimitado”) do nosso domínio, podemos ter dois serviços, sendo um para receber comandos (ou seja, operações que mudam de estado) e outro para consultas (que só recuperam dados). Dessa forma, cada serviço pode ter o banco de dados que melhor se encaixe. O desafio é como manter os dois bancos de dados sincronizados, o que pode ser feito com eventos publicados a partir do serviço de comandos, a serem consumidos pelo serviço de consultas.

A publicação confiável de eventos relacionados a coisas que já aconteceram em uma aplicação pode ser um desafio. Se estamos usando ou não o padrão CQRS, se não tomarmos cuidado, podemos acabar publicando informações que ainda não foram persistidas ou não as publicar em nenhum momento, conforme discutimos na primeira parte desta série, e as fontes de dados podem ficar fora de sincronia. De fato, por estarmos publicando eventos a partir do serviço de comandos para serem consumidos pelo serviço de consultas, já há uma consistência eventual, mas se não tomarmos cuidado, as fontes de dados podem ficar dessincronizadas por mais tempo do que se espera. No contexto de um banco de dados relacional, o padrão Transactional Outbox nos permite publicar eventos de forma confiável. Quando aplicado, um evento é persistido na tabela outbox na mesma transação em que os dados do aggregate principal são persistidos, e publicado posteriormente em algum momento.

O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Figura 1. O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Há duas técnicas para se lidar com uma tabela outbox. Já exploramos uma maneira de implementá-la com a técnica Transaction Log Tailing, que é com um connector Kafka chamado Debezium Connector for PostgreSQL. Agora vamos explorar outra forma de implementá-la, com Amazon DMS e o Amazon Kinesis Data Streams. A chave dessa técnica é que ela observa o log de transações de uma tabela para recuperar o que já foi persistido e publicar as alterações. Também não exige necessariamente que a tabela outbox seja limpa, embora possa ser uma boa ideia economizar espaço de armazenamento.

Caso de uso: Amazon Aurora for PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e Amazon Elasticache for Redis como banco de dados do serviço de consultas, usando o padrão Transactional Outbox e a técnica Transaction Log Tailing com o Amazon Database Migration Service

Assim como no primeiro e segundo casos que explorei nesta série de posts, temos um Aurora como banco de dados do serviço de comandos e um Redis como banco de dados do serviço de consultas. O Aurora conterá informações relacionadas a clientes, produtos e pedidos, e o Redis conterá informações pré-computadas relacionadas a clientes. Para testar a arquitetura, usaremos dois endpoints, sendo um para salvar pedidos e outro para recuperar informações relacionadas ao cliente.

Para implementar essa técnica, além do Amazon Aurora e do Amazon Elasticache for Redis, também precisaremos de um stream de dados do Amazon Kinesis Data Streams para receber os eventos publicados na tabela outbox. O serviço responsável por levar eventos do log de transações da tabela outbox para o stream de dados do Amazon Kinesis Data Streams é o Amazon DMS. Além de realizar migrações completas do banco de dados, ele também realiza replicações contínuas, também investigando o log de transações de uma tabela. E assim como foi mostrado na postagem anterior, também usaremos um canal Amazon EventBridge para passar dados do stream de dados do Amazon Kinesis Data Streams para um tópico do Amazon SNS, para que as alterações possam ser enviadas a vários consumidores.

Visão Geral da Solução

Nessa implementação, usaremos uma database migration task no Amazon DMS do tipo “Full load, ongoing replication”. Essa task será composta por uma replication instance e dois endpoints, sendo um para ler eventos do log de transações da tabela outbox e outro para enviar os eventos a um stream do Amazon Kinesis Data Streams. Toda a complexidade necessária para ler o log de transações da tabela outbox é abstraída pelo Amazon DMS.

É importante lembrar que, assim como fizemos com o Debezium Connector, para que a migration task do Amazon DMS funcione, no serviço do Aurora, precisamos criar um parameter group do tipo cluster, definir o parâmetro rds.logical_replication como 1 e atribuir o parameter group ao nosso banco de dados Aurora (se o cluster do banco de dados foi criado antes de atribuir o parameter group ao cluster, também precisamos reiniciar as instâncias do cluster). Para obter instruções sobre como trabalhar com replicação lógica no Aurora, consulte a documentação Usar a Replicação Lógica do PostgreSQL com o Aurora.

Assim como o Debezium Connector, nossa migration task do Amazon DMS também publicará inserções de novos eventos na tabela outbox em um stream do Amazon Kinesis Data Streams sempre que elas ocorrerem. Ele também captura alterações em nível de registro nas tabelas indicadas pelo usuário em bancos de dados relacionais assim que elas são persistidas. Quando ele se conecta ao banco de dados pela primeira vez, ele tira um snapshot do estado atual dessas tabelas e o publica em um stream do Amazon Kinesis Data Streams e, em seguida, continua monitorando essas tabelas e publicando alterações em nível de registro.

Em nosso exemplo, uma chamada POST será feita para o endpoint /orders, com as informações de um pedido a ser feito por um cliente. Essa chamada será validada por uma Lambda Authorizer (para fins de simplificação, usaremos basic authentication) e será processada posteriormente pelo OrderReceiverLambda, que desempenhará a função de nosso serviço de comandos. Essa função Lambda, na mesma transação, inserirá dados nas tabelas que persistem informações relacionadas ao aggregate do pedido e também na tabela que corresponde a nossa tabela outbox. O log de transações dessa tabela será monitorado pela migration task do Amazon DMS, que publicará os eventos em um stream do Amazon Kinesis Data Streams.

O stream do Amazon Kinesis Data Streams, que recebe as alterações feitas na tabela outbox, será a origem de um pipe do Amazon EventBridge. A segunda etapa do pipe será uma função Lambda, na fase de enrichment, cujo objetivo é limpar o evento entregue pelo Amazon Kinesis Data e também decodificar o valor do evento, que é codificado em Base64. Essa função Lambda também desempenha o papel de filtro. Essa filtragem é importante porque, como estamos acompanhando o log de transações da tabela outbox, estamos recebendo todos os eventos, incluindo exclusões, e no cenário que estamos explorando, estamos interessados apenas em inserções. Essas exclusões acontecem porque a função Lambda OrderEventCleanerLambda limpa a tabela outbox, o que gera eventos de exclusão.

É verdade que é possível definir um filtro em um pipe do EventBridge. O problema é que os dados do evento são codificados em Base64 e, para fazer a filtragem, precisamos decodificar os dados e verificar os metadados do evento para ver se o campo operation é igual a “insert”, e não podemos fazer isso no filtro de um pipe do EventBridge, então usamos a função Lambda da fase de enrichment para fazer isso. Para seguir com o processamento dos eventos na fase de enrichment, poderíamos criar uma lista, decodificar os eventos e incluir na lista somente os eventos cujo campo operation contido nos metadados de cada evento fosse igual a “insert”, e retornar a lista na função Lambda.

O evento é finalmente entregue a um tópico do Amazon SNS, que entrega o evento a duas filas Amazon SQS, sendo uma para ser pesquisada por uma função Lambda que envia um email de notificação e outra para ser lida por uma função Lambda que atualiza a chave do cliente no Redis (que, em nosso exemplo, é o banco de dados do nosso serviço de consultas). A função Lambda que envia email é somente um exemplo que ilustra que é possível tomar diferentes ações para cada evento.

A função Lambda que atualiza o Redis então publica um evento em outra fila opcional SQS, que é lida por uma função Lambda que exclui eventos da tabela outbox que já foram processados. O evento é publicado pela função Lambda que atualiza o Redis apenas para garantir que a função Lambda opcional que remove um evento já publicado do Aurora só receba um evento após uma atualização bem-sucedida do Redis. Remover os registros da tabela outbox após o processamento dos eventos é uma forma de mantê-la limpa. Outras formas incluem utilizar a extensão pg_partman, disponível para Amazon RDS PostgreSQL ou Amazon Aurora for PostgreSQL no caso do Postgres, ou trabalhar com arquivamento por partições no caso do MySQL.

Um segundo endpoint, /clients/{clientId}, recupera informações relacionadas ao cliente. A função Lambda que fornece essas informações representa nosso serviço de consultas e as recupera da chave no Redis que foi atualizada anteriormente. As informações retornadas contêm o nome do cliente, o e-mail e o valor total que já foi comprado pelo cliente.

Imagem demonstrando a arquitetura proposta, tendo o Aurora como banco de dados do serviço de comandos, e o Redis como banco de dados do serviço de consultas. A partir da inserção de um evento no Aurora, o Amazon Database Migration Service lê o transaction log da tabela outbox e publica os eventos em um stream do Amazon Kinesis Data Streams, que é lido por um pipe do EventBridge que efetua a publicação desses eventos em um tópico do SNS. O SNS então entrega os eventos em duas filas, sendo que uma delas é lida por uma Lambda que atualiza o Redis, com os últimos dados do cliente relacionado a cada evento.

Figura 2. Arquitetura proposta, tendo o Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos, e o Amazon Elasticache for Redis como banco de dados do serviço de consultas.

A solução é quase exatamente a mesma explorada na terceira parte desta série, exceto que estamos usando uma migration task do Amazon DMS em vez do Debezium Connector para ler o log de transações da tabela outbox e estamos usando um stream do Amazon Kinesis Data Streams para receber eventos em vez de um tópico do Amazon Managed Streaming for Apache Kafka.

Uma das vantagens dessa solução é que as alterações são retiradas do log de transações da tabela outbox para um stream do Amazon Kinesis Data Streams, e a tabela outbox pode ser limpa em algum momento por um componente opcional que pode executar computação (como uma função Lambda) ou combinando extensões do Postgres, como pg_partman, com outras soluções da AWS, como o Amazon S3. E o Amazon DMS fornece as ferramentas necessárias para ler o log de transações da tabela outbox de forma transparente, para que não precisemos instalar nada.

Outra vantagem é que, diferentemente do Kafka, o endpoint de destino que compõe a migration task no Amazon DMS pode enviar dados para vários destinos. Nesse aspecto, essa solução é mais flexível do que a que utiliza o Kafka, na qual precisamos instalar diferentes connectors e seus plugins correspondentes para fazer isso. Estamos enviando dados para um stream do Amazon Kinesis Data Streams, mas podemos enviar dados para um bucket do S3, diferentes bancos de dados NoSQL, etc.

Esse é um ponto a considerar ao se decidir entre um Kafka connector e o Amazon DMS para ler o log de transações da tabela outbox. Podemos usar o Debezium para transportar eventos da tabela outbox para um tópico do Kafka. Essa é a ideia de levar mudanças de diferentes lugares para um tópico do Kafka, e cada fonte de dados precisa de um connector e um plugin correspondente. Da mesma forma, para levar eventos do tópico do Kafka a um destino, é necessário um novo connector e seu plugin correspondente. Para isso, primeiro precisamos verificar se esses connectors estão disponíveis ou até mesmo desenvolver nossos próprios, o que aumenta a complexidade geral e o TCO da solução. Nesse sentido, o Amazon DMS é mais flexível, pois pode ler de várias fontes de dados e entregar alterações também para muitos destinos diferentes.

Outro ponto é que os eventos que foram persistidos na tabela outbox podem ser reproduzidos, assim como a solução apresentada na terceira parte desta série, que usa um tópico do Kafka. Se quisermos alimentar outro banco de dados ou reproduzir eventos para tentar simular um bug, todos os eventos podem ser reproduzidos novamente.

A desvantagem dessa arquitetura é que, embora seja mais confiável, ela adiciona mais componentes e, portanto, temos mais complexidade e é também um pouco mais cara do que as que foram exploradas anteriormente.

O Amazon DMS é mais simples de se utilizar do que o Debezium. O Amazon DMS permite configurar endpoints de diversas origens e diversos destinos de forma simples, ao passo que o Debezium requer um connector e um plugin correspondente para cada origem e destino. Além disso, quando se utiliza somente o Amazon DMS, utiliza-se um serviço gerenciado da AWS. Por outro lado, o Debezium, por ser um projeto open-source, tem mais suporte da comunidade, e portanto, tem mais possibilidades de origens e destinos.

Executando o Exemplo

Para executar o exemplo, os leitores deverão ter uma conta na AWS e um usuário com permissões de admin. Depois, basta executar o passo-a-passo fornecido no repositório de códigos desta série de blog posts sobre CQRS, no AWS Samples, hospedado no Github. Ao executar o passo-a-passo, os leitores terão a infraestrutura aqui apresentada nas suas próprias contas.

O exemplo contém dois endpoints, sendo um para receber informações relacionadas a pedidos (representando o nosso serviço de comandos) e outro para recuperar informações relacionadas a clientes (representando nosso serviço de consultas). Para verificar se tudo funcionou corretamente, vá até o Amazon API Gateway e, na lista de APIs, entre na API “OrdersAPI”, e depois em “Stages”. Haverá somente uma stage chamada “prod”. Recupere o valor do campo “Invoke URL” e acrescente “/orders”. Esse é o endpoint que recebe informações relacionadas a pedidos.

Vamos realizar uma requisição POST a esse endpoint. Podemos usar qualquer ferramenta para efetuar as requisições, como cURL ou Postman. Como esse endpoint está protegido, também precisamos adicionar basic authentication. Se você estiver usando o Postman, será necessário recuperar nome de usuário e senha gerados na construção da infraestrutura. No API Gateway, vá até “API Keys” e copie o valor da coluna “API key” de “admin_key”. Esse valor contém nome de usuário e senha separados pelo caracter “:”, porém está codificado em Base64. Decodifique o valor, utilizando alguma ferramenta online, ou o próprio comando “base64” do Linux. O nome de usuário está à esquerda do caracter “:”, e a senha está à direita. Adicione uma “Authorization” do tipo “Basic Auth” e preencha os campos “Username” e “Password” com os valores recuperados. Adicione também um header “Content-Type”, com o valor “application/json”.

Se você estiver usando, por exemplo, cURL, não será necessário decodificar o valor da API key. Basta adicionar um header “Authorization” com o valor “Basic <valor da api key copiado da coluna API key>”. Adicione também um header “Content-Type”, com o valor “application/json”.

payload para efetuar requisições para esse endpoint é o seguinte:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Isso representa um pedido que o cliente com id 1 fez, contendo produtos com ids 1 e 2. O total desse pedido é de $3000. Todas essas informações serão armazenadas no Aurora. Ao efetuar essa requisição POST, se tudo funcionou conforme o esperado, você deverá ver o seguinte resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Agora, vamos verificar se as informações relacionadas ao cliente foram enviadas ao Redis. Ao endpoint do API Gateway, que foi recuperado anteriormente, acrescente “/clients/1”. Esse é o endpoint que recupera informações relacionadas ao cliente. Vamos efetuar uma solicitação GET para esse endpoint. Assim como fizemos com o endpoint “/orders”, precisamos adicionar basic authentication. Siga as etapas explicadas anteriormente e efetue a requisição GET. Se tudo funcionou conforme o esperado, você verá uma saída semelhante à seguinte:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Isso significa que conseguimos alimentar com sucesso o Redis com informações prontas para serem lidas, enquanto as mesmas informações estão no Aurora, em outro formato.

Limpando os Recursos

Para remover a infraestrutura que foi provisionada para não incorrer em custos, algumas etapas são necessárias. Como iniciar a migration task do Amazon DMS é uma etapa realizada após a criação da infraestrutura, será necessário seguir algumas etapas. Para excluir a infraestrutura criada, no console, acesse a página do Amazon DMS. Em seguida, no painel, deverá haver uma task ativa. Navegue até ele. Em “Database migration tasks”, selecione a task em execução e escolha “Stop” no menu “Actions” e confirme. Navegue até a página do Amazon CloudFormation, escolha a pilha “cqrsOnAws”, escolha “Delete” e confirme. Isso levará cerca de 40 minutos. Em seguida, vá para a página do Amazon EC2 e, em “Network & Security”, vá para “Network Interfaces”. Escolha a interface com a descrição “DMSNetworkInterface” e, no menu “Actions”, escolha “Delete” e confirme. Em seguida, acesse a página do Amazon VPC. Escolha a VPC cujo nome começa com “cqrsOnAws” e, no menu “Actions”, escolha “Delete” e confirme. Por fim, volte para a página do Amazon CloudFormation, escolha a stack cujo nome começa com “cqrsOnAws”, escolha “Retry Delete”, depois “Retry delete this entire stack” e depois “Delete”.

Conclusão

Neste blog post, exploramos outra forma de usar a técnica Transaction Log Tailing. A solução abordada aqui é quase a mesma apresentada no post anterior desta série, exceto pela introdução do Amazon DMS, para ler o log de transações da tabela outbox, e também de um stream do Amazon Kinesis Data Streams, para receber os eventos a serem publicados em outros serviços.

Para conectar o tópico do SNS, que entrega o evento a duas filas SQS diferentes, usamos um pipe do EventBridge. O source do pipe é o stream do Amazon Kinesis Data Streams, passando por uma função Lambda que limpa e filtra dados na fase de enrichment e, finalmente, é entregue ao tópico do SNS. A partir daí, o evento é entregue a duas filas SQS, que são lidas por funções Lambda, sendo uma delas a que atualiza o Redis, que é o banco de dados do nosso serviço de consultas.

Com o padrão Transactional Outbox, toda vez que estamos prestes a mudar o estado de um aggregate no serviço de comandos, persistimos em uma tabela outbox um evento que representa a situação que acabou de acontecer com o aggregate, junto com o próprio aggregate. Para publicar as alterações, usamos um componente que lê o log de transações da tabela outbox no banco de dados e publica as alterações em nível de registro em outro componente. No exemplo explorado nesta publicação, usamos uma migration task do Amazon DMS, que é o componente que lê o log de transações de uma tabela e publica as alterações em um stream do Amazon Kinesis Data Streams.

Com o padrão Transactional Outbox, em algum momento todos os eventos serão publicados, então não há risco de não publicá-los. É uma abordagem relativamente simples, mas o Transaction Log Tailing é um pouco mais complexo do que as outras abordagens e é também mais caro. No entanto, é mais resiliente e oferece a capacidade de reproduzir eventos, seja para alimentar outro banco de dados ou para tentar reproduzir um bug.

Agora que exploramos diferentes formas de implementar o CQRS na AWS, levando mudanças em um banco de dados relacional para um banco de dados NoSQL, abordaremos o oposto na próxima publicação: faremos as alterações de um banco de dados NoSQL (o Amazon DynamoDB, no nosso caso) para um banco de dados relacional (o Aurora, no nosso caso). Esse é o caso que exploraremos no próximo post desta série!

Sobre o Autor

Roberto Perillo Roberto Perillo é arquiteto de soluções enterprise da AWS Brasil especialista em serverless, atendendo a clientes da indústria financeira, e atua na indústria de software desde 2001. Atuou por quase 20 anos como arquiteto de software e desenvolvedor Java antes de ingressar na AWS, em 2022. Possui graduação em Ciência da Computação, especialização em Engenharia de Software e mestrado também em Ciência da Computação. Um eterno aprendiz. Nas horas vagas, gosta de estudar, tocar guitarra, e também jogar boliche e futebol de botão com seu filho, Lorenzo!

Sobre o Colaborador

Luiz Santos Luiz Santos trabalha atualmente como Technical Account Manager (TAM) na AWS e é um entusiasta de tecnologia, sempre busca novos conhecimentos e possui maior profundidade sobre desenvolvimento de Software, Data Analytics, Segurança, Serverless e DevOps. Anteriormente, teve experiência como Arquiteto de Soluções AWS e SDE.

Sobre os Revisores

Daniel Abib Daniel ABIB é arquiteto de soluções sênior na AWS, com mais de 25 anos trabalhando com gerenciamento de projetos, arquiteturas de soluções escaláveis, desenvolvimento de sistemas e CI/CD, microsserviços, arquitetura Serverless & Containers e segurança. Ele trabalha apoiando clientes corporativos, ajudando-os em sua jornada para a nuvem.
Erika Nagamine Erika Nagamine é arquiteta de soluções especialista em Dados na AWS. Tem formação acadêmica sólida, graduada em Sistemas de Informação, com pós graduação em Administração de Banco de Dados, Engenharia de Dados e especialização em mineração de Dados Complexos pela Unicamp. Atua com clientes de diversos segmentos e já participou de mais de 200 projetos no Brasil e no mundo. Atualmente múltiplas certificações em dados, computação em nuvem, todas as certificações AWS, e ama compartilhar conhecimento em comunidades e é palestrante em eventos técnicos de destaque em todo o Brasil e no mundo.