利用 Amazon ElastiCache 寻找附近的 X

阅读数:42 2019 年 11 月 13 日 08:00

利用Amazon ElastiCache寻找附近的X
基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。
复制代码
### 架构
对于开发者来说,Redis 因为其性能上的优势往往会被采用作为位置数据的缓存,只是在 3.2 版本之前需要代码中把位置数据进行 Geohash 后才能有效的排序和分析。不过 3.2 版本后,Redis 已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache 是 AWS 提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的 Redis 引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于 Elasticache 并结合 AWS 其他服务构建出以下的示例架构:
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1.png)
1)终端设备获取 GPS 位置信息,定时或基于事件将数据上传到云端。在 AWS 上可以选择使用 IoT 或 Kinesis 等托管型服务作为数据收集的接收端,也可以使用部署在 EC2/Lambda 上的自定义服务。
2)所有位置信息写入可以自动扩展的 DynamoDB,基本 Schema 包含设备 Id/Timestamp/Geo location, 方便历史查询或轨迹查询。
3)打开 DynamoDB 流,用 KCL 或 Lambda 监听 DynamoDB 的数据改变,并将当前变化的位置数据更新到 Elasticache 中建立基于 Geospatial 的索引缓存。
4)手机应用搜索附近资源时,部署在 EC2/Lambda 的查询服务利用 Elasticache geospatial 直接获取结果。
### 实现
如前文所述,步骤 12 可选择的方案很多,比如采用 AWS IoT 服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的 DynamoDB 表中。因此,本文将着重介绍如何实现前文架构中的 34 步:
a) 打开 DynamoDB 流,获取流的 ARN 用于读取,如下图:
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2.png)
读取 DynamoDB 流数据有三种方式:利用 Kinesis adapter,利用低级别 API 以及利用 Lambda 函数来进行读取。从易用性的角度来说,当然是 Lambda 函数最简单,不需要考虑 shard,吞吐和 checkpoint 等问题而专注于业务逻辑。但是 Lambda 函数并不是在所有的 AWS 区域都支持,因此本文采用第一种方式利用 Kinesis adapter 完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
b) 在读取流的同时,我们需要将最新的地理位置信息利用 GEOADD 更新到 Elasticache 中。前文提到 Redis 在 3.2 版本后,Geospatial Indexing 已经被原生支持,而它实际上是 Sorted List 数据结构的一种扩展,即排序 key 扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的 API,例如 GEOADD——添加地理位置 。
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3.png)
通过 Elasticache 可以快速构建出一个 Redis 环境,包括支持 shard 的集群模式,如下图所示。
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)
构建完成后,通过 Elasticache 提供的终端节点就可以访问 cache 了。
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)
需要注意的是如果选择的 Redis 是集群模式,那么就得同步升级支持 Redis 集群模式的客户端 SDK 用以开发。因为 Redis 的集群提供的是分片功能,它会把不同的 slots 分布在不同的节点上,需要由客户端通过 CRC16(Key) 取模从而计算出数据在哪个节点上。目前可以支持 redis 集群模式的客户端有很多,比如本文用到的 java 的 jedis 以及 nodejs 的 ioredis。
综合 a,b 两步的示例代码的 StreamCacheProcessor.java 如下 (其余代码参考 http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):
Java
复制代码
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.GeoCoordinate
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
public class StreamsCacheProcessor implements IRecordProcessor {
private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);
private Integer checkpointCounter;
private String clusterHost;
private int port;
private JedisCluster jedis;
public StreamsCacheProcessor(String clusterHost,int port) {
this.clusterHost=clusterHost;
this.port=port;
}
@Override
public void initialize(String shardId) {
Set jedisClusterNode=new HashSet();
jedisClusterNode.add(new HostAndPort(clusterHost,port));
jedis=new JedisCluster(jedisClusterNode);
checkp ointCounter = 0;
}
@Override
public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {
for (Record record : records) {
String data = new String(record.getData().array(), Charset.forName("UTF-8"));
LOG.debug("Received the data as:"+data);
if(record instanceof RecordAdapter)
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject();
// 新增 GPS 数据更新到 Elasticache 中
if(streamRecord.getEventName().equals("INSERT")){
Map coordinateMap = new HashMap();
double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN());
double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN());
String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS();
coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));
jedis.geoadd("bikes", coordinateMap);
LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude);
}
}
checkpointCounter += 1;
if(checkpointCounter % 10 == 0){ //checkpoint 大小需根据实际需求调整
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
{
if(reason == ShutdownReason.TERMINATE) {
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
复制代码
c)在完成地理信息的实时更新后, 可以基于 Elasticache 的数据利用 GEORADIUS 搜索周边的资源。使用 nodejs 示例代码如下:
Java
复制代码
var express = require('express');
var app = express();
var Redis = require('ioredis');
var cluster = new Redis.Cluster([{
port:6379,
host:'lab-cluster.4bh9j8.clustercfg.cnn1.cache.amazonaws.com.cn'
}]);
app.get('/bikes', function(req,res){
if(req.query['longitude'] && req.query['latitude']){
console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']);
cluster.send_command('GEORADIUS',
[ 'bikes',req.query['longitude'],req.query['latitude'],2000,
'm',
'WITHDIST',
'WITHCOORD',
'COUNT',
10], (error, reply) =>{
if (error) {
res.status(500).send(" 无法获取附近车辆信息 ");
return;
}
var stations = reply.map( (r) =>{
return {
name: r[0],
distance: `${r[1]} m`,
coordinates: {
latitude: Number(r[2][1]),
longitude: Number(r[2][0])
} }
});
res.status(200).json(stations);
});
}
});
var server = app.listen(8080,function(){
var host = server.address().address
var port = server.address().port
console.log(" 应用实例,访问地址为 http://%s:%s", host, port)
});
复制代码
基于以上代码,服务端就可以返回最近的 10 个资源以及每个资源离当前位置的距离。例如:
请求
`http://hostname or elb address/bikes?longitude=116&latitude=39.4`
返回
Java
复制代码
[
{
"name": "48093ba0-f8f1-49f0-b312-285800341b08",
"distance": "1117.8519 m",
"coordinates": {
"latitude": 39.40640623614937,
"longitude": 116.01002186536789
}
},
{
"name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",
"distance":"1305.5083 m",
"coordinates": {
"latitude": 39.40184880750488,
"longitude": 116.01500004529953
}
},
……
]
复制代码
d) 通过封装 http 请求构建手机应用。
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map.png)
### 总结
Redis Geospatial 功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache 提供的托管 Redis 服务大大简化了对于 Redis 集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的 Amazon DynamoDB 则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。
** 作者介绍 **
![](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)
赵霏,AWS 解决方案架构师。负责基于 AWS 的云计算方案架构咨询和设计,同时致力于 AWS 云服务在国内的应用和推广。他拥有超过 13 年 IT 行业从业经验,长期专注于企业 IT 云转型、物联网、移动互联网、Devops 等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。

本文转载自 AWS 技术博客。

原文链接:
https://amazonaws-china.com/cn/blogs/china/using-amazon-elasticache-to-find-x/

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布