AWS Compute Blog

Invoking AWS Lambda from Amazon MQ

This post courtesy of Josh Kahn, AWS Solutions Architect

Message brokers can be used to solve a number of needs in enterprise architectures, including managing workload queues and broadcasting messages to a number of subscribers. Amazon MQ is a managed message broker service for Apache ActiveMQ that makes it easy to set up and operate message brokers in the cloud.

In this post, I discuss one approach to invoking AWS Lambda from queues and topics managed by Amazon MQ brokers. This and other similar patterns can be useful in integrating legacy systems with serverless architectures. You could also integrate systems already migrated to the cloud that use common APIs such as JMS.

For example, imagine that you work for a company that produces training videos and which recently migrated its video management system to AWS. The on-premises system used to publish a message to an ActiveMQ broker when a video was ready for processing by an on-premises transcoder. However, on AWS, your company uses Amazon Elastic Transcoder. Instead of modifying the management system, Lambda polls the broker for new messages and starts a new Elastic Transcoder job. This approach avoids changes to the existing application while refactoring the workload to leverage cloud-native components.

This solution uses Amazon CloudWatch Events to trigger a Lambda function that polls the Amazon MQ broker for messages. Instead of starting an Elastic Transcoder job, the sample writes the received message to an Amazon DynamoDB table with a time stamp indicating the time received.

Getting started

To start, navigate to the Amazon MQ console. Next, launch a new Amazon MQ instance, selecting Single-instance Broker and supplying a broker name, user name, and password. Be sure to document the user name and password for later.

For the purposes of this sample, choose the default options in the Advanced settings section. Your new broker is deployed to the default VPC in the selected AWS Region with the default security group. For this post, you update the security group to allow access for your sample Lambda function. In a production scenario, I recommend deploying both the Lambda function and your Amazon MQ broker in your own VPC.

After several minutes, your instance changes status from “Creation Pending” to “Available.” You can then visit the Details page of your broker to retrieve connection information, including a link to the ActiveMQ web console where you can monitor the status of your broker, publish test messages, and so on. In this example, use the Stomp protocol to connect to your broker. Be sure to capture the broker host name, for example:

<BROKER_ID>.mq.us-east-1.amazonaws.com

You should also modify the Security Group for the broker by clicking on its Security Group ID. Click the Edit button and then click Add Rule to allow inbound traffic on port 8162 for your IP address.

Deploying and scheduling the Lambda function

To simplify the deployment of this example, I’ve provided an AWS Serverless Application Model (SAM) template that deploys the sample function and DynamoDB table, and schedules the function to be invoked every five minutes. Detailed instructions can be found with sample code on GitHub in the amazonmq-invoke-aws-lambda repository, with sample code. I discuss a few key aspects in this post.

First, SAM makes it easy to deploy and schedule invocation of our function:

SubscriberFunction:
	Type: AWS::Serverless::Function
	Properties:
		CodeUri: subscriber/
		Handler: index.handler
		Runtime: nodejs6.10
		Role: !GetAtt SubscriberFunctionRole.Arn
		Timeout: 15
		Environment:
			Variables:
				HOST: !Ref AmazonMQHost
				LOGIN: !Ref AmazonMQLogin
				PASSWORD: !Ref AmazonMQPassword
				QUEUE_NAME: !Ref AmazonMQQueueName
				WORKER_FUNCTIOn: !Ref WorkerFunction
		Events:
			Timer:
				Type: Schedule
				Properties:
					Schedule: rate(5 minutes)

WorkerFunction:
Type: AWS::Serverless::Function
	Properties:
		CodeUri: worker/
		Handler: index.handler
		Runtime: nodejs6.10
Role: !GetAtt WorkerFunctionRole.Arn
		Environment:
			Variables:
				TABLE_NAME: !Ref MessagesTable

In the code, you include the URI, user name, and password for your newly created Amazon MQ broker. These allow the function to poll the broker for new messages on the sample queue.

The sample Lambda function is written in Node.js, but clients exist for a number of programming languages.

stomp.connect(options, (error, client) => {
	if (error) { /* do something */ }

	let headers = {
		destination: ‘/queue/SAMPLE_QUEUE’,
		ack: ‘auto’
	}

	client.subscribe(headers, (error, message) => {
		if (error) { /* do something */ }

		message.readString(‘utf-8’, (error, body) => {
			if (error) { /* do something */ }

			let params = {
				FunctionName: MyWorkerFunction,
				Payload: JSON.stringify({
					message: body,
					timestamp: Date.now()
				})
			}

			let lambda = new AWS.Lambda()
			lambda.invoke(params, (error, data) => {
				if (error) { /* do something */ }
			})
		}
})
})

Sending a sample message

For the purpose of this example, use the Amazon MQ console to send a test message. Navigate to the details page for your broker.

About midway down the page, choose ActiveMQ Web Console. Next, choose Manage ActiveMQ Broker to launch the admin console. When you are prompted for a user name and password, use the credentials created earlier.

At the top of the page, choose Send. From here, you can send a sample message from the broker to subscribers. For this example, this is how you generate traffic to test the end-to-end system. Be sure to set the Destination value to “SAMPLE_QUEUE.” The message body can contain any text. Choose Send.

You now have a Lambda function polling for messages on the broker. To verify that your function is working, you can confirm in the DynamoDB console that the message was successfully received and processed by the sample Lambda function.

First, choose Tables on the left and select the table name “amazonmq-messages” in the middle section. With the table detail in view, choose Items. If the function was successful, you’ll find a new entry similar to the following:

If there is no message in DynamoDB, check again in a few minutes or review the CloudWatch Logs group for Lambda functions that contain debug messages.

Alternative approaches

Beyond the approach described here, you may consider other approaches as well. For example, you could use an intermediary system such as Apache Flume to pass messages from the broker to Lambda or deploy Apache Camel to trigger Lambda via a POST to API Gateway. There are trade-offs to each of these approaches. My goal in using CloudWatch Events was to introduce an easily repeatable pattern familiar to many Lambda developers.

Summary

I hope that you have found this example of how to integrate AWS Lambda with Amazon MQ useful. If you have expertise or legacy systems that leverage APIs such as JMS, you may find this useful as you incorporate serverless concepts in your enterprise architectures.

To learn more, see the Amazon MQ website and Developer Guide. You can try Amazon MQ for free with the AWS Free Tier, which includes up to 750 hours of a single-instance mq.t2.micro broker and up to 1 GB of storage per month for one year.