亚马逊AWS官方博客

搭建云上日志收集分析系统 (EKK Amazon Elasticsearch Service, Amazon Kinesis, and Kibana)

背景说明

应用系统的日志收集与分析工作对运维来说至关重要。常见的系统解决方案中开源技术栈ELK(Elastic Stack: Elasticsearch, Logstash, Kibana)是当前比较流行的选择。下面我们会讨论另一种构建于云原生设计的类似于ELK的解决方案EKK(Amazon Elasticsearch Service, Amazon Kinesis, and Kibana)。

EKK的优势在于组件是AWS托管服务,不必自己安装、运维,并且与AWS的其它服务轻松集成,可以很轻松的部署一套可靠、可扩展、安全、容错以及解耦和基于事件的解决方案。

传统的Elasticsearch中,日志数据的不断膨胀,对数据的生命周期管理越来越重要(应对此需求的新功能ILM(index lifecycle management)在Elasticsearch 7.0中闪亮登场)。本文不介绍ILM,介绍另一种解决方案:使用Lambda配合实现数据的轮换。

文中使用的AWS服务简介

1. Amazon Elasticsearch Service

是一项完全托管的服务,方便您部署、保护和运行大量 Elasticsearch 操作,且不用停机。该服务提供开源 Elasticsearch API、受托管的 Kibana 以及与 Logstash 和其他 AWS 服务的集成,支持您安全获取任何来源的数据,并开展实时搜索、分析和可视化。

2. Amazon Kinesis Firehose

Firehose 是将流数据可靠地加载到数据湖、数据存储和分析工具的最简单方式。它可以捕获、转换流数据并将其加载到 Amazon S3、Amazon Redshift、Amazon Elasticsearch Service 和 Splunk,让您可以借助正在使用的现有商业智能工具和控制面板进行近乎实时的分析。这是一项完全托管的服务,可以自动扩展以匹配数据吞吐量,并且无需持续管理。它还可以在加载数据前对其进行批处理、压缩和加密,从而最大程度地减少目的地使用的存储量,同时提高安全性。

本文中,Firehouse会获取并自动加载日志的流式数据到Amazon ES里,并在S3中还会再进行一次备份。

3. AWS Lambda

通过 AWS Lambda,无需预置或管理服务器即可运行代码。您只需按使用的计算时间付费 – 代码未运行时不产生费用。

借助 Lambda,您几乎可以为任何类型的应用程序或后端服务运行代码,而且完全无需管理。只需上传您的代码,Lambda 会处理运行和扩展高可用性代码所需的一切工作。您可以将您的代码设置为自动从其他 AWS 产品触发,或者直接从任何 Web 或移动应用程序调用。

本文中使用Lambda实现Elasticsearch Index的rotate功能,将历史数据从ES中移除,解决由历史数据太大造成的性能下降,成本上升等问题。

并使用Lambda实现了,将S3中的历史数据文件Load回ES,借助ES进行临时性历史日志分析。

4. Amazon CloudWatch

Amazon CloudWatch 是一种面向开发运营工程师、开发人员、站点可靠性工程师 (SRE) 和 IT 经理的监控和可观测性服务。CloudWatch 为您提供相关数据和切实见解,以监控应用程序、响应系统范围的性能变化、优化资源利用率,并在统一视图中查看运营状况。CloudWatch 以日志、指标和事件的形式收集监控和运营数据,让您能够在统一查看在 AWS 和本地服务器上运行的资源、应用程序和服务。您可以使用 CloudWatch 检测环境中的异常行为、设置警报、并排显示日志和指标、执行自动化操作、排查问题,以及发现可确保应用程序正常运行的见解。

5. Amazon Athena

Amazon Athena 是一种交互式查询服务,让您能够轻松使用标准 SQL 分析 Amazon S3 中的数据。Athena 没有服务器,因此您无需管理任何基础设施,且只需为您运行的查询付费。

Athena 简单易用。只需指向您存储在 Amazon S3 中的数据,定义架构并使用标准 SQL 开始查询。就可在数秒内获取最多的结果。使用 Athena,无需执行复杂的 ETL 作业来为数据分析做准备。这样一来,具备 SQL 技能的任何人都可以轻松快速地分析大规模数据集。

本文最后简单介绍实用Athena服务,如何直接进行基于S3全量日志的分析工作。

6. AWS Identity and Access Management (IAM)

AWS Identity and Access Management (IAM) 使您能够安全地管理对 AWS 服务和资源的访问。您可以使用 IAM 创建和管理 AWS 用户和组,并使用各种权限来允许或拒绝他们对 AWS 资源的访问。

