O blog da AWS

CQRS na AWS: Sincronizando os Serviços de Command e Query com o Padrão Transactional Outbox, a Técnica Transaction Log Tailing e o Debezium Connector

Essa é a terceira parte da série sobre diferentes de se implementar o CQRS na AWS. A primeira parte foi uma introdução ao assunto e também analisamos um caso de uso em que temos um Amazon Aurora for PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e um Amazon Elasticache for Redis como banco de dados do serviço de consultas. Para trocar informações, um evento é colocado em uma fila Amazon SQS, que é pesquisada por um componente de computação (uma função AWS Lambda, no exemplo apresentado) que atualiza o Redis com informações pré-computadas que estão prontas para serem recuperadas.

Na segunda parte, exploramos o mesmo caso de uso, mas começamos a usar o padrão Transactional Outbox (em português, algo como “caixa de saída transacional”). Lá, nós o exploramos com a técnica Polling Publisher (em português, algo como “publicador que efetua consultas”), em que pesquisamos de tempos em tempos a tabela outbox e publicamos os eventos que ainda não foram publicados. Para que essa ideia funcione, depois de publicarmos os eventos, limpamos a tabela outbox, para que o mesmo evento não seja publicado duas vezes.

Nesta terceira parte desta série, exploraremos a técnica Transaction Log Tailing (em português, algo como “ler a cauda/fim do log de transações”), que é outra forma de usar o padrão Transactional Outbox. Essa técnica lida com o log de transações de uma tabela em um banco de dados para publicar as alterações. O caso de uso é praticamente o mesmo, com os mesmos bancos de dados e o mesmo domínio.

Introdução

A maioria das aplicações que criamos precisa de um banco de dados para armazenar dados. Armazenamos o estado de nossos objetos de domínio e os usamos para várias finalidades, como processar e gerar relatórios. Às vezes, nosso banco de dados pode não ser ideal para recuperação de dados, seja por sua natureza ou devido a um modelo de domínio complexo, por exemplo. Para esses casos, podemos usar o padrão arquitetural CQRS, que sugere que, para um determinado bounded context (em português, algo como “contexto delimitado”) do nosso domínio, podemos ter dois serviços, sendo um para receber comandos (ou seja, operações que mudam de estado) e outro para consultas (que só recuperam dados). Dessa forma, cada serviço pode ter o banco de dados que melhor se encaixe. O desafio é como manter os dois bancos de dados sincronizados, o que pode ser feito com eventos publicados a partir do serviço de comandos, a serem consumidos pelo serviço de consultas.

A publicação confiável de eventos relacionados a coisas que já aconteceram em uma aplicação pode ser um desafio. Se estamos usando ou não o padrão CQRS, se não tomarmos cuidado, podemos acabar publicando informações que ainda não foram persistidas ou não as publicar em nenhum momento, conforme discutimos na primeira parte desta série, e as fontes de dados podem ficar fora de sincronia. De fato, por estarmos publicando eventos a partir do serviço de comandos para serem consumidos pelo serviço de consultas, já há uma consistência eventual, mas se não tomarmos cuidado, as fontes de dados podem ficar dessincronizadas por mais tempo do que se espera. No contexto de um banco de dados relacional, o padrão Transactional Outbox nos permite publicar eventos de forma confiável. Quando aplicado, um evento é persistido na tabela outbox na mesma transação em que os dados do aggregate principal são persistidos, e publicado posteriormente em algum momento.

O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Figura 1. O padrão Transactional Outbox. Neste exemplo, um aggregate Pedido é composto pela classe Pedido, que por sua vez possui uma lista de ItemPedido. Ao efetuar um pedido, o aggregate é persistido juntamente com um registro que representa o evento de criação do pedido.

Existem duas técnicas para se lidar com tabelas outbox. Como analisamos a técnica Polling Publisher no último post, agora vamos dar uma olhada na técnica Transaction Log Tailing. Diferentemente da técnica anterior, essa técnica lida com o log de transações de uma tabela para recuperar o que já foi persistido e publicar as alterações. Também não exige necessariamente que a tabela outbox seja limpa, embora possa ser uma boa ideia economizar espaço de armazenamento.

