QCon 全球软件开发大会(北京站)门票 9 折倒计时 4 天,点击立减 ¥880 了解详情
写点什么

Amazon Kinesis Analytics 中的实时热点检测

2019 年 10 月 28 日

Amazon Kinesis Analytics 中的实时热点检测

今天,我们将在 Amazon Kinesis Data Analytics 中发布新的 Machine Learning 功能,用以检测流式数据中的“热点”。2016 年 8 月,我们推出了 Kinesis Data Analytics,并在此后不断添加功能。您或许已经知道,Kinesis Data Analytics 是一种完全托管的流式数据实时处理引擎,可让您编写 SQL 查询以便从您的数据中推演含义,并将结果输出到 Kinesis Data FirehoseKinesis Data Streams 甚至是 AWS Lambda。全新 HOTSPOT 函数将补充 Kinesis 中的现有 Machine Learning 功能,允许客户利用基于无人监管流的 Machine Learning 算法。客户不需要是数据科学或 Machine Learning 领域的专家也能得心应手地利用这些功能。



热点

[](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html) 函数是一种全新的 Kinesis Data Analytics SQL 函数,可供您用于识别数据中相对密集的区域,而不必显式构建和训练复杂的 Machine Learning 模型。您可以识别需要立即关注的数据子节点,并通过将热点流式传输到 Kinesis 数据流、Firehose 传输流或者通过调用 AWS Lambda 函数,以编程方式执行操作。


这一函数有很多超酷的应用场合,可以让您的操作更轻松。想象一下,传达交通拥堵情况时空数据的骑乘共享项目或无人驾驶车辆车队,或者某个数据中心内的大量服务器过热,说明暖通空调 (HVAC) 系统存在问题。 HOTSPOTS 不仅限于时空数据,还可以应用于众多问题领域。


该函数遵循简单的语法,接受 DOUBLEINTEGERFLOATTINYINTSMALLINTREALBIGINT 数据类型。


HOTSPOT 函数获取游标作为输入,并返回描述热点的 JSON 字符串。通过示例可以更轻松地理解这一函数。


利用 Kinesis Data Analytics 检测热点

我们以纽约市出租车和豪华轿车委员会的一个简单数据集为例,该数据集用于跟踪黄色出租车的接客和送客地点。 其中大多数数据已经存储在 S3 中,可通过 s3://nyc-tlc/ 公开访问。我们将创建一个小型 Python 脚本,为 Kinesis 数据流载入出租车数据,并将这些数据提供给 Kinesis Data Analytics。最后,我们会将此信息输出到与 Amazon Elasticsearch Service 集群相连的 Kinesis Data Firehose,以通过 Kibana 呈现。根据我自己在纽约 5 年的生活经历,我们或许能在这些数据中找到一两个热点。


首先,我们将创建一个 Kinesis 输入流,并开始将我们的纽约市出租车载客数据发送到此流中。我编写了一个简单的 Python 脚本来读取其中一个 CSV 文件,并使用 boto3 将记录推送到 Kinesis。您可以按照自己的方式处理记录。


Python


import csvimport jsonimport boto3def chunkit(l, n):    """Yield successive n-sized chunks from l."""    for i in range(0, len(l), n):        yield l[i:i + n]
kinesis = boto3.client("kinesis")with open("taxidata2.csv") as f: reader = csv.DictReader(f) records = chunkit([{"PartitionKey": "taxis", "Data": json.dumps(row)} for row in reader], 500) for chunk in records: kinesis.put_records(StreamName="TaxiData", Records=chunk)
复制代码


接下来,我们将创建一个 Kinesis Data Analytics 应用程序,并将我们的输入流与出租车数据一起添加为源。





随后我们要自动检测 schema。



现在,我们将创建一个简单的 SQL 脚本来检测热点,并将其添加到应用程序的实时分析部分。


SQL


CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (    "pickup_longitude" DOUBLE,    "pickup_latitude" DOUBLE,    HOTSPOTS_RESULT VARCHAR(10000)); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"     SELECT "pickup_longitude", "pickup_latitude", "HOTSPOTS_RESULT" FROM        TABLE(HOTSPOTS(            CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),            1000,            0.013,            20        )    );
复制代码