架构

1. EKK 和 数据Rotate架构

2. Load 历史数据到ES架构

在IAM服务中创建身份授权IAM Role

1. 为需要收集日志的EC2,创建Role:EKK-EC2

2. 为Kinesis Firehose 创建Role: EKK-Firehose

设置权限

在Amazon Elasticsearch服务中创建创建 ES cluster

1. 计算存储要求

源数据 * (1 + 副本数量) * 1.45 = 最小存储要求

(元数据 + 增长空间) * (1 + 索引开销) / 所需的分片大小 = 主分片的大约数量

尽量使分片大小保持在 10–50GiB 之间是一种比较好的做法。

2. 实例类型

尝试从每 100GiB 存储要求更接近 2 个 vCPU 核心和 8GiB 内存的配置开始。

3. 专用主节点

实例计数 推荐的最小专用主实例类型
1–10 c5.large.elasticsearch
10–30 c5.xlarge.elasticsearch
30–75 c5.2xlarge.elasticsearch
75–200 r5.4xlarge.elasticsearch

参考https://docs.amazonaws.cn/elasticsearch-service/latest/developerguide/sizing-domains.html

4. 控制台创建集群


IP地址设置为自己的IP地址CIDR段,此CIDR段之外地址无权访问ES、Kibana。

5. 可获得ES Cluster 和 Kibana的地址


在Kinesis服务中创建Kinesis Firehose

配置需要收集日志服务器

1. 在EC2服务中为需要收集日志的EC2分配角色

分配角色

Linux日志环境

安装Agent

$sudo yum install –y aws-kinesis-agent

或者

$sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm

打开并编辑配置文件:/etc/aws-kinesis/agent.json

{

"cloudwatch.emitMetrics": true,

"firehose.endpoint": "firehose---cn-north-1.amazonaws.com.rproxy.goskope.com.cn",

"flows": [

{

"filePattern": "/var/log/httpd/access_log*",

"deliveryStream": "EKK-LogFirehose-apachelog",

"dataProcessingOptions": [

{

"optionName": "LOGTOJSON",

"logFormat": "COMMONAPACHELOG"

}

]



}

]

}

$sudo service aws-kinesis-agent start

$sudo chkconfig aws-kinesis-agent on

2. Windows日志环境

下载安装Agent

https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/AWSKinesisTap.1.1.168.1.msi

C:\Program Files\Amazon\AWSKinesisTap\appsettings.json

{

"Sources": [

{

"Id": "PerformanceCounter",

"SourceType": "WindowsPerformanceCounterSource",

"Categories": [

{

"Category": "Server",

"Counters": [

"Files Open",

"Logon Total"

]

},

{

"Category": "LogicalDisk",

"Instances": "*",

"Counters": [

"% Free Space",

{

"Counter": "Disk Reads/sec",

"Unit": "Count/Second"

}

]

}

],

}

],

"Sinks": [

{

"Namespace": "MyServiceMetrics",

"Region": "cn-north-1",

"Id": "CloudWatchSink",

"SinkType": "CloudWatch"

},

{

"Id": "WindowsLogKinesisFirehoseSink",

"SinkType": "KinesisFirehose",

"StreamName": "EKK-LogFirehose-iislog",

"Region": "cn-north-1",

"QueueType": "file"

}

],

"Pipes": [

{

"Id": "PerformanceCounterToCloudWatch",

"SourceRef": "PerformanceCounter",

"SinkRef": "WindowsLogKinesisFirehoseSink"

}

]

}

参考: https://docs.aws.amazon.com/zh_cn/kinesis-agent-windows/latest/userguide/getting-started.html

【可选】模拟一个Apache Log环境为后面的测试验证

在EC2服务中创建一个Linux EC2

chmod 400 EKK-test.pem

ssh -i EKK-test.pem ec2-user@52.81.85.86

 

安装Kinesis Agent

$sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm

配置Agent

$sudo vi /etc/aws-kinesis/agent.json

{

"cloudwatch.emitMetrics": true,

"firehose.endpoint": "firehose---cn-north-1.amazonaws.com.rproxy.goskope.com.cn",

"flows": [

{

"filePattern": "/var/log/httpd/access_log*",

"deliveryStream": "EKK-LogFirehose-apachelog",

"dataProcessingOptions": [

{

"optionName": "LOGTOJSON",

"logFormat": "COMMONAPACHELOG"

}

]



}

]

}

$sudo service aws-kinesis-agent start

$sudo chkconfig aws-kinesis-agent on

 

$sudo mkdir /var/log/httpd

安装Fake日志生成程序