É importante observar que lidar com uma tabela outbox significa capturar alterações de dados, ou CDC, que é a ideia de recuperar as alterações que aconteceram em dados de alguma forma. Há várias maneiras de se fazer isso. Uma delas é ter um componente observando o log de transações de uma tabela do banco, que é o log do banco de dados que contém tudo que já foi persistido nas tabelas, e é nessa ideia que se baseia a técnica Transaction Log Tailing.

Caso de uso: Amazon Aurora for PostgreSQL-Compatible Edition como banco de dados do serviço de comandos e Amazon Elasticache for Redis como banco de dados do serviço de consultas, usando o padrão Transactional Outbox e a técnica Transaction Log Tailing com o Debezium Connector

Assim como no primeiro e segundo casos que explorei nesta série de posts, temos um Aurora como banco de dados do serviço de comandos e um Redis como banco de dados do serviço de consultas. O Aurora conterá informações relacionadas a clientes, produtos e pedidos, e o Redis conterá informações pré-computadas relacionadas a clientes. Para testar a arquitetura, usaremos dois endpoints, sendo um para salvar pedidos e outro para recuperar informações relacionadas ao cliente.

Para implementar essa técnica, além do Aurora e do Redis, também precisaremos de um cluster Amazon Managed Streaming for Apache Kafka (Amazon MSK). Todos os clusters serão implantados em 3 subredes privadas em uma Amazon VPC personalizada. Também usaremos o Amazon EventBridge Pipes para passar dados do tópico do Kafka para um tópico do Amazon SNS, para que as alterações possam ser enviadas a vários consumidores.

Visão Geral da Solução

Assim como fizemos com a técnica Polling Publisher, continuaremos persistindo os dados referente a pedidos juntamente a um registro que representa o evento que acabou de ocorrer na tabela outbox. A diferença aqui é que, diferentemente da técnica anterior, não precisaremos de componentes que disparem a cada n minutos para recuperar dados de réplicas de leitura do Aurora. Em vez disso, teremos um componente que recuperará registros do log de transações do banco. Há várias opções, mas na solução proposta neste blog post, utilizaremos o Debezium Connector. O nome da técnica Transaction Log Tailing vem do fato de que os componentes que lidam com o log de transações o observam “a cauda” desse log, e tornam os registros disponíveis para serem utilizados em algum lugar. No caso do Debezium Connector, um tópico do Kafka.

Nessa opção de implementação do padrão arquitetural CQRS, acompanhamos o log de transações da tabela outbox no banco de dados. Para isso, usaremos um componente chamado Debezium Connector for PostgreSQL, que publicará alterações na tabela outbox em um tópico do Kafka sempre que elas ocorrerem. O Debezium Connector captura alterações em nível de registro nas tabelas indicadas pelo usuário em bancos de dados relacionais assim que elas são persistidas. Quando ele se conecta ao banco de dados pela primeira vez, ele tira um snapshot do estado atual das tabelas e o publica em um tópico do Kafka e, em seguida, continua monitorando essas tabelas e publicando alterações em nível de registro. Atualmente, no caso de um banco de dados Postgres, o Debezium só é capaz de enviar alterações em nível de registro para um tópico do Kafka, o que faz sentido, já que é um connector do Kafka.

É importante observar que, para que o Debezium Connector funcione, no serviço Amazon RDS, precisamos criar um parameter group do tipo cluster, definir o parâmetro rds.logical_replication como 1 e atribuir o parameter group ao nosso banco de dados Aurora (se o cluster do banco de dados foi criado antes de atribuir o parameter group ao cluster, também precisamos reiniciar as instâncias do cluster). Para obter instruções sobre como trabalhar com replicação lógica no Aurora Postgres, consulte a documentação Usar a Replicação Lógica do PostgreSQL com o Aurora.

