AWS Partner Network (APN) Blog

Filter and Stream Logs from Amazon S3 Logging Buckets into Splunk Using AWS Lambda

By Ameya Paldhikar, Partner Solutions Architect – AWS
By Marc Luescher, Sr. Solutions Architect – AWS

Splunk-AWS-Partners-2022
Splunk
Connect with Splunk-1.1

Amazon Web Services (AWS) customers of all sizes–from growing startups to large enterprises–manage multiple AWS accounts. Following the prescriptive guidance from AWS for multi-account management, customers typically choose to perform centralization of the AWS log sources (AWS CloudTrail logs, VPC flow logs, AWS Config logs) from their multiple AWS accounts within Amazon Simple Storage Service (Amazon S3) buckets in a dedicated log archive account.

The volume of logs stored in these centralized S3 buckets can be extremely high (multiple TBs/day) depending on the number of AWS accounts and the size of workload. In order to ingest the logs from S3 buckets in Splunk, customers normally use the Splunk add-on for AWS. This is deployed on Splunk Heavy Forwarders, which act as dedicated pollers to pull the data from S3.

These servers also need an ability to scale horizontally as the data ingestion volume increases in order to support near real-time ingestion of logs. This approach involves an additional overhead of managing the deployment, and there’s an increased cost for running this dedicated infrastructure.

Consider another use case where you want to optimize ingest license costs in Splunk by filtering and forwarding only a subset of logs from the S3 buckets to Splunk. An example of this is ingesting only the rejected traffic within the VPC flow logs where the field “action” == “REJECT”. The pull-based log ingestion approach currently does not offer a way to achieve that.

This post showcases a way to filter and stream logs from centralized Amazon S3 logging buckets to Splunk using a push mechanism leveraging AWS Lambda. The push mechanism offers benefits such as lower operational overhead, lower costs, and automated scaling. We’ll provide instructions and a sample Lambda code that filters virtual private cloud (VPC) flow logs with “action” flag set to “REJECT” and pushes it to Splunk via a Splunk HTTP Event Collector (HEC) endpoint.

Splunk is an AWS Specialization Partner and AWS Marketplace Seller with Competencies in Cloud Operations, Data and Analytics, DevOps, and more. Leading organizations use Splunk’s unified security and observability platform to keep their digital systems secure and reliable.

Architecture

The architecture diagram in Figure 1 illustrates the process for ingesting the VPC flow logs into Splunk using AWS Lambda.

Architecture for Splunk Ingestion using AWS Lambda

Figure 1 – Architecture for Splunk ingestion using AWS Lambda.

  1. VPC flow logs for one or multiple AWS accounts are centralized in a logging S3 bucket within the log archive AWS account.
  2. The S3 bucket sends an “object create” event notification to an Amazon Simple Queue Service (SQS) queue for every object stored in the bucket.
  3. A Lambda function is created with Amazon SQS as event source for the function. This function polls the messages from SQS in batches, reads the contents of each event notification, and identifies the object key and corresponding S3 bucket name.
  4. The function then makes a “GetObject” call to the S3 bucket and retrieves the object. The Lambda function filters out the events that do not have the “action” flag as “REJECT”.
  5. The Lambda function streams the filtered VPC flow logs to Splunk HTTP Event Collector.
  6. VPC flow logs are ingested and are available for searching within Splunk.
  7. If Splunk is unavailable, or if any error occurs while forwarding logs, the Lambda function forwards those events to a backsplash S3 bucket.

Prerequisites

The following prerequisites exist at a minimum:

  • Publish VPC flow logs to Amazon S3 – Configure VPC flow logs to be published to an S3 bucket within your AWS account.
  • Create an index in Splunk to ingest the VPC flow logs.

Step 1: Splunk HTTP Event Collector (HEC) Configuration

To get started, we need to set up Splunk HEC to receive the data before we can configure the AWS services to forward the data to Splunk.

  • Access Splunk web, go to Settings, and choose Data inputs.