参考 https://github.com/kiritbasu/Fake-Apache-Log-Generator

$sudo yum install -y git

$git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git

 

$sudo yum install python-pip -y

$cd Fake-Apache-Log-Generator/

$sudo pip install -r requirements.txt

 

产生日志脚本

$cd ~

$sudo vi test.sh

#!/bin/bash

# chkconfig: 2345 10 90

cd /var/log/httpd/

while true

do

sudo python /home/ec2-user/Fake-Apache-Log-Generator/apache-fake-log-gen.py -n 100 -o LOG

sleep 10

done

后台运行脚本:

$sudo sh ./test.sh &

 

Fake日志产生在目录 /var/log/httpd 中。

【可选】可使用Systems Manager(SSM)安装Agent

1. 为EC2的Role EKK-EC2 附加SSM需要的权限

2. 使用SSM服务

Linux 可使用AWS-RunShellScript



运行Shell的目标EC2 可直接选择,或者按Tag筛选

Windows 类似,可使用AWS-RunPowerShellScript 运行powershell script

使用Lambda在ES轮换数据数据(Rotate)

1. 为Lambda设置IAM Role

2. 在Lambda中创建个Layer(层)

把项目需要的依赖包放到层里,方便Lambda的使用。

层参考: https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/configuration-layers.html

 

参考命令:

$mkdir python

$pip3 install elasticsearch-curator -t ./python/

$zip -q -r layer.zip ./python

3. 创建Lambda: ES-Rotate

设置Lambda运行的Layers

保存

设置运行代码,点击

在下面Function code区域更新代码,host地址换成我们创建的ES Cluster地址


import json

import boto3

from requests_aws4auth import AWS4Auth

from elasticsearch import Elasticsearch, RequestsHttpConnection

import curator



host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4---cn-north-1---es.amazonaws.com.rproxy.goskope.com.cn' # For example, search-my-domain.region.es.amazonaws.com

region = 'cn-north-1' # For example, us-west-1

service = 'es'

credentials = boto3.Session().get_credentials()

awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)





def lambda_handler(event, context):

es = Elasticsearch(

hosts = [{'host': host, 'port': 443}],

http_auth = awsauth,

use_ssl = True,

verify_certs = True,

connection_class = RequestsHttpConnection

)



index_list = curator.IndexList(es)

index_list.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=1)

print("Found %s indices to delete" % len(index_list.indices))



# If our filtered list contains any indices, delete them.

if index_list.indices:

curator.DeleteIndices(index_list).do_action()





# TODO implement

return {

'statusCode': 200,

'body': json.dumps('Hello from Lambda!')

}

修改Lambda 运行时内存和超时时间

4. 测试

定义测试用例输入参数,

因此Lambda测试用例,不需输入参数,可使用默认设置,

运行测试用例

在ES中查看

参考 https://docs.aws.amazon.com/zh_cn/elasticsearch-service/latest/developerguide/curator.html

5. 在CloudWatch设置定时轮换

Load to ES from S3

1. 创建Lambad s3-to-es-bulk-by-hour

2. 设置Layer

3. 设置Code

Bucket 设置为Kinesis Firehose 中设置的bucket

Host 设置为ES 的endpoint

import boto3

import re

import requests

from requests_aws4auth import AWS4Auth

import json

from elasticsearch.helpers import bulk

import time

from elasticsearch import Elasticsearch, RequestsHttpConnection





region = 'cn-north-1' # e.g. us-west-1

service = 'es'

credentials = boto3.Session().get_credentials()

awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)



bucket = 'zhnc-ekk-full-log'



host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4---cn-north-1---es.amazonaws.com.rproxy.goskope.com.cn'

type = 'log'

headers = { "Content-Type": "application/json" }



s3 = boto3.client('s3', region_name=region)



def get_all_s3_keys(bucket, prefix):

keys = []

kwargs = {'Bucket': bucket, 'Prefix':prefix}

while True:

resp = s3.list_objects_v2(**kwargs)

for obj in resp['Contents']:

keys.append(obj['Key'])

try:

kwargs['ContinuationToken'] = resp['NextContinuationToken']

except KeyError:

break

return keys





# Lambda execution starts here

def lambda_handler(event, context):



print(event)

for record in event['Records']:

msg = json.loads(json.dumps(eval(record['Sns']['Message'])))

year=msg['year']

month=msg['month']

day=msg['day']

hour=msg['hour']

index = 'apachelog-{}-{}-{}-{}'.format(year,month,day,hour)



print(index)



keys = get_all_s3_keys(bucket,"apachelog/{}/{}/{}/{}".format(year,month,day,hour))