O Debezium Connector que estamos usando aqui é um source connector (em português, algo como “conector de origem”), o que significa que ele lê um log de transações e publica as alterações em um tópico do Kafka. A partir daí, os dados passam pelo pipe do EventBridge até que as alterações sejam enviadas para outros componentes. Outra opção seria ter um sink connector (em português, algo como “conector de destino”): enquanto um source connector envia dados de algum lugar para um tópico do Kafka, um sink connector envia dados de um tópico do Kafka para algum lugar.

Existem alguns connectors na comunidade que podem enviar dados de um tópico do Kafka para um tópico do SNS. Embora definitivamente funcione, a única consideração aqui é que, se algum tipo de tratamento de dados fosse necessário, como o que fazemos com o OrderEventAdapterLambda que os limpa, ou se algum tipo de filtragem fosse necessário, isso não seria possível com um sink connector, como é com o EventBridge Pipes. Também precisamos instalar um novo connector e seu plugin correspondente para cada destino para o qual queremos enviar informações. Um pipe do EventBridge é mais flexível e fácil de usar, pois não exige a instalação de novos componentes.

Em nosso exemplo, uma chamada POST será feita para o endpoint /orders, com as informações de um pedido a ser feito por um cliente. Essa chamada será validada por uma Lambda Authorizer (para fins de simplificação, usaremos basic authentication) e será processada posteriormente pela OrderReceiverLambda, que desempenhará a função do nosso serviço de comandos. Essa função Lambda, na mesma transação, inserirá dados nas tabelas que persistem informações relacionadas ao aggregate do pedido e também na tabela que corresponde a nossa tabela outbox. O log de transações dessa tabela será monitorado pelo Debezium Connector, que publicará as alterações feitas nele em um tópico do Kafka.

O tópico do Kafka que recebe as alterações feitas na tabela outbox será o source de um pipe do EventBridge. A segunda etapa do pipe será a função Lambda OrderAdapterLambda na fase de enrichment, cujo objetivo é limpar o evento entregue pelo source do pipe e também decodificar a chave e o valor do evento, que são codificados com em Base64.

Essa função do Lambda também pode desempenhar o papel de um filtro. Quando aplicamos a técnica Transaction Log Tailing com o Debezium Connector, podemos ter acesso não apenas aos dados do evento em si, mas também aos metadados sobre a transação em si (isso seria possível adicionando propriedades como transforms.unwrap.add.fields à configuração do nosso connector). Também poderíamos ter acesso à exclusão de registros (para permitir que o Debezium Connector capture eventos de exclusão da tabela outbox, teríamos que definir a propriedade delete_handling_mode para rewrite ou none).

Se tivéssemos habilitado o Debezium para capturar eventos de exclusão, essa filtragem seria importante porque, como estamos observando o log de transações da tabela outbox, receberíamos todos os eventos, incluindo exclusões, e no cenário que estamos explorando, estamos interessados apenas em inserções. Esses eventos de exclusão acontecem porque o OrderEventCleanerLambda limpa a tabela outbox, o que gera os eventos.

É verdade que é possível definir um filtro em um pipe do EventBridge. O problema é que os dados do evento são codificados com em Base64 e, para fazer a filtragem, precisamos decodificar os dados e verificar os metadados do evento para verificar se o campo op é igual a “c” (como em “created”), e não podemos fazer isso no filtro de um pipe do EventBridge, então poderíamos usar a função Lambda da fase de enriquecimento para fazer isso. Para seguir com o processamento dos eventos na fase de enriquecimento, poderíamos criar uma lista, decodificar os eventos e incluir na lista somente os eventos cujo campo op contido nos metadados de cada evento fosse igual a “c”, e retornar a lista na função Lambda.

Após a função Lambda da fase de enriquecimento, o evento é finalmente entregue a um tópico do SNS, que entrega o evento a duas filas SQS, sendo uma fila para ser lida por uma função Lambda que envia um e-mail de notificação e outra fila para ser lida por uma função Lambda que atualiza a chave do cliente no Redis.

