亚马逊AWS官方博客

利用Amazon ElastiCache寻找附近的X

基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。

架构

对于开发者来说,Redis因为其性能上的优势往往会被采用作为位置数据的缓存,只是在3.2版本之前需要代码中把位置数据进行Geohash后才能有效的排序和分析。不过3.2版本后,Redis已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache是AWS提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的Redis引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于Elasticache并结合AWS其他服务构建出以下的示例架构:

1)终端设备获取GPS位置信息,定时或基于事件将数据上传到云端。在AWS上可以选择使用IoT或Kinesis等托管型服务作为数据收集的接收端,也可以使用部署在EC2/Lambda上的自定义服务。

2)所有位置信息写入可以自动扩展的DynamoDB,基本Schema包含设备Id/Timestamp/Geo location, 方便历史查询或轨迹查询。

3)打开DynamoDB流,用KCL或Lambda监听DynamoDB的数据改变,并将当前变化的位置数据更新到Elasticache中建立基于Geospatial的索引缓存。

4)手机应用搜索附近资源时,部署在EC2/Lambda的查询服务利用Elasticache geospatial直接获取结果。

实现

如前文所述,步骤1和2可选择的方案很多,比如采用AWS IoT服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的DynamoDB表中。因此,本文将着重介绍如何实现前文架构中的3和4步:

a) 打开DynamoDB 流,获取流的ARN用于读取,如下图:

读取DynamoDB流数据有三种方式:利用Kinesis adapter,利用低级别API以及利用Lambda函数来进行读取。从易用性的角度来说,当然是Lambda函数最简单,不需要考虑shard,吞吐和checkpoint等问题而专注于业务逻辑。但是Lambda函数并不是在所有的AWS区域都支持,因此本文采用第一种方式利用Kinesis adapter完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html

b) 在读取流的同时,我们需要将最新的地理位置信息利用GEOADD更新到Elasticache中。前文提到Redis在3.2版本后,Geospatial Indexing已经被原生支持,而它实际上是Sorted List数据结构的一种扩展,即排序 key扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的API,例如GEOADD——添加地理位置 。

通过Elasticache可以快速构建出一个Redis环境,包括支持shard的集群模式,如下图所示。

构建完成后,通过Elasticache提供的终端节点就可以访问cache了。

需要注意的是如果选择的Redis是集群模式,那么就得同步升级支持Redis集群模式的客户端SDK用以开发。因为Redis的集群提供的是分片功能,它会把不同的slots分布在不同的节点上,需要由客户端通过CRC16(Key)取模从而计算出数据在哪个节点上。目前可以支持redis集群模式的客户端有很多,比如本文用到的java的jedis以及nodejs的ioredis。

综合a,b两步的示例代码的StreamCacheProcessor.java如下(其余代码参考http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):


import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import redis.clients.jedis.GeoCoordinate
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsCacheProcessor implements IRecordProcessor {

        private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);

  private Integer checkpointCounter;

  private String clusterHost;
  private int port;
  private JedisCluster jedis;

public StreamsCacheProcessor(String clusterHost,int port) {
      this.clusterHost=clusterHost;
      this.port=port;
}

@Override
public void initialize(String shardId) {
      Set jedisClusterNode=new HashSet();
      jedisClusterNode.add(new HostAndPort(clusterHost,port));
      jedis=new JedisCluster(jedisClusterNode);
 checkp ointCounter = 0;
}
@Override
public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {
 for (Record record : records) {
  String data = new String(record.getData().array(), Charset.forName("UTF-8"));
  LOG.debug("Received the data as:"+data);
  if(record instanceof RecordAdapter)
     com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject();
    //新增GPS数据更新到Elasticache中
    if(streamRecord.getEventName().equals("INSERT")){
    Map coordinateMap = new HashMap();
    double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN());
    double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN());
    String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS();
    coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));

    jedis.geoadd("bikes", coordinateMap);
    LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude);
    }
  }
checkpointCounter += 1;
if(checkpointCounter % 10 == 0){ //checkpoint大小需根据实际需求调整
  try {
    checkpointer.checkpoint();
   }
  catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
 
}

@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) 
{
  if(reason == ShutdownReason.TERMINATE) {
    try {
      checkpointer.checkpoint();
    }
    catch (Exception e) {
     e.printStackTrace();
    }
   }
  }
}

c)在完成地理信息的实时更新后, 可以基于Elasticache的数据利用GEORADIUS搜索周边的资源。使用nodejs示例代码如下:


var express = require('express');
var app = express();
var Redis = require('ioredis');

var cluster = new Redis.Cluster([{
    port:6379,
    host:'lab-cluster---4bh9j8---clustercfg---cnn1---cache.amazonaws.com.rproxy.goskope.com.cn'
}]);

app.get('/bikes', function(req,res){
if(req.query['longitude'] && req.query['latitude']){
console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']);
cluster.send_command('GEORADIUS',
[ 'bikes',req.query['longitude'],req.query['latitude'],2000,
'm',
'WITHDIST',
'WITHCOORD',
'COUNT',
10], (error, reply) =>{

if (error) {
res.status(500).send("无法获取附近车辆信息");
return;
}

var stations = reply.map( (r) =>{
return {
name: r[0],
distance: `${r[1]} m`,
coordinates: {
latitude:  Number(r[2][1]),
longitude: Number(r[2][0])
} }
});

res.status(200).json(stations);
});
}
});

var server = app.listen(8080,function(){
var host = server.address().address
var port = server.address().port
console.log("应用实例,访问地址为 http://%s:%s", host, port)
});

基于以上代码,服务端就可以返回最近的10个资源以及每个资源离当前位置的距离。例如:
请求
http://hostname or elb address/bikes?longitude=116&latitude=39.4
返回


[
 {
  "name": "48093ba0-f8f1-49f0-b312-285800341b08",
  "distance": "1117.8519 m",
   "coordinates": {
    "latitude": 39.40640623614937,
    "longitude": 116.01002186536789
 }
},
{
  "name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",
  "distance":"1305.5083 m",
  "coordinates": {
   "latitude": 39.40184880750488,
   "longitude": 116.01500004529953
  }
 },
……
]

d)通过封装http请求构建手机应用。

总结

Redis Geospatial功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache提供的托管Redis服务大大简化了对于Redis集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的Amazon DynamoDB则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。

 

作者介绍

赵霏,AWS解决方案架构师。负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内的应用和推广。他拥有超过13年IT行业从业经验,长期专注于企业IT云转型、物联网、移动互联网、Devops等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。