Figure 2 – Splunk data inputs.

  • Select HTTP Event Collector and choose New Token.
  • Configure the new token as per the details shown in Figure 3 below and click Submit. Verify the Source Type is set as aws:cloudwatchlogs:vpcflow.

Figure 3 – Splunk HEC token configuration.

  • Once the Token has been created, choose Global Settings, verify that All Tokens have been enabled, and click Save.

Step 2: Splunk Configurations

Next, we need to add configurations within the Splunk server under props.conf to verify that line breaking, time stamp, and field extractions are configured correctly. Copy the contents below in props.conf in $SPLUNK_HOME/etc/system/local/. For more information regarding these configurations, refer to the Splunk props.conf documentation.

[aws:cloudwatchlogs:vpcflow]
BREAK_ONLY_BEFORE_DATE = false
CHARSET=UTF-8
disabled=false
pulldown_type = true
SHOULD_LINEMERGE=false
LINE_BREAKER=\'\,(\s+)
NO_BINARY_CHECK=true
KV_MODE = json
TIME_FORMAT = %s
TIME_PREFIX = ^(?>\S+\s){10}
MAX_TIMESTAMP_LOOKAHEAD = 10  # makes sure account_id is not used for timestamp
## Replace unrequired characters from the VPC Flow logs list with blank values
SEDCMD-A = s/\'//g 
SEDCMD-B = s/\"|\,//g

## Extraction of fields within VPC Flow log events ##
EXTRACT-vpcflowlog=^(?<version>2{1})\s+(?<account_id>[^\s]{7,12})\s+(?<interface_id>[^\s]+)\s+(?<src_ip>[^\s]+)\s+(?<dest_ip>[^\s]+)\s+(?<src_port>[^\s]+)\s+(?<dest_port>[^\s]+)\s+(?<protocol_code>[^\s]+)\s+(?P<packets>[^\s]+)\s+(?<bytes>[^\s]+)\s+(?<start_time>[^\s]+)\s+(?<end_time>[^\s]+)\s+(?<vpcflow_action>[^\s]+)\s+(?<log_status>[^\s]+)

Step 3: Create SQS to Queue Event Notifications

Whenever a new object (log file) is stored in an Amazon S3 bucket, an event notification is forwarded to an SQS queue. Follow the steps below to create the SQS queue and configure a log centralization S3 bucket to forward event notifications.

  • Access the Amazon SQS console in your AWS account and choose Create Queue.
  • Select Standard type and choose a Queue name.
  • Within Configurations, increase the Visibility timeout to 5 minutes, and the Message retention period to 14 days. Refer to the screenshot below for these configurations.

Figure 4 – Amazon SQS configuration.

  • Enable Encryption for at-rest encryption for your queue.
  • Configure the Access policy as shown below to provide the S3 bucket with permissions to send messages to this SQS queue. Replace the placeholders in <> with the specific values for your environment.
    {
        "Version": "2012-10-17",
        "Id": "Queue1_Policy_UUID",
        "Statement": [
          {
            "Sid": "Queue1_Send",
            "Effect": "Allow",
            "Principal": {
              "Service": "s3.amazonaws.com"
            },
            "Action": "sqs:SendMessage",
            "Resource": "<arn_of_this_SQS>",
            "Condition": {
              "StringEquals": {
                "aws:SourceAccount": "<your_AWS_Account_ID>"
              },
              "ArnLike": {
                "aws:SourceArn": "<arn_of_the_log_centralization_S3_bucket>"
              }
            }
          }
        ]
      }
  • Enable Dead-letter queue so that messages that aren’t processed from this queue will be forwarded to the dead-letter queue for further inspection.

Step 4: Forward Amazon S3 Event Notifications to SQS