A função Lambda que atualiza o Redis então publica um evento em outra fila opcional SQS, que é lida por uma função Lambda que exclui os eventos da tabela outbox que já foram processados. O evento é publicado pela função Lambda que atualiza o Redis apenas para garantir que a função Lambda opcional que remove um evento já publicado do Aurora só receba um evento após uma atualização bem-sucedida do Redis. Como não estamos recuperando eventos diretamente da tabela do banco de dados para publicar eventos, há outras maneiras de se limpar a tabela outbox, como com a extensão pg_partman, disponível para Amazon RDS PostgreSQL ou Amazon Aurora for PostgreSQL.

Um segundo endpoint, /clients/ {clientId}, recupera informações relacionadas ao cliente. A função Lambda que fornece essas informações representa o nosso serviço de consultas e as recupera da chave Redis que foi atualizada anteriormente. As informações retornadas contêm o nome do cliente, o e-mail e o valor total que já foi comprado pelo cliente.

Imagem demonstrando a arquitetura proposta, tendo o Aurora como banco de dados do serviço de comandos, e o Redis como banco de dados do serviço de consultas. A partir da inserção de um evento no Aurora, o Debezium lê o transaction log da tabela outbox e publica os eventos em um tópico MSK, que é lido por um pipe do EventBridge que efetua a publicação desses eventos em um tópico do SNS. O SNS então entrega os eventos em duas filas, sendo que uma delas é lida por uma Lambda que atualiza o Redis, com os últimos dados do cliente relacionado a cada evento.

Figura 2. Arquitetura proposta, tendo o Amazon Aurora PostgreSQL-Compatible Edition como banco de dados do serviço de comandos, e o Amazon Elasticache for Redis como banco de dados do serviço de consultas.

Assim como exploramos na segunda parte desta série, esta solução também utiliza o padrão Transactional Outbox. Dessa forma, os eventos que acontecerem nos bounded contexts da nossa aplicação serão garantidamente publicados em algum momento, e poderemos também ter mecanismos que removam os eventos da tabela outbox somente depois que eles forem consumidos.

Outra vantagem dessa solução é que as alterações são retiradas do log de transações da tabela outbox para um tópico do Kafka. Isso significa que o que estamos consumindo garantidamente já aconteceu na tabela outbox, que pode ser limpa em algum momento por um componente opcional que pode executar computação (como uma função Lambda) ou combinando extensões do Postgres, como pg_partman, com outras soluções da AWS, como o Amazon S3. E como o Debezium Connector está instalado no cluster MSK, que é multi-az, temos mais tolerância a falhas, portanto, é uma arquitetura mais resiliente.

Outra vantagem é que, diferentemente do que exploramos com a técnica Polling Publisher, ao observar o log de transações da tabela outbox, podemos capturar mais dados do que os presentes na própria tabela outbox, como metadados sobre a transação. Além disso, os eventos serão publicados a partir da tabela outbox tão logo forem persistidos, então o lag de replicação entre os bancos de dados será muito menor. Outro ponto é que, diferentemente das outras soluções, os eventos que foram persistidos na tabela outbox podem ser reproduzidos, pois também estão no tópico do Kafka. Se quisermos alimentar outro banco de dados ou reproduzir eventos para tentar simular um bug, todos os eventos podem ser novamente reproduzidos.

A desvantagem dessa arquitetura é que, embora seja mais resiliente, ela adiciona mais componentes e, portanto, temos mais complexidade e é também um pouco mais cara do que as que foram exploradas anteriormente.

Executando o Exemplo

Para executar o exemplo, os leitores deverão ter uma conta na AWS e um usuário com permissões de admin. Depois, basta executar o passo-a-passo fornecido no repositório de códigos desta série de blog posts sobre CQRS, no AWS Samples, hospedado no Github. Ao executar o passo-a-passo, os leitores terão a infraestrutura aqui apresentada nas suas próprias contas.

