AWS Big Data Blog

Deliver Amazon CloudWatch logs to Amazon OpenSearch Serverless

Amazon CloudWatch Logs collects, aggregates, and analyzes logs from different systems in one place. CloudWatch provides subscriptions as a real-time feed of these logs to other services such as Amazon Kinesis Data Streams, AWS Lambda, and Amazon OpenSearch Service. These subscriptions are a popular mechanism for enabling custom processing and advanced analysis of log data to gain additional valuable insights. At the time of publishing this blog post, these subscription filters support delivering logs to Amazon OpenSearch Service provisioned clusters only. Customers are increasingly adopting Amazon OpenSearch Serverless as a cost-effective option for infrequent, intermittent, and unpredictable workloads.

In this blog post, we show how to use Amazon OpenSearch Ingestion to deliver CloudWatch logs to OpenSearch Serverless in near real time. We outline a mechanism that connects a Lambda-based subscription filter to OpenSearch Ingestion, delivering logs to OpenSearch Serverless without needing a dedicated subscription filter destination for it.

Solution overview

The following diagram illustrates the solution architecture.

  1. CloudWatch Logs: Collects and stores logs from various AWS resources and applications. It serves as the source of log data in this solution.
  2. Subscription filter: A CloudWatch Logs subscription filter filters and routes specific log data from CloudWatch Logs to the next component in the pipeline.
  3. CloudWatch exporter Lambda function: This is a Lambda function that receives the filtered log data from the subscription filter. Its purpose is to transform and prepare the log data for ingestion into the OpenSearch Ingestion pipeline.
  4. OpenSearch Ingestion: This is a component of OpenSearch Service. The Ingestion pipeline is responsible for processing and enriching the log data received from the CloudWatch exporter Lambda function before storing it in the OpenSearch Serverless collection.
  5. OpenSearch Service: This is a fully managed service that stores and indexes log data, making it searchable and available for analysis and visualization. OpenSearch Service offers two configurations: provisioned domains and serverless. In this setup, we use serverless, which is an auto-scaling configuration for OpenSearch Service.

Prerequisites

For this walkthrough, you should have the following:

  • An AWS account with permissions to create IAM, CloudWatch Logs, Lambda, and OpenSearch Serverless resources.
  • An Amazon OpenSearch Serverless collection to receive the logs.
  • A CloudWatch log group that contains the logs you want to deliver.

Deploy the solution

With the prerequisites in place, you can create and deploy the pieces of the solution.

Step 1: Create PipelineRole for ingestion

  • Open the AWS Management Console for AWS Identity and Access Management (IAM).
  • Choose Policies, and then choose Create policy.
  • Select JSON and paste the following policy into the editor:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "aoss:BatchGetCollection",
                "aoss:APIAccessAll"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:aoss:us-east-1:{accountId}:collection/{collectionId}"
        },
        {
            "Action": [
                "aoss:CreateSecurityPolicy",
                "aoss:GetSecurityPolicy",
                "aoss:UpdateSecurityPolicy"
            ],
            "Effect": "Allow",
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "aoss:collection": "{collection}"
                }
            }
        }
    ]
}

// Replace {region}, {accountId}, {collectionId}, and {collection} with your own values
  • Choose Next, choose Next, and name your policy collection-pipeline-policy.
  • Choose Create policy.
  • Next, create a role and attach the policy to it. Choose Roles, and then choose Create role.
  • Select Custom trust policy and paste the following policy into the editor:
{
   "Version":"2012-10-17",
   "Statement":[
      {
         "Effect":"Allow",
         "Principal":{
            "Service":"osis-pipelines.amazonaws.com"
         },
         "Action":"sts:AssumeRole"
      }
   ]
}
  • Choose Next, and then search for and select the collection-pipeline-policy you just created.
  • Choose Next and name the role PipelineRole.
  • Choose Create role.
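
If you prefer to script this step, the following is a minimal boto3 sketch of the same sequence. It assumes you have saved the two policy documents above as local files named collection-pipeline-policy.json and pipeline-trust-policy.json (the file names are our own convention), and that your credentials can create IAM resources.
import boto3

iam = boto3.client("iam")

# Load the two policy documents shown above (saved locally as JSON files)
with open("collection-pipeline-policy.json") as f:
    policy_doc = f.read()
with open("pipeline-trust-policy.json") as f:
    trust_doc = f.read()

# Create the permissions policy
policy = iam.create_policy(
    PolicyName="collection-pipeline-policy",
    PolicyDocument=policy_doc,
)

# Create the role with the OpenSearch Ingestion trust policy
iam.create_role(
    RoleName="PipelineRole",
    AssumeRolePolicyDocument=trust_doc,
)

# Attach the permissions policy to the role
iam.attach_role_policy(
    RoleName="PipelineRole",
    PolicyArn=policy["Policy"]["Arn"],
)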

Step 2: Configure the network and data policy for OpenSearch collection

  • In the OpenSearch Service console, navigate to the Serverless menu.
  • Create a VPC endpoint by following the instructions in Create an interface endpoint for OpenSearch Serverless.
  • Go to Security and choose Network policies.
  • Choose Create network policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "collection"
      }
    ],
    "AllowFromPublic": false,
    "SourceVPCEs": [
      "{VPC Enddpoint Id}"
    ]
  },
  {
    "Rules": [
      {
        "Resource": [
          "collection/{collection name}"
        ],
        "ResourceType": "dashboard"
      }
    ],
    "AllowFromPublic": true
  }
]
  • Go to Security and choose Data access policies.
  • Choose Create access policy.
  • Configure the following policy:
[
  {
    "Rules": [
      {
        "Resource": [
          "index/{collection name}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "arn:aws:iam::{accountId}:role/PipelineRole",
      "arn:aws:iam::{accountId}:role/Admin"
    ],
    "Description": "Rule 1"
  }
]
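
As a scripted alternative, here is a minimal boto3 sketch that creates the same two policies. It assumes the JSON documents above are saved locally as network-policy.json and data-access-policy.json, and the policy names are illustrative.
import boto3

aoss = boto3.client("opensearchserverless")

# Load the two policy documents shown above (saved locally as JSON files)
with open("network-policy.json") as f:
    network_policy = f.read()
with open("data-access-policy.json") as f:
    data_policy = f.read()

# Create the network policy for the collection
aoss.create_security_policy(
    name="cwl-network-policy",
    type="network",
    policy=network_policy,
)

# Create the data access policy for the collection
aoss.create_access_policy(
    name="cwl-data-policy",
    type="data",
    policy=data_policy,
)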

Step 3: Create an OpenSearch Ingestion pipeline

  • Navigate to the OpenSearch Service console.
  • Go to the Ingestion pipelines section.
  • Choose Create pipeline.
  • Define the pipeline configuration:
version: "2"
cwlogs-ingestion-pipeline:
  source:
    http:
      path: /logs/ingest
  sink:
    - opensearch:
        # Provide the OpenSearch Serverless collection endpoint
        hosts: ["https://{collectionId}.{region}.aoss.amazonaws.com"]
        index: "cwl-%{yyyy-MM-dd}"
        aws:
          # Provide a role ARN with access to the collection. This role should
          # have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::{accountId}:role/PipelineRole"
          # Provide the region of the collection
          region: "{region}"
          serverless: true
          serverless_options:
            network_policy_name: "{Network policy name}"

# To get the values for the placeholders:
# 1. {collectionId}: Navigate to your OpenSearch Serverless collection in the AWS Management Console and choose the collection; the collection ID is listed in the Overview section.
# 2. {region}: The AWS Region where your collection is located.
# 3. {accountId}: Your AWS account ID, shown under "My Account" when you choose your username in the top-right corner of the AWS Management Console.
# 4. {Network policy name}: The name of the network policy you created in Step 2.
# After obtaining the necessary values, replace the placeholders in the configuration with the actual values.
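
You can also create the pipeline programmatically. The following is a minimal boto3 sketch; the capacity values are illustrative, and it assumes the configuration above is saved locally as pipeline-config.yaml.
import boto3

osis = boto3.client("osis")

# Load the YAML pipeline configuration shown above
with open("pipeline-config.yaml") as f:
    pipeline_yaml = f.read()

# Create the ingestion pipeline
osis.create_pipeline(
    PipelineName="cwlogs-ingestion-pipeline",
    MinUnits=1,
    MaxUnits=4,
    PipelineConfigurationBody=pipeline_yaml,
)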

Step 4: Create a Lambda function

  • Create a Lambda layer for the requests and requests_auth_aws_sigv4 packages. Run the following commands in AWS CloudShell, setting --compatible-runtimes to match your function's Python runtime (for example, python3.12).
mkdir lambda_layers
cd lambda_layers
mkdir python
cd python
pip install requests -t ./
pip install requests_auth_aws_sigv4 -t ./
cd ..
zip -r python_modules.zip .

aws lambda publish-layer-version --layer-name Data-requests --description "My Python layer" --zip-file fileb://python_modules.zip --compatible-runtimes python3.12
  • Create a Lambda function that uses a Python runtime, attach the layer you just created, and use the following code:
import base64
import gzip
import json
import logging
from datetime import datetime

import jmespath
import requests
from requests_auth_aws_sigv4 import AWSSigV4

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)


def lambda_handler(event, context):
    # Extract the base64-encoded, gzip-compressed log data from the event
    data = jmespath.search("awslogs.data", event)

    # Decompress the logs
    cwLogs = decompress_json_data(data)

    # Construct the payload to send to OpenSearch Ingestion
    payload = prepare_payload(cwLogs)

    # Ingest the set of events into the pipeline
    ingestData(payload)

    return {
        'statusCode': 200
    }


def decompress_json_data(data):
    compressed_data = base64.b64decode(data)
    uncompressed_data = gzip.decompress(compressed_data)
    return json.loads(uncompressed_data)


def prepare_payload(cwLogs):
    payload = []
    for logEvent in cwLogs['logEvents']:
        request = {}
        request['id'] = logEvent['id']
        # CloudWatch timestamps are in milliseconds since the epoch
        dt = datetime.fromtimestamp(logEvent['timestamp'] / 1000)
        request['timestamp'] = dt.isoformat()
        request['message'] = logEvent['message']
        request['owner'] = cwLogs['owner']
        request['log_group'] = cwLogs['logGroup']
        request['log_stream'] = cwLogs['logStream']
        payload.append(request)
    return payload


def ingestData(payload):
    ingestionEndpoint = '{OpenSearch Pipeline Endpoint}'
    endpoint = 'https://' + ingestionEndpoint
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    r = requests.request('POST', f'{endpoint}/logs/ingest', json=payload, auth=AWSSigV4('osis'), headers=headers)
    LOGGER.info('Response received: ' + r.text)
    return r
  • Replace {OpenSearch Pipeline Endpoint} with the endpoint of your OpenSearch Ingestion pipeline, without the https:// prefix (the code adds it).
  • Attach the following inline policy to the function's execution role:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "PermitsWriteAccessToPipeline",
            "Effect": "Allow",
            "Action": "osis:Ingest",
            "Resource": "arn:aws:osis:{region}:{accountId}:pipeline/{OpenSearch Pipeline Name}"
        }
    ]
}
  • Deploy the function.
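
Before wiring up the subscription, you can sanity-check the handler locally by reproducing the payload shape that CloudWatch Logs delivers: a base64-encoded, gzip-compressed JSON document. The following is a minimal sketch; the field values are illustrative placeholders, it assumes the function code above is in scope, and the final call will only succeed once the pipeline endpoint placeholder is set.
import base64
import gzip
import json

# Decoded form of the payload CloudWatch Logs sends to a subscribed function
sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/aws/sample-app",
    "logStream": "sample-stream",
    "subscriptionFilters": ["cwl-to-osis"],
    "logEvents": [
        {"id": "0", "timestamp": 1700000000000, "message": "Simple Lambda Test"}
    ],
}

# CloudWatch Logs delivers the payload compressed and base64-encoded
event = {
    "awslogs": {
        "data": base64.b64encode(
            gzip.compress(json.dumps(sample).encode())
        ).decode()
    }
}

# Invoke the handler as Lambda would
print(lambda_handler(event, None))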

Step 5: Set up a CloudWatch Logs subscription

  • Grant CloudWatch Logs permission to invoke the Lambda function. The following command grants the CloudWatch Logs service permission to invoke your Lambda function for the specified log group. This is necessary because CloudWatch Logs cannot invoke a Lambda function without being granted permission. Run the following command in CloudShell to add the permission.
aws lambda add-permission \
  --function-name "{function name}" \
  --statement-id "{function name}" \
  --principal "logs.amazonaws.com" \
  --action "lambda:InvokeFunction" \
  --source-arn "arn:aws:logs:{region}:{accountId}:log-group:{log_group}:*" \
  --source-account "{accountId}"
  • Create a subscription filter for the log group. The following command creates a subscription filter that forwards all log events (because the filter pattern is an empty string) to the Lambda function. Run the following command in CloudShell to create the subscription filter.
aws logs put-subscription-filter \
  --log-group-name {log_group} \
  --filter-name {filter name} \
  --filter-pattern "" \
  --destination-arn arn:aws:lambda:{region}:{accountId}:function:{function name}

Step 6: Test and verify the solution

  • Generate some logs in your CloudWatch log group. Run the following command in CloudShell to create sample log events in the log group, replacing {timestamp in millis} with the current epoch time in milliseconds (for example, the output of date +%s%3N).
aws logs put-log-events --log-group-name {log_group} --log-stream-name {stream_name} --log-events "[{\"timestamp\": {timestamp in millis}, \"message\": \"Simple Lambda Test\"}]"
  • Check the OpenSearch collection to ensure logs are indexed correctly.
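
To verify from code rather than the OpenSearch Dashboards console, you can sign a search request against the collection endpoint with the same SigV4 helper the Lambda function uses (the service name for OpenSearch Serverless is aoss). This is a sketch under that assumption; the endpoint and index pattern are placeholders.
import requests
from requests_auth_aws_sigv4 import AWSSigV4

# Collection endpoint from the OpenSearch Serverless console
endpoint = "https://{collectionId}.{region}.aoss.amazonaws.com"

# Search the daily indexes for the test message
r = requests.get(
    f"{endpoint}/cwl-*/_search",
    json={"query": {"match": {"message": "Simple Lambda Test"}}},
    auth=AWSSigV4("aoss"),
    headers={"Content-Type": "application/json"},
)
print(r.json())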

Clean up

Remove the infrastructure for this solution when it's not in use to avoid incurring unnecessary costs. Delete the subscription filter, the Lambda function and layer, the OpenSearch Ingestion pipeline, the VPC endpoint, and the OpenSearch Serverless collection.

Conclusion

You saw how to set up a pipeline to send CloudWatch logs to an OpenSearch Serverless collection within a VPC. This integration uses CloudWatch for log aggregation, Lambda for log processing, and OpenSearch Serverless for querying and visualization. You can use this solution to take advantage of the pay-as-you-go pricing model for OpenSearch Serverless to optimize operational costs for log analysis.

About the Authors

Balaji Mohan is a senior modernization architect specializing in application and data modernization to the cloud. His business-first approach ensures seamless transitions, aligning technology with organizational goals. Using cloud-native architectures, he delivers scalable, agile, and cost-effective solutions, driving innovation and growth.

Souvik Bose is a Software Development Engineer working on Amazon OpenSearch Service.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.