写点什么

利用 Amazon ElastiCache 寻找附近的 X

  • 2019-11-13
  • 本文字数:5023 字

    阅读完需:约 16 分钟

利用Amazon ElastiCache寻找附近的X
基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。


### 架构
对于开发者来说,Redis因为其性能上的优势往往会被采用作为位置数据的缓存,只是在3.2版本之前需要代码中把位置数据进行Geohash后才能有效的排序和分析。不过3.2版本后,Redis已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache是AWS提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的Redis引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于Elasticache并结合AWS其他服务构建出以下的示例架构:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1-1024x717.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1.png)
1)终端设备获取GPS位置信息,定时或基于事件将数据上传到云端。在AWS上可以选择使用IoT或Kinesis等托管型服务作为数据收集的接收端,也可以使用部署在EC2/Lambda上的自定义服务。
2)所有位置信息写入可以自动扩展的DynamoDB,基本Schema包含设备Id/Timestamp/Geo location, 方便历史查询或轨迹查询。
3)打开DynamoDB流,用KCL或Lambda监听DynamoDB的数据改变,并将当前变化的位置数据更新到Elasticache中建立基于Geospatial的索引缓存。
4)手机应用搜索附近资源时,部署在EC2/Lambda的查询服务利用Elasticache geospatial直接获取结果。
### 实现
如前文所述,步骤1和2可选择的方案很多,比如采用AWS IoT服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的DynamoDB表中。因此,本文将着重介绍如何实现前文架构中的3和4步:
a) 打开DynamoDB 流,获取流的ARN用于读取,如下图:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2-1024x662.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2.png)
读取DynamoDB流数据有三种方式:利用Kinesis adapter,利用低级别API以及利用Lambda函数来进行读取。从易用性的角度来说,当然是Lambda函数最简单,不需要考虑shard,吞吐和checkpoint等问题而专注于业务逻辑。但是Lambda函数并不是在所有的AWS区域都支持,因此本文采用第一种方式利用Kinesis adapter完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
b) 在读取流的同时,我们需要将最新的地理位置信息利用GEOADD更新到Elasticache中。前文提到Redis在3.2版本后,Geospatial Indexing已经被原生支持,而它实际上是Sorted List数据结构的一种扩展,即排序 key扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的API,例如GEOADD——添加地理位置 。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3-1024x204.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3.png)
通过Elasticache可以快速构建出一个Redis环境,包括支持shard的集群模式,如下图所示。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)
构建完成后,通过Elasticache提供的终端节点就可以访问cache了。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)
需要注意的是如果选择的Redis是集群模式,那么就得同步升级支持Redis集群模式的客户端SDK用以开发。因为Redis的集群提供的是分片功能,它会把不同的slots分布在不同的节点上,需要由客户端通过CRC16(Key)取模从而计算出数据在哪个节点上。目前可以支持redis集群模式的客户端有很多,比如本文用到的java的jedis以及nodejs的ioredis。
综合a,b两步的示例代码的StreamCacheProcessor.java如下(其余代码参考http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):
Java
复制代码


   import java.nio.charset.Charset;   import java.util.HashMap;   import java.util.HashSet;   import java.util.List;   import java.util.Map;   import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.GeoCoordinate import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record;
public class StreamsCacheProcessor implements IRecordProcessor {
private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);
private Integer checkpointCounter;
private String clusterHost; private int port; private JedisCluster jedis;
public StreamsCacheProcessor(String clusterHost,int port) { this.clusterHost=clusterHost; this.port=port; }
@Override public void initialize(String shardId) { Set jedisClusterNode=new HashSet(); jedisClusterNode.add(new HostAndPort(clusterHost,port)); jedis=new JedisCluster(jedisClusterNode); checkp ointCounter = 0; } @Override public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); LOG.debug("Received the data as:"+data); if(record instanceof RecordAdapter) com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject(); //新增GPS数据更新到Elasticache中 if(streamRecord.getEventName().equals("INSERT")){ Map coordinateMap = new HashMap(); double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN()); double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN()); String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS(); coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));
jedis.geoadd("bikes", coordinateMap); LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude); } } checkpointCounter += 1; if(checkpointCounter % 10 == 0){ //checkpoint大小需根据实际需求调整 try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
}
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if(reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }
复制代码


c)在完成地理信息的实时更新后, 可以基于Elasticache的数据利用GEORADIUS搜索周边的资源。使用nodejs示例代码如下:
Java
复制代码


   var express = require('express');   var app = express();   var Redis = require('ioredis');
var cluster = new Redis.Cluster([{ port:6379, host:'lab-cluster.4bh9j8.clustercfg.cnn1.cache.amazonaws.com.cn' }]);
app.get('/bikes', function(req,res){ if(req.query['longitude'] && req.query['latitude']){ console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']); cluster.send_command('GEORADIUS', [ 'bikes',req.query['longitude'],req.query['latitude'],2000, 'm', 'WITHDIST', 'WITHCOORD', 'COUNT', 10], (error, reply) =>{
if (error) { res.status(500).send("无法获取附近车辆信息"); return; }
var stations = reply.map( (r) =>{ return { name: r[0], distance: `${r[1]} m`, coordinates: { latitude: Number(r[2][1]), longitude: Number(r[2][0]) } } });
res.status(200).json(stations); }); } });
var server = app.listen(8080,function(){ var host = server.address().address var port = server.address().port console.log("应用实例,访问地址为 http://%s:%s", host, port) });
复制代码


基于以上代码,服务端就可以返回最近的10个资源以及每个资源离当前位置的距离。例如:  请求  `http://hostname or elb address/bikes?longitude=116&latitude=39.4`  返回
Java
复制代码


   [    {     "name": "48093ba0-f8f1-49f0-b312-285800341b08",     "distance": "1117.8519 m",      "coordinates": {       "latitude": 39.40640623614937,       "longitude": 116.01002186536789    }   },   {     "name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",     "distance":"1305.5083 m",     "coordinates": {      "latitude": 39.40184880750488,      "longitude": 116.01500004529953     }    },   ……   ]
复制代码


d)通过封装http请求构建手机应用。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map-503x1024.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map.png)
### 总结
Redis Geospatial功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache提供的托管Redis服务大大简化了对于Redis集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的Amazon DynamoDB则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。


**作者介绍**
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)
赵霏,AWS解决方案架构师。负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内的应用和推广。他拥有超过13年IT行业从业经验,长期专注于企业IT云转型、物联网、移动互联网、Devops等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。
复制代码


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/using-amazon-elasticache-to-find-x/


2019-11-13 08:00953

评论

发布
暂无评论
发现更多内容

深度合作 | TDengine + 华为云 Stack 强强联合打造高效物联网时序数据处理解决方案

TDengine

数据库 tdengine 时序数据库

从行业角度看,数仓领域的未来是什么?

字节跳动数据平台

字节跳动 数据仓库 OLAP

InfoQ 极客传媒 15 周年庆征文|Spring Cloud netflix概览及架构设计

No Silver Bullet

架构 6月月更 InfoQ极客传媒15周年庆 Spring Cloud netflix

千亿参数“一口闷”?大模型训练必备四种策略

OneFlow

人工智能 模型训练 策略

从 keynote 大神到语雀画图大神,她是怎么做到的?

编辑器 思维导图 文档管理 企业知识管理

保险APP适老化服务评测框架 发布

易观分析

保险

TiDB 6.0 实战分享丨内存悲观锁原理浅析与实践

PingCAP

分布式数据库 TiDB

InfoQ 极客传媒 15 周年庆征文|手摸手教你在Windows安装Docker,一定要看到最后

迷彩

Docker 架构 运维 6月月更 InfoQ极客传媒15周年庆

天翼云数据中台通过“数字政府智慧中台”评估

极客天地

融云首席科学家任杰:数字游民和意识体,疫情将如何影响人类社会进化

融云 RongCloud

Wallys/Routerboard/DR344/WiFi/AR9344 FCC/CE/IC

wallys-wifi6

AR9344 802.11a

【Java Web 系列】Session的原理分析和使用细节

倔强的牛角

Java javaWeb session 6月月更

技术分享| 云服务器的使用-nginx的安装及使用

anyRTC开发者

nginx centos 音视频 服务器

莫把暑假插错秧,代码哪有足球香,Alluxio足球青训营在线摇人!

Alluxio

微软 开源 足球 分布式, CCF开源高校行

数据智能基础设施升级窗口将至?看九章云极 DingoDB 如何击破数据痛点

九章云极DataCanvas

人工智能 数据库 数据 数据智能

Go语言入门基础之库源码文件

Damon

6月月更

物联网低代码平台如何添加报警配置?

AIRIOT

物联网 低代码开发 低代码平台

Java—线程安全II

武师叔

6月月更

【云计算】云计算平台是什么意思?可以划分为哪三类?

行云管家

云计算 云服务 私有云 云平台 云计算平台

How to solve the different brightness of LED display colors

Dylan

LED LED display

web前端培训VUE开发者需要知道哪些实用技术点

@零度

Vue 前端开发

NFT卡牌盲盒链游系统dapp开发搭建

薇電13242772558

智能合约 NFT

2022年中国社区团购发展新动向

易观分析

社区团购

KubeVela 1.4:让应用交付更安全、上手更简单、过程更透明

孙健波

Kubernetes OAM KubeVela 云原生应用 K8s 多集群管理

十分钟带你入门Docker容器引擎

百思不得小赵

云原生 Docker 镜像 6月月更

二级等保要求几年做一次测评?测评项目有多少项?

行云管家

等级保护 等保测评 二级等保 等保二级

服务网格项目Aeraki Mesh正式进入CNCF沙箱

York

开源 云原生 istio Service Mesh 服务网格 cncf

帮助文档在软件中的存在价值是什么?

小炮

量化夹子机器人系统开发逻辑分析

开发微hkkf5566

Nginx 配置和性能调优

CRMEB

justcows奶牛理财dapp系统开发

开发微hkkf5566

利用Amazon ElastiCache寻找附近的X_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章