O exemplo contém dois endpoints, sendo um para receber informações relacionadas a pedidos (representando o nosso serviço de comandos) e outro para recuperar informações relacionadas a clientes (representando nosso serviço de consultas). Para verificar se tudo funcionou corretamente, vá até o Amazon API Gateway e, na lista de APIs, entre na API “OrdersAPI”, e depois em Stages. Haverá somente uma stage chamada “prod”. Recupere o valor do campo “Invoke URL” e acrescente “/orders”. Esse é o endpoint que recebe informações relacionadas a pedidos.

Vamos realizar uma requisição POST a esse endpoint. Podemos usar qualquer ferramenta para efetuar as requisições, como cURL ou Postman. Como esse endpoint está protegido, também precisamos adicionar basic authentication. Se você estiver usando o Postman, será necessário recuperar nome de usuário e senha gerados na construção da infraestrutura. No API Gateway, vá até “API Keys” e copie o valor da coluna “API key” de “admin_key”. Esse valor contém nome de usuário e senha separados pelo caracter “:”, porém está codificado em Base64. Decodifique o valor, utilizando alguma ferramenta online, ou o próprio comando “base64” do Linux. O nome de usuário está à esquerda do caracter “:”, e a senha está à direita. Adicione uma “Authorization” do tipo “Basic Auth” e preencha os campos “Username” e “Password” com os valores recuperados. Adicione também um header “Content-Type”, com o valor “application/json”.

Se você estiver usando, por exemplo, cURL, não será necessário decodificar o valor da API key. Basta adicionar um header “Authorization” com o valor “Basic <valor da api key copiado da coluna API key>”. Adicione também um header “Content-Type”, com o valor “application/json”.

O payload para efetuar requisições para esse endpoint é o seguinte:

{
    "id_client": 1,
    "products": [{
        "id_product": 1,
        "quantity": 1
    }, {
        "id_product": 2,
        "quantity": 3
    }]
}

Isso representa um pedido que o cliente com id 1 fez, contendo produtos com ids 1 e 2. O total desse pedido é de $3000. Todas essas informações serão armazenadas no Aurora. Ao efetuar essa requisição POST, se tudo funcionou conforme o esperado, você deverá ver o seguinte resultado:

{
    "statusCode": 200,
    "body": "Order created successfully!"
}

Agora, vamos verificar se as informações relacionadas ao cliente foram enviadas ao Redis. Ao endpoint do API Gateway, que foi recuperado anteriormente, acrescente “/clients/1”. Esse é o endpoint que recupera informações relacionadas ao cliente. Vamos efetuar uma solicitação GET para esse endpoint. Assim como fizemos com o endpoint “/orders”, precisamos adicionar basic authentication. Siga as etapas explicadas anteriormente e efetue a requisição GET. Se tudo funcionou conforme o esperado, você verá uma saída semelhante à seguinte:

{
    "name": "Bob",
    "email": "bob@anemailprovider.com",
    "total": 3000.0,
    "last_purchase": 1700836837
}

Isso significa que conseguimos alimentar com sucesso o Redis com informações prontas para serem lidas, enquanto as mesmas informações estão no Aurora, em outro formato.

Limpando os Recursos

Pelo fato de que a infraestrutura criada é relativamente complexa e alguns recursos têm que ser criados após a criação da infraestrutura inicial, para limparmos os recursos, será necessário seguir alguns passos. Para excluir a infraestrutura criada, no console, vá até o Amazon MSK. Depois, em MSK Connect, vá até “Connectors”, selecione o connector “KafkaOrderEventConnector” e o exclua. Depois, vá até “Custom plugins”, selecione o plugin “debezium-plugin” e o exclua. Depois, vá ao Amazon CloudFormation, Stacks, e clique no radio button da stack “cqrsOnAws”. Exclua a stack. Essa deleção levará aproximadamente 30 minutos. Provavelmente não será possível excluir a stack inteira. Se isso acontecer, vá ao Amazon EC2 e, em “Network & Security”, vá até “Network Interfaces”. Exclua as duas interfaces que restaram, selecionando-as e clicando em “Delete” no menu “Actions”. Depois, volte ao CloudFormation, selecione a stack “cqrsOnAws” e clique em “Retry Delete”. Em seguida, escolha “Force delete this entire stack” e clique em “Delete”.