print(len(keys));



es = Elasticsearch(

hosts = [{'host': host, 'port': 443}],

http_auth = awsauth,

use_ssl = True,

verify_certs = True,

connection_class = RequestsHttpConnection

)





for key in keys:



ACTIONS = []

obj = s3.get_object(Bucket=bucket, Key=key)

body = obj['Body'].read()

lines = body.splitlines()



for line in lines:



document = json.loads(line)

action = {

"_index": index,

"_type": type,

"_source": document

}

ACTIONS.append(action)

success, _ = bulk(es, ACTIONS, index=index, raise_on_error=True)

print('Performed %d actions' % success)

4. 设置运行内存,超时时间

5. 测试

这个Lambda将被SNS触发,创建一个模拟SNS的事件,

消息Message 设置为将要测试的事件,Lambda会读取对应的S3文件到ES中,

消息内容为,

 

{

"Records": [

{

"EventSource": "aws:sns",

"EventVersion": "1.0",

"EventSubscriptionArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",

"Sns": {

"Type": "Notification",

"MessageId": "71c6da65-49bf-5301-8270-8ff199faaa1b",

"TopicArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day",

"Subject": "None",

"Message": "{'year':'2019','month':'06','day':'30','hour':'02'}",

"Timestamp": "2019-07-02T03: 22: 45.020Z",

"SignatureVersion": "1",

"Signature": "ihaGN/JL8u/v57xEY1RTFekpUpgVukM9Ebj9IIM9Rr9KGkUMe6dO7hze7estD0yM9K0QRQAreQ5XiB0Tfj/jOCvyjL9IrRcTplQcWPzMHmVqd4C3942gduFkHyul2+lYa0DJZM46J/Yy7mihe9EfXUySf2Eyok4NsUC6WtnbyJPN17FG1t4fnEWpRwU2Yg+MLM4bJWr3sK5/6xRnUVerLlMm5tCsynybW6FQCYsVgl7SJLW6nBmbCe3v6jRMuKCNW8xptVyEAnII4h5uPVElts0IWhnE+EQG3FNFmOZmj8OLZutRadSrNFexRMZebmKwRZRD5dTaCoD5E6v6TTYGbQ==",

"SigningCertUrl": "https: //sns---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/SimpleNotificationService-3250158c6506d40f628c21ed8dad1787.pem",

"UnsubscribeUrl": "https://sns---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/?Action=Unsubscribe&SubscriptionArn=arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",

"MessageAttributes": {}

}

}

]

}

此事例将读取以下位置的日志文件

点击测试

ES Index已经导入

创建一个SNS Topic(s3-to-es-by-hour) 触发Lambda(s3-to-es-bulk-by-hour)

1. 创建Topic

进入SNS服务页面

2. 订阅事件

3. 测试

消息

{"year":"2019","month":"06","day":"30","hour":"05"}

复制Topic的ARN,下面Lambda会发送消息进来

创建Lambda split-day-to-24-hour

负责把按天ES  index的导入事件拆分成24个按小时导入的事件

1. 更新代码

替换SNS Topic ARN 为上面创建的Topic

 

import json

import boto3



sns = boto3.client('sns')



def lambda_handler(event, context):

for record in event['Records']:

msg = json.loads(json.dumps(eval(record['Sns']['Message'])))



# TODO implement

for i in range(0,24) :

msg['hour'] = "%02d" % i

print(msg)



response = sns.publish(

TopicArn='arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-hour',

Message=json.dumps(msg),

)



return {

'statusCode': 200,

'body': json.dumps('Hello from Lambda!')

}

2. 创建SNS Topic 用于触发此Lambda

3. 订阅Lambda

4. 测试

{"year":"2019","month":"06","day":"30"}

2019年6月30日的日志文件,会并发为24个Lambda按小时导入到ES

Athena query data in S3 using SQL

1. 基于S3路径,创建Table

 

CREATE EXTERNAL TABLE IF NOT EXISTS log.apachelogtest (

'bytes' string,

'datetime' string,

'host' string,

'request' string,

'response' string,

'datetime' string

)

ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'

WITH SERDEPROPERTIES (

'serialization.format' = '1'

) LOCATION 's3://poc-zhnc-hive/log/'

TBLPROPERTIES ('has_encrypted_data'='false');

2. 使用Table查询

本篇作者

陈朕

AWS解决方案架构师,负责基于AWS云计算方案架构的咨询和设计,在国内推广AWS云平台技术和各种解决方案。十余年分布式应用、大数据的分布式处理经验。