Background
Collecting and analyzing application logs is critical to operations. Among common solutions, the open-source ELK stack (Elastic Stack: Elasticsearch, Logstash, Kibana) is currently a popular choice. Below we discuss an alternative, cloud-native solution similar to ELK: EKK (Amazon Elasticsearch Service, Amazon Kinesis, and Kibana).
The advantage of EKK is that its components are AWS managed services: there is nothing to install or operate yourself, and they integrate easily with other AWS services, so you can readily deploy a reliable, scalable, secure, fault-tolerant, decoupled, event-driven solution.
In a traditional Elasticsearch deployment, log data keeps growing, making data lifecycle management increasingly important (Elasticsearch 7.0 introduced ILM, index lifecycle management, to address this need). This post does not cover ILM; instead it presents another approach: using Lambda to rotate the data.
Overview of the AWS Services Used in This Post
1. Amazon Elasticsearch Service
A fully managed service that makes it easy to deploy, secure, and operate Elasticsearch at scale with zero downtime. The service offers the open-source Elasticsearch APIs, managed Kibana, and integrations with Logstash and other AWS services, enabling you to securely ingest data from any source and search, analyze, and visualize it in real time.
2. Amazon Kinesis Firehose
Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk, enabling near-real-time analytics with the business intelligence tools and dashboards you already use. It is a fully managed service that scales automatically to match your data throughput and requires no ongoing administration. It can also batch, compress, and encrypt data before loading it, minimizing the storage used at the destination while improving security.
In this post, Firehose ingests the streaming log data and automatically loads it into Amazon ES, keeping an additional backup copy in S3.
3. AWS Lambda
With AWS Lambda, you can run code without provisioning or managing servers. You pay only for the compute time you consume; there is no charge when your code is not running.
With Lambda, you can run code for virtually any type of application or backend service, all with zero administration. Just upload your code, and Lambda takes care of everything required to run and scale it with high availability. You can set up your code to be triggered automatically by other AWS services, or call it directly from any web or mobile application.
In this post, Lambda implements rotation of Elasticsearch indices, removing historical data from ES to avoid the performance degradation and rising costs caused by oversized historical data.
Lambda is also used to load historical data files from S3 back into ES, so that ES can be used for ad-hoc analysis of historical logs.
4. Amazon CloudWatch
Amazon CloudWatch is a monitoring and observability service built for DevOps engineers, developers, site reliability engineers (SREs), and IT managers. CloudWatch provides you with data and actionable insights to monitor your applications, respond to system-wide performance changes, optimize resource utilization, and get a unified view of operational health. CloudWatch collects monitoring and operational data in the form of logs, metrics, and events, giving you a unified view of resources, applications, and services running on AWS and on-premises servers. You can use CloudWatch to detect anomalous behavior in your environments, set alarms, visualize logs and metrics side by side, take automated actions, troubleshoot issues, and discover insights to keep your applications running smoothly.
5. Amazon Athena
Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries you run.
Athena is easy to use. Simply point to your data in Amazon S3, define the schema, and start querying with standard SQL. Most results are delivered within seconds. With Athena, there is no need for complex ETL jobs to prepare your data for analysis, so anyone with SQL skills can quickly analyze large-scale datasets.
The last part of this post briefly shows how to use the Athena service to analyze the full set of logs directly in S3.
6. AWS Identity and Access Management (IAM)
AWS Identity and Access Management (IAM) enables you to manage access to AWS services and resources securely. Using IAM, you can create and manage AWS users and groups, and use permissions to allow or deny their access to AWS resources.
Architecture
1. EKK and data rotation architecture
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana1.png)
2. Architecture for loading historical data back into ES
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana2.png)
Create IAM Roles in the IAM Service
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/picture replacement1.png)
1. Create a role for the EC2 instances whose logs will be collected: EKK-EC2
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana3.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana4.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana5.png)
2. Create a role for Kinesis Firehose: EKK-Firehose
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana6.png)
Set the permissions
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana7.png)
Create an ES Cluster in Amazon Elasticsearch Service
1. Calculate storage requirements
Source data * (1 + number of replicas) * 1.45 = minimum storage requirement
(Source data + room to grow) * (1 + indexing overhead) / desired shard size = approximate number of primary shards
It is good practice to keep shard sizes between 10–50 GiB. A worked example follows the sizing reference below.
2. Instance types
Try starting with a configuration close to 2 vCPU cores and 8 GiB of memory for every 100 GiB of your storage requirement.
3. Dedicated master nodes
| Instance count | Recommended minimum dedicated master instance type |
| --- | --- |
| 1–10 | c5.large.elasticsearch |
| 10–30 | c5.xlarge.elasticsearch |
| 30–75 | c5.2xlarge.elasticsearch |
| 75–200 | r5.4xlarge.elasticsearch |
Reference: https://docs.amazonaws.cn/elasticsearch-service/latest/developerguide/sizing-domains.html
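To make the two formulas concrete, here is a worked example in Python with hypothetical inputs (7 days of retained logs at ~100 GiB/day, one replica, 10% indexing overhead, a 30 GiB target shard size); only the formulas come from the sizing guide above, the numbers are assumptions:
# Hypothetical workload: 7 days of retained logs at ~100 GiB/day.
source_data_gib = 100 * 7
replicas = 1                               # one replica of each shard
indexing_overhead = 0.10                   # overhead factor assumed by the sizing guide
desired_shard_gib = 30                     # within the recommended 10-50 GiB range
room_to_grow_gib = source_data_gib * 0.2   # assumed 20% headroom

# Source data * (1 + replicas) * 1.45 = minimum storage requirement
min_storage_gib = source_data_gib * (1 + replicas) * 1.45
print("Minimum storage: %.0f GiB" % min_storage_gib)              # 2030 GiB

# (Source data + room to grow) * (1 + overhead) / shard size = primary shards
shards = (source_data_gib + room_to_grow_gib) * (1 + indexing_overhead) / desired_shard_gib
print("Approximate primary shards: %d" % round(shards))           # 31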
4. Create the cluster from the console
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/picture replacement2.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana8.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana9.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana10.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana11.png)
Set the IP address to your own CIDR range; addresses outside this CIDR range cannot access ES or Kibana.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana12.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana13.png)
5. Note the ES cluster and Kibana endpoints
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana14.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/picture replacement3.png)
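With an IP-based access policy like the one configured above, requests from an allowed address need no request signing, so a quick sanity check of the new endpoint might look like the sketch below (assuming the calling machine's IP falls inside the allowed CIDR range):
import requests

# The ES endpoint obtained above; anonymous access works only from the allowed CIDR range.
host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4---cn-north-1---es.amazonaws.com.rproxy.goskope.com.cn'
r = requests.get('https://{}/_cluster/health'.format(host))
print(r.json())   # expect "status": "green" or "yellow" for a healthy cluster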
Create a Kinesis Firehose Delivery Stream in the Kinesis Service
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana15.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana16.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana17.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana18.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana19.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana20.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana21.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana22.png)
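The screenshots above configure the delivery stream in the console. For reference, a roughly equivalent stream (ES destination, daily index rotation, full backup to S3) could be created with boto3 as sketched below; the ARNs and account ID are placeholders, and the settings are assumptions rather than an exact copy of the console choices:
import boto3

firehose = boto3.client('firehose', region_name='cn-north-1')
firehose.create_delivery_stream(
    DeliveryStreamName='EKK-LogFirehose-apachelog',
    DeliveryStreamType='DirectPut',
    ElasticsearchDestinationConfiguration={
        'RoleARN': 'arn:aws-cn:iam::123456789012:role/EKK-Firehose',   # placeholder account ID
        'DomainARN': 'arn:aws-cn:es:cn-north-1:123456789012:domain/ekk-log',
        'IndexName': 'apachelog',
        'TypeName': 'log',
        'IndexRotationPeriod': 'OneDay',   # date-stamped indices, which the rotation Lambda relies on
        'S3BackupMode': 'AllDocuments',    # keep a full copy of the stream in S3
        'S3Configuration': {
            'RoleARN': 'arn:aws-cn:iam::123456789012:role/EKK-Firehose',
            'BucketARN': 'arn:aws-cn:s3:::zhnc-ekk-full-log',
            'Prefix': 'apachelog/',
        },
    },
)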
Configure the Servers Whose Logs Will Be Collected
1. In the EC2 service, assign the role to the instances whose logs will be collected
Assign the role
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana23.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana24.png)
Linux logging environment
Install the agent:
$sudo yum install -y aws-kinesis-agent
or
$sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
Open and edit the configuration file /etc/aws-kinesis/agent.json:
{
  "cloudwatch.emitMetrics": true,
  "firehose.endpoint": "firehose---cn-north-1.amazonaws.com.rproxy.goskope.com.cn",
  "flows": [
    {
      "filePattern": "/var/log/httpd/access_log*",
      "deliveryStream": "EKK-LogFirehose-apachelog",
      "dataProcessingOptions": [
        {
          "optionName": "LOGTOJSON",
          "logFormat": "COMMONAPACHELOG"
        }
      ]
    }
  ]
}
$sudo service aws-kinesis-agent start
$sudo chkconfig aws-kinesis-agent on
2. Windows logging environment
Download and install the agent:
https://s3-us-west-2.amazonaws.com/kinesis-agent-windows/downloads/AWSKinesisTap.1.1.168.1.msi
Edit the configuration file C:\Program Files\Amazon\AWSKinesisTap\appsettings.json:
{
  "Sources": [
    {
      "Id": "PerformanceCounter",
      "SourceType": "WindowsPerformanceCounterSource",
      "Categories": [
        {
          "Category": "Server",
          "Counters": [
            "Files Open",
            "Logon Total"
          ]
        },
        {
          "Category": "LogicalDisk",
          "Instances": "*",
          "Counters": [
            "% Free Space",
            {
              "Counter": "Disk Reads/sec",
              "Unit": "Count/Second"
            }
          ]
        }
      ]
    }
  ],
  "Sinks": [
    {
      "Namespace": "MyServiceMetrics",
      "Region": "cn-north-1",
      "Id": "CloudWatchSink",
      "SinkType": "CloudWatch"
    },
    {
      "Id": "WindowsLogKinesisFirehoseSink",
      "SinkType": "KinesisFirehose",
      "StreamName": "EKK-LogFirehose-iislog",
      "Region": "cn-north-1",
      "QueueType": "file"
    }
  ],
  "Pipes": [
    {
      "Id": "PerformanceCounterToFirehose",
      "SourceRef": "PerformanceCounter",
      "SinkRef": "WindowsLogKinesisFirehoseSink"
    }
  ]
}
Reference: https://docs.aws.amazon.com/zh_cn/kinesis-agent-windows/latest/userguide/getting-started.html
[Optional] Simulate an Apache Log Environment for Later Testing and Verification
Create a Linux EC2 instance in the EC2 service![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana25.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana26.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana27.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana28.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana29.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana30.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana31.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana32.png)
chmod 400 EKK-test.pem
ssh -i EKK-test.pem ec2-user@52.81.85.86
Install the Kinesis agent
$sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
Configure the agent
$sudo vi /etc/aws-kinesis/agent.json
{
  "cloudwatch.emitMetrics": true,
  "firehose.endpoint": "firehose---cn-north-1.amazonaws.com.rproxy.goskope.com.cn",
  "flows": [
    {
      "filePattern": "/var/log/httpd/access_log*",
      "deliveryStream": "EKK-LogFirehose-apachelog",
      "dataProcessingOptions": [
        {
          "optionName": "LOGTOJSON",
          "logFormat": "COMMONAPACHELOG"
        }
      ]
    }
  ]
}
$sudo service aws-kinesis-agent start
$sudo chkconfig aws-kinesis-agent on
$sudo mkdir /var/log/httpd
Install the fake log generator
Reference: https://github.com/kiritbasu/Fake-Apache-Log-Generator
$sudo yum install -y git
$git clone https://github.com/kiritbasu/Fake-Apache-Log-Generator.git
$sudo yum install python-pip -y
$cd Fake-Apache-Log-Generator/
$sudo pip install -r requirements.txt
Log-generation script
$cd ~
$sudo vi test.sh
#!/bin/bash
# chkconfig: 2345 10 90
cd /var/log/httpd/
while true
do
  sudo python /home/ec2-user/Fake-Apache-Log-Generator/apache-fake-log-gen.py -n 100 -o LOG
  sleep 10
done
Run the script in the background:
$sudo sh ./test.sh &
The fake logs are generated in /var/log/httpd.
[Optional] Install the Agent with Systems Manager (SSM)
1. Attach the permissions SSM requires to the EC2 role EKK-EC2
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana33.png)
2. Use the SSM service
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana34.png)
For Linux, use AWS-RunShellScript
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana35.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/picture replacement4.png)
The target EC2 instances for the shell script can be selected directly or filtered by tag
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana37.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana38.png)
Windows is similar: use AWS-RunPowerShellScript to run a PowerShell script
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana39.png)
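Run Command can also be driven programmatically. A minimal sketch of pushing the Linux agent install through SSM follows; the tag used for targeting is a hypothetical example:
import boto3

ssm = boto3.client('ssm', region_name='cn-north-1')

# Run the install commands on every instance carrying a hypothetical Group=EKK-Web tag.
resp = ssm.send_command(
    DocumentName='AWS-RunShellScript',
    Targets=[{'Key': 'tag:Group', 'Values': ['EKK-Web']}],
    Parameters={'commands': [
        'sudo yum install -y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm',
        'sudo service aws-kinesis-agent start',
        'sudo chkconfig aws-kinesis-agent on',
    ]},
)
print(resp['Command']['CommandId'])   # use this ID to poll the command status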
Rotate Data in ES Using Lambda
1. Set an IAM role for the Lambda
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana40.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana41.png)
2. Create a Layer in Lambda
Putting the project's dependency packages into a layer makes them easy for the Lambda function to use.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana42.png)
Layer reference: https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/configuration-layers.html
Reference commands:
$mkdir python
$pip3 install elasticsearch-curator -t ./python/
$zip -q -r layer.zip ./python
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana43.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana44.png)
3. Create the Lambda: ES-Rotate
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana45.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana46.png)
Set the layers for the Lambda runtime
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana47.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana48.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana49.png)
Save
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana50.png)
Set the function code; click
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana51.png)
Update the code in the Function code area below, replacing the host with the endpoint of the ES cluster created earlier
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana52.png)
import json
import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator

host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4---cn-north-1---es.amazonaws.com.rproxy.goskope.com.cn'  # For example, search-my-domain.region.es.amazonaws.com
region = 'cn-north-1'  # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

def lambda_handler(event, context):
    es = Elasticsearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    # Select indices whose date-stamped names are older than one day.
    index_list = curator.IndexList(es)
    index_list.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=1)
    print("Found %s indices to delete" % len(index_list.indices))
    # If the filtered list contains any indices, delete them.
    if index_list.indices:
        curator.DeleteIndices(index_list).do_action()
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
Adjust the Lambda memory and timeout settings
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana53.png)
4. Test
Define the test event's input parameters.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana54.png)
Since this Lambda needs no input parameters, the test event can use the default settings.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana55.png)
Run the test
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana56.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana57.png)
Check the result in ES
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana58.png)
Reference: https://docs.aws.amazon.com/zh_cn/elasticsearch-service/latest/developerguide/curator.html
5. Schedule the rotation in CloudWatch
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana59.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana60.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana61.png)
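The console steps above wire a scheduled CloudWatch Events rule to the ES-Rotate Lambda. Roughly the same wiring in boto3 might look like the sketch below; the function ARN and account ID are placeholders, and the daily 01:00 UTC cron expression is an assumed schedule:
import boto3

region = 'cn-north-1'
events = boto3.client('events', region_name=region)
lambda_client = boto3.client('lambda', region_name=region)
fn_arn = 'arn:aws-cn:lambda:cn-north-1:123456789012:function:ES-Rotate'  # placeholder

# Rule that fires once a day at 01:00 UTC.
rule = events.put_rule(
    Name='ES-Rotate-Daily',
    ScheduleExpression='cron(0 1 * * ? *)',
    State='ENABLED',
)
# Allow CloudWatch Events to invoke the function, then attach it as the rule target.
lambda_client.add_permission(
    FunctionName='ES-Rotate',
    StatementId='AllowEventsInvoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)
events.put_targets(
    Rule='ES-Rotate-Daily',
    Targets=[{'Id': 'ES-Rotate', 'Arn': fn_arn}],
)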
Load Data from S3 Back into ES
1. Create the Lambda: s3-to-es-bulk-by-hour
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana62.png)
2. Set the layer
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana63.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana64.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana65.png)
3. Set the code
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana66.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana67.png)
Set bucket to the bucket configured in the Kinesis Firehose delivery stream, and host to the ES endpoint.
import boto3
import re
import requests
from requests_aws4auth import AWS4Auth
import json
from elasticsearch.helpers import bulk
import time
from elasticsearch import Elasticsearch, RequestsHttpConnection

region = 'cn-north-1'  # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

bucket = 'zhnc-ekk-full-log'
host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4---cn-north-1---es.amazonaws.com.rproxy.goskope.com.cn'
type = 'log'
headers = {"Content-Type": "application/json"}
s3 = boto3.client('s3', region_name=region)

def get_all_s3_keys(bucket, prefix):
    # List every object key under the given prefix, following pagination.
    keys = []
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            keys.append(obj['Key'])
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break
    return keys

# Lambda execution starts here
def lambda_handler(event, context):
    print(event)
    for record in event['Records']:
        # The SNS message is a Python-style dict string, e.g. "{'year':'2019',...}"
        msg = json.loads(json.dumps(eval(record['Sns']['Message'])))
        year = msg['year']
        month = msg['month']
        day = msg['day']
        hour = msg['hour']
        index = 'apachelog-{}-{}-{}-{}'.format(year, month, day, hour)
        print(index)
        keys = get_all_s3_keys(bucket, "apachelog/{}/{}/{}/{}".format(year, month, day, hour))
        print(len(keys))
        es = Elasticsearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        # Read each backup file from S3 and bulk-index its lines into the hourly index.
        for key in keys:
            ACTIONS = []
            obj = s3.get_object(Bucket=bucket, Key=key)
            body = obj['Body'].read()
            lines = body.splitlines()
            for line in lines:
                document = json.loads(line)
                action = {
                    "_index": index,
                    "_type": type,
                    "_source": document
                }
                ACTIONS.append(action)
            success, _ = bulk(es, ACTIONS, index=index, raise_on_error=True)
            print('Performed %d actions' % success)
4. Set the memory and timeout
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana68.png)
5. Test
This Lambda will be triggered by SNS, so create a simulated SNS event.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana69.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana70.png)
Set Message to the event to be tested; the Lambda will read the corresponding S3 files into ES.
The message content is:
{
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
      "Sns": {
        "Type": "Notification",
        "MessageId": "71c6da65-49bf-5301-8270-8ff199faaa1b",
        "TopicArn": "arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day",
        "Subject": "None",
        "Message": "{'year':'2019','month':'06','day':'30','hour':'02'}",
        "Timestamp": "2019-07-02T03:22:45.020Z",
        "SignatureVersion": "1",
        "Signature": "ihaGN/JL8u/v57xEY1RTFekpUpgVukM9Ebj9IIM9Rr9KGkUMe6dO7hze7estD0yM9K0QRQAreQ5XiB0Tfj/jOCvyjL9IrRcTplQcWPzMHmVqd4C3942gduFkHyul2+lYa0DJZM46J/Yy7mihe9EfXUySf2Eyok4NsUC6WtnbyJPN17FG1t4fnEWpRwU2Yg+MLM4bJWr3sK5/6xRnUVerLlMm5tCsynybW6FQCYsVgl7SJLW6nBmbCe3v6jRMuKCNW8xptVyEAnII4h5uPVElts0IWhnE+EQG3FNFmOZmj8OLZutRadSrNFexRMZebmKwRZRD5dTaCoD5E6v6TTYGbQ==",
        "SigningCertUrl": "https://sns---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/SimpleNotificationService-3250158c6506d40f628c21ed8dad1787.pem",
        "UnsubscribeUrl": "https://sns---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/?Action=Unsubscribe&SubscriptionArn=arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
        "MessageAttributes": {}
      }
    }
  ]
}
This example will read the log files at the following location
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana71.png)
Click Test
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana72.png)
The ES index has been imported
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana73.png)
Create an SNS Topic (s3-to-es-by-hour) to Trigger the Lambda (s3-to-es-bulk-by-hour)
1. Create the topic
Go to the SNS service page
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana74.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana75.png)
2. Create the subscription
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana76.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana77.png)
3. Test
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana78.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana79.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana80.png)
Message:
{"year":"2019","month":"06","day":"30","hour":"05"}
Copy the topic's ARN; the Lambda created below will publish messages to it.
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana81.png)
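The test message above can also be published from code. A minimal sketch follows (the account ID in the ARN is a placeholder); the message keeps the single-quoted dict form because the consuming Lambda parses it with eval():
import boto3

sns = boto3.client('sns', region_name='cn-north-1')
sns.publish(
    TopicArn='arn:aws-cn:sns:cn-north-1:123456789012:s3-to-es-by-hour',  # placeholder account ID
    Message="{'year':'2019','month':'06','day':'30','hour':'05'}",
)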
Create the Lambda: split-day-to-24-hour
It splits one per-day ES index import event into 24 per-hour import events
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana82.png)
1. Update the code
Replace the SNS topic ARN with the ARN of the topic created above
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana83.png)
import json
import boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    for record in event['Records']:
        msg = json.loads(json.dumps(eval(record['Sns']['Message'])))
        # Fan the daily event out into 24 hourly events.
        for i in range(0, 24):
            msg['hour'] = "%02d" % i
            print(msg)
            response = sns.publish(
                TopicArn='arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-hour',
                Message=json.dumps(msg),
            )
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
2. Create the SNS topic that triggers this Lambda![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana84.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana85.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana86.png)
3. Subscribe the Lambda
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana87.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana88.png)
4. Test
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana89.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana90.png)
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana91.png)
{"year":"2019","month":"06","day":"30"}
The log files for June 30, 2019 will then be imported into ES by 24 concurrent Lambda invocations, one per hour.
Query Data in S3 with SQL Using Athena
1. Create a table over the S3 path
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana92.png)
CREATE EXTERNAL TABLE IF NOT EXISTS log.apachelogtest (
  `bytes` string,
  `datetime` string,
  `host` string,
  `request` string,
  `response` string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
) LOCATION 's3://poc-zhnc-hive/log/'
TBLPROPERTIES ('has_encrypted_data'='false');
2. Query using the table
![](https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/awschinablog/Amazon Elasticsearch Service, Amazon Kinesis, and Kibana93.png)
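Queries can also be submitted programmatically. A minimal sketch with boto3 follows; the example query and the results output location are hypothetical:
import boto3

athena = boto3.client('athena', region_name='cn-north-1')

# Count requests per HTTP response code across the full S3 log set.
resp = athena.start_query_execution(
    QueryString='SELECT response, count(*) AS hits '
                'FROM log.apachelogtest GROUP BY response ORDER BY hits DESC',
    QueryExecutionContext={'Database': 'log'},
    ResultConfiguration={'OutputLocation': 's3://poc-zhnc-hive/athena-results/'},  # hypothetical prefix
)
print(resp['QueryExecutionId'])  # poll get_query_execution()/get_query_results() with this ID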
About the Author