Conclusão

Neste blog post, abordei a técnica Transaction Log Tailing, que é uma forma de implementar o padrão Transactional Outbox. Com isso, toda vez que estamos prestes a alterar o estado de um aggregate no serviço de comandos, persistimos em uma tabela Transactional Outbox um evento que representa a situação que acabou de acontecer ao aggregate, junto com o próprio aggregate. Para publicar as alterações, usamos um componente que observa o log de transações da tabela outbox no banco de dados e publica as alterações em nível de registro em outro componente. Em nosso exemplo, usamos o Debezium Connector, que é o componente que observa o log de transações de uma tabela e publica as alterações em um tópico do MSK.

Com o padrão Transactional Outbox, em algum momento todos os eventos serão publicados, então não há risco de não os publicar. É uma abordagem relativamente simples, mas o Transaction Log Tailing é um pouco mais complexo do que as outras abordagens e é também um pouco mais caro. No entanto, é mais resiliente e oferece a capacidade de reproduzir eventos, seja para alimentar outro banco de dados ou tentar reproduzir um bug.

No próximo post desta série, explorarei outra técnica para publicar eventos de uma tabela Transactional Outbox, na qual exploraremos outra forma da técnica de Transaction Log Tailing usando o Amazon Database Migration Service para publicar continuamente as alterações em nível de registro que ocorreram na tabela Transactional Outbox em outros componentes da nossa arquitetura.

Sobre o Autor

Roberto Perillo Roberto Perillo é arquiteto de soluções enterprise da AWS Brasil especialista em serverless, atendendo a clientes da indústria financeira, e atua na indústria de software desde 2001. Atuou por quase 20 anos como arquiteto de software e desenvolvedor Java antes de ingressar na AWS, em 2022. Possui graduação em Ciência da Computação, especialização em Engenharia de Software e mestrado também em Ciência da Computação. Um eterno aprendiz. Nas horas vagas, gosta de estudar, tocar guitarra, e também jogar boliche e futebol de botão com seu filho, Lorenzo!

Sobre o Colaborador

Luiz Santos Luiz Santos trabalha atualmente como Technical Account Manager (TAM) na AWS e é um entusiasta de tecnologia, sempre busca novos conhecimentos e possui maior profundidade sobre desenvolvimento de Software, Data Analytics, Segurança, Serverless e DevOps. Anteriormente, teve experiência como Arquiteto de Soluções AWS e SDE.

Sobre as Revisoras

Erika Nagamine Erika Nagamine é arquiteta de soluções especialista em Dados na AWS. Tem formação acadêmica sólida, graduada em Sistemas de Informação, com pós graduação em Administração de Banco de Dados, Engenharia de Dados e especialização em mineração de Dados Complexos pela Unicamp. Atua com clientes de diversos segmentos e já participou de mais de 200 projetos no Brasil e no mundo. Atualmente múltiplas certificações em dados, computação em nuvem, todas as certificações AWS, e ama compartilhar conhecimento em comunidades e é palestrante em eventos técnicos de destaque em todo o Brasil e no mundo.
Karine Ferrari Karine Ferrari é arquiteta de soluções na AWS com experiência em clientes SMB e Financial Services. Com 15 anos de experiência na área de tecnologia da informação atuando em instituições de grande porte e nos últimos 4 anos atua com arquitetura para projetos em cloud e modernização de aplicações. Possui experiência em implementar e fornecer documentações, guias e experimentações com intuito de evangelizar e apoiar as equipes de negócios para utilização de microserviços, APIs, mensageria, eventos e banco de dados em projetos em nuvem.