AWS Compute Blog
Using custom consumer group ID support for AWS Lambda event sources for MSK and self-managed Kafka
This post is written by Adam Wagner, Principal Serverless Specialist SA.
AWS Lambda already supports Amazon Managed Streaming for Apache Kafka (MSK) and self-managed Apache Kafka clusters as event sources. Today, AWS adds support for specifying a custom consumer group ID for the Lambda event source mappings (ESMs) for MSK and self-managed Kafka event sources.
With this feature, you can create a Lambda ESM that uses a consumer group that has already been created. This enables you to use Lambda as a Kafka consumer for topics that are replicated with MirrorMaker v2 or with consumer groups you create to start consuming at a particular offset or timestamp.
Overview
This blog post shows how to use this feature to enable Lambda to consume a Kafka topic starting at a specific timestamp. This can be useful if you must reprocess some data but don’t want to reprocess all of the data in the topic.
In this example application, a client application writes to a topic on the MSK cluster. It creates a consumer group that points to a specific timestamp within that topic as the starting point for consuming messages. A Lambda ESM is created using that existing consumer group that triggers a Lambda function. This processes and writes the messages to an Amazon DynamoDB table.
- A Kafka client writes messages to a topic in the MSK cluster.
- A Kafka consumer group is created with a starting point of a specific timestamp
- The Lambda ESM polls the MSK topic using the existing consumer group and triggers the Lambda function with batches of messages.
- The Lambda function writes the messages to DynamoDB
Step-by-step instructions
To get started, create an MSK cluster and a client Amazon EC2 instance from which to create topics and publish messages. If you don’t already have an MSK cluster, follow this blog on setting up an MSK cluster and using it as an event source for Lambda.
- On the client instance, set an environment variable to the MSK cluster bootstrap servers to make it easier to reference them in future commands:
export MSKBOOTSTRAP='b-1.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094,b-2.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094,b-3.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094'
- Create the topic. This example has a three-node MSK cluster so the replication factor is also set to three. The partition count is set to three in this example. In your applications, set this according to throughput and parallelization needs.
./bin/kafka-topics.sh --create --bootstrap-server $MSKBOOTSTRAP --replication-factor 3 --partitions 3 --topic demoTopic01
- Write messages to the topic using this Python script:
#!/usr/bin/env python3 import json import time from random import randint from uuid import uuid4 from kafka import KafkaProducer BROKERS = ['b-1.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094', 'b-2.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094', 'b-3.mskcluster.oy1hqd.c23.kafka.us-east-1.amazonaws.com:9094'] TOPIC = 'demoTopic01' producer = KafkaProducer(bootstrap_servers=BROKERS, security_protocol='SSL', value_serializer=lambda x: json.dumps(x).encode('utf-8')) def create_record(sequence_num): number = randint(1000000,10000000) record = {"id": sequence_num, "record_timestamp": int(time.time()), "random_number": number, "producer_id": str(uuid4()) } print(record) return record def publish_rec(seq): data = create_record(seq) producer.send(TOPIC, value=data).add_callback(on_send_success).add_errback(on_send_error) producer.flush() def on_send_success(record_metadata): print(record_metadata.topic, record_metadata.partition, record_metadata.offset) def on_send_error(excp): print('error writing to kafka', exc_info=excp) for num in range(1,10000000): publish_rec(num) time.sleep(0.5)
- Copy the script into a file on the client instance named producer.py. The script uses the kafka-python library, so first create a virtual environment and install the library.
python3 -m venv venv source venv/bin/activate pip3 install kafka-python
- Start the script. Leave it running for a few minutes to accumulate some messages in the topic.
- Previously, a Lambda function would choose between consuming messages starting at the beginning of the topic or starting with the latest messages. In this example, it starts consuming messages from a few hours ago at 14:30 UTC. To do this, first create a new consumer group on the client instance:
./bin/kafka-consumer-groups.sh --command-config client.properties --bootstrap-server $MSKBOOTSTRAP --topic demoTopic01 --group specificTimeCG --to-datetime 2022-08-10T16:00:00.000 --reset-offsets --execute
- In this case,
specificTimeCG
is the consumer group ID used when creating the Lambda ESM. Listing the consumer groups on the cluster shows the new group:./bin/kafka-consumer-groups.sh --list --command-config client.properties --bootstrap-server $MSKBOOTSTRAP
- With the consumer group created, create the Lambda function along with the Event Source Mapping that uses this new consumer group. In this case, the Lambda function and DynamoDB table are already created. Create the ESM with the following AWS CLI Command:
aws lambda create-event-source-mapping --region us-east-1 --event-source-arn arn:aws:kafka:us-east-1:0123456789:cluster/demo-us-east-1/78a8d1c1-fa31-4f59-9de3-aacdd77b79bb-23 --function-name msk-consumer-demo-ProcessMSKfunction-IrUhEoDY6X9N --batch-size 3 --amazon-managed-kafka-event-source-config '{"ConsumerGroupId":"specificTimeCG"}' --topics demoTopic01
The event source in the Lambda console or CLI shows the starting position set to TRIM_HORIZON. However, if you specify a custom consumer group ID that already has existing offsets, those offsets take precedent.
- With the event source created, navigate to the DynamoDB console. Locate the DynamoDB table to see the records written by the Lambda function.
Converting the record timestamp of the earliest record in DynamoDB, 1660147212, to a human-readable date shows that the first record was created on 2022-08-10T16:00:12.
In this example, the consumer group is created before the Lambda ESM so that you can specify the timestamp to start from.
If you create an ESM and specify a custom consumer group ID that does not exist, it is created. This is a convenient way to create a new consumer group for an ESM with an ID of your choosing.
Deleting an ESM does not delete the consumer group, regardless of whether it is created before, or during, the ESM creation.
Using the AWS Serverless Application Model (AWS SAM)
To create the event source mapping with a custom consumer group using an AWS Serverless Application Model (AWS SAM) template, use the following snippet:
Events:
MyMskEvent:
Type: MSK
Properties:
Stream: !Sub arn:aws:kafka:${AWS::Region}:012345678901:cluster/ demo-us-east-1/78a8d1c1-fa31-4f59-9de3-aacdd77b79bb-23
Topics:
- "demoTopic01"
ConsumerGroupId: specificTimeCG
Other types of Kafka clusters
This example uses the custom consumer group ID feature when consuming a Kafka topic from an MSK cluster. In addition to MSK clusters, this feature also supports self-managed Kafka clusters. These could be clusters running on EC2 instances or managed Kafka clusters from a partner such as Confluent.
Conclusion
This post shows how to use the new custom consumer group ID feature of the Lambda event source mapping for Amazon MSK and self-managed Kafka. This feature can be used to consume messages with Lambda starting at a specific timestamp or offset within a Kafka topic. It can also be used to consume messages from a consumer group that is replicated from another Kafka cluster using MirrorMaker v2.
For more serverless learning resources, visit Serverless Land. To learn more, read the guide: Using Lambda to process Apache Kafka streams.