Now that the SQS queue has been created, follow the steps below to configure the VPC flow log S3 bucket to forward the event notifications for all object create events to the queue.

  • From the Amazon S3 console, access the centralized S3 bucket for VPC flow logs.
  • Select the Properties tab, scroll down to Event notifications, and choose Create event notifications.
  • Within General configurations, provide an appropriate Event name. Under Event types, select All object create events. Under Destination, choose SQS queue and select the SQS queue we created in the previous step. Click on Save changes and the configuration should look like this:

Figure 5 - S3 Event Notification

Figure 5 – Amazon S3 event notifications.

Step 5: Create a Backsplash Amazon S3 bucket

Now, let’s create a backsplash S3 bucket to verify that filtered data is not lost, in case the AWS Lambda function is unable to deliver data to Splunk. The Lambda function sends the filtered logs to this bucket whenever the delivery to Splunk fails. Please follow the steps in this documentation to create an S3 bucket.

Step 6: Create an AWS IAM Role for the Lambda Function

  • From the AWS IAM console, access the Policies menu and select Create Policy.
  • Select JSON as the Policy Editor option and paste the policy below. Replace the placeholders in <> with specific values for your environment. Once done, click Next.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "lambdaPutObject",
                "Effect": "Allow",
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::<your_backsplash_s3_name>/*"
            },
            {
                "Sid": "lambdaGetObject",
                "Effect": "Allow",
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::<your_log_centralization_s3_name>/*"
            },
            {
                "Sid": "lambdaSqsActions",
                "Effect": "Allow",
                "Action": [
                    "sqs:ReceiveMessage",
                    "sqs:DeleteMessage",
                    "sqs:GetQueueAttributes"
                ],
                "Resource": "<arn_of_the_SQS_Queue>" 
            }
        ]
    }
  • Enter a name and description for the policy, and select Create Policy.
  • From the IAM console, access Roles and select Create role.
  • Under Use Case, select Lambda and click on Next.
  • On the Add Permissions page, select the AWS managed AWSLambdaBasicExecutionRole policy and the custom policy we just created prior to creating this role. Choose Next once both the policies are selected.
  • Enter an appropriate role name and then choose Create role.

Step 7: Create Lambda Function to Filter and Push Logs to Splunk

  • Access the AWS Lambda console and choose Create function.
  • Under Basic information, enter an appropriate Function name and under Runtime choose the latest supported runtime for Python.
  • Expand the Change default execution role option, select Use an existing role, and select the role we created in the previous section.
  • Keep all other settings as default and select Create function.
  • Once the function is created, select the Configuration tab within the function and edit the General configuration. Change the Timeout value to 5 min and click Save.
  • Edit the Environment variables and add these key-value pairs. Make sure you replace the placeholders in <> with the appropriate values based on your environment. Once the environment variables are added, click Save:
    • backup_s3 = <backsplash_Amazon_S3_bucket_name_created_in_the earlier_section>
    • splunk_hec_token = <your_splunk_hec_token>
    • splunk_hec_url = <your_splunk_url>:8088/services/collector/raw
  • Select the Code tab within your function and update the lambda_function.py with the Python code below. You can also access the Python code from the lambda_splunk_function.py file within this GitHub repository.
import os
import boto3
import json
import gzip
import urllib3
import logginglogger = logging.getLogger()
logger.setLevel(logging.INFO)

s3 = boto3.client('s3')
splunk_hec_url = os.environ['splunk_hec_url']
splunk_hec_token = os.environ['splunk_hec_token']
s3_bucket_name = os.environ['backup_s3']

def write_to_backup_s3(data, key):
data_bytes=bytes(json.dumps(data).encode())
compressed_data = gzip.compress(data_bytes)
try:
response = s3.put_object(
Bucket = s3_bucket_name,
Key = key,
Body = compressed_data
)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
logger.info('Object written to S3 successfully')

except Exception as e:
logger.info(f"Error writing object to S3: {e}")
return

def send_to_splunk_hec(data_list):
data = str(data_list)[1:-1]

headers = {
"Authorization": "Splunk " + splunk_hec_token
}

http = urllib3.PoolManager(timeout=20)

try:
response = http.request(
"POST",
splunk_hec_url,
headers=headers,
body=json.dumps(data)
)
logger.info(f"Splunk HEC Response: {response.status} - {response.data}")
return response.status

except Exception as e:
logger.info(f"HTTP POST error: {e}")
return None

def filter_data(obj):
logs_to_send = []
content = gzip.decompress(obj['Body'].read()).decode('utf-8')
flow_log_records = content.strip().split('\n')
for record in flow_log_records:
fields = record.strip().split()
action = fields[12]
logger.info(f"Action: {action}")
if action == "REJECT":
logs_to_send.append(record)
logger.info(f"logs_to_send = {logs_to_send}")
return logs_to_send

def get_object(bucket, key):
try:
obj = s3.get_object(Bucket=bucket, Key=key)
return obj

except Exception as e:
logger.info(f"Error retrieving S3 object: {e}")
return None

def lambda_handler(event, context):
for record in event['Records']:
body = record['body']
logger.info(f"received sqs message: {body}")
#Parse the json message
message = json.loads(body)
try:
#extract s3 key and bucket name from the message
bucket = message['Records'][0]['s3']['bucket']['name']
s3_key = message['Records'][0]['s3']['object']['key']
#log the bucket and s3_key parameters
logger.info(f"bucket: {bucket}")
logger.info(f"s3 key: {s3_key}")

except Exception as e:
logger.info(f"Error retrieving S3 bucket name and/or object key from message: {e}")

#if bucket and s3_key are not null, invoke get_object function
if not (bucket or s3_key):
continue
obj = get_object(bucket, s3_key)
if not obj:
continue
filtered_data = filter_data(obj)
logger.info(f"filtered data = {filtered_data}")
if filtered_data:
status = send_to_splunk_hec(filtered_data)
logger.info(f"status: {status}")
if status != 200:
write_to_backup_s3(filtered_data, s3_key)

return
  • Configuration tab within the function and select Triggers.
  • Click Add trigger and select SQS.
  • Under the SQS queue drop-down, select the SQS we configured to store the S3 object-create event notifications.
  • Select Activate trigger. Keep all other settings as default and select Add.

Step 8: Searching the VPC Flow Logs in Splunk

Once the Lambda function is created and the SQS trigger has been activated, the function immediately starts forwarding the VPC flow logs to Splunk.

  • Open the Splunk console and navigate to the Search tab within the Searching and Reporting app.
  • Run the following SPL query to view the ingested VPC flow log records. Replace the placeholder in <> with the appropriate Splunk index name: index = <insert_index_name> sourcetype = “aws:cloudwatchlogs:vpcflow”

Figure 6 - Searching VPC Flow Logs in Splunk

Figure 6 – Searching VPC flow logs in Splunk.

Conclusion

This post delved into how you can filter and ingest virtual private cloud (VPC) VPC flow logs into Splunk with the help of an AWS Lambda function. VPC flow logs was used as an example, but a similar architectural pattern can be replicated for multiple log types stored in Amazon S3 buckets.

The code example provides you with an extendable framework to ingest AWS and non-AWS logs centralized in Amazon S3 into Splunk using the push-based mechanism. The filtering capability of the Lambda function can help you to ingest only the logs you’re interested in, thus helping to reduce costs by optimizing the Splunk license utilization.

You can also learn more about Splunk in AWS Marketplace.

.
Splunk-APN-Blog-Connect-2023
.


Splunk – AWS Partner Spotlight

Splunk is an AWS Specialization Partner with Competencies in Cloud Operations, Data and Analytics, DevOps, and more. Leading organizations use Splunk’s unified security and observability platform to keep their digital systems secure and reliable.

Contact Splunk | Partner Overview | AWS Marketplace | Case Studies