我们的 HOTSPOTS 函数将获取输入流、窗口大小、扫描半径以及要计为热点的最小点数。这些值取决于应用程序,但您可以轻松在控制台中修改它们,直至获得所需的结果。文档中提供了有关参数的更多详细信息。 HOTSPOTS_RESULT 会返回一些有用的 JSON,让我们可以在热点周围绘制边界框:


``


{  "hotspots": [    {      "density": "elided",      "minValues": [40.7915039, -74.0077401],      "maxValues": [40.7915041, -74.0078001]    }  ]}
复制代码


获得所需结果后,即可保存脚本,并将应用程序连接到 Amazon Elastic Search Service Firehose 传输流。我们可以在 Firehose 中运行一个中间 Lambda 函数,将记录转换成更符合地理工作需求的格式。然后可以在 Elasticsearch 中更新映射,以将热点对象的索引编制为 Geo-Shapes。


最后,我们可以连接到 Kibana 并呈现结果。



看起来曼哈顿的交通十分繁忙!


现已推出


此功能现已在提供 Kinesis Data Analytics 的所有区域推出。我认为这是 Kinesis Data Analytics 一项十分有趣的新功能,可以直接帮助许多应用程序创造价值。在 Twitter 或评论中告诉我们您构建的内容!


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/real-time-hotspot-detection-in-amazon-kinesis-analytics/


2019 年 10 月 28 日 08:00158

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布
暂无评论
发现更多内容

互联网应用系统技术方案主要解决什么问题?

博古通今小虾米

开源=免费?

Learun

深入分析CRM系统对现代企业的作用

Learun

区块链支付系统开发公司,USDT承兑支付

135深圳3055源中瑞8032

区块链是一个不知道要解决什么问题的解决方案吗?

CECBC区块链专委会

比特币 区块链 银行

MySQL-技术专题-查询速度性能

李浩宇/Alex

架构师训练营第1期第四周作业二

道长

极客大学架构师训练营

光大银行刘淼:基于华为云GaussDB(DWS) 数据仓库创新实践

华为云开发者社区

数据仓库 数据 huawei

4年Java经验,备战两月成功拿到美团、京东、字节offer

Java成神之路

Java 面试 算法 编程语言 面试程序员

架构1期第四周作业1-大型互联网系统技术梳理

道长

极客大学架构师训练营

lldb常用命令与调试技巧

iOSer

ios lldb常用命令 lldb调试技巧

EffectiveJava读书笔记-01-对象创建与销毁

wander

读书笔记 编程开发

区块链数字钱包技术开发,数字资产钱包

135深圳3055源中瑞8032

程序员去外包真的不可取吗?

Java架构师迁哥

31道Java核心面试题,一次性打包送给你

小Q

Java 学习 程序员 架构 面试

建筑行业区块链应用场景是怎样的

CECBC区块链专委会

区块链 行业资讯

技术实操丨SoundNet迁移学习之由声音分类到语音情感识别

华为云开发者社区

AI 数据 语音识别

2020年秋招阿里136道Java高级岗面试题(含答案及复习资源)

Java架构之路

Java 程序员 面试 算法 编程语言

spring-boot-route(十)多数据源切换

Java旅途

Java Spring Boot

从构建小系统到架构分布式大系统,Spring Boot2的精髓全在这里了

Java架构之路

Java 程序员 面试 Spring Boot 编程语言

Nacos-技术专题-配置中心实现

李浩宇/Alex

PyFlink + 区块链?揭秘行业领头企业 BTC.com 如何实现实时计算

Apache Flink

flink

开源数据库这么香,为什么我们还要下功夫自研?

华为云开发者社区

数据库 开源 数据

Spring Cloud 微服务实践(7) - 日志

xiaoboey

kafka 微服务 Spring Cloud 日志 spring cloud stream

Web前后端:如何分离,如何解耦?

华为云开发者社区

前端 后端 开发

Java程序员月薪多少K才能在北上广买得起房?

Java架构之路

Java 程序员 编程语言

数字货币交易所源码开发,交易所APP搭建

135深圳3055源中瑞8032

区块链教育 丨 首批区块链专业新生正式入学

CECBC区块链专委会

区块链技术 区块链教育

来不及解释了,快上车!力软快速开发平台,助力企业搭乘万物互联的顺风车

Learun

架构师训练营第四周作业

郎哲158

架构师训练营第四章 系统架构总结

郎哲158

边缘计算隔离技术的挑战与实践

边缘计算隔离技术的挑战与实践

Amazon Kinesis Analytics 中的实时热点检测-InfoQ