高品质的音视频能力是怎样的? | Qcon 全球软件开发大会·上海站邀请函 了解详情
写点什么

基于 Flink 构建用户实时基础行为工程

  • 2019-04-18
  • 本文字数:5772 字

    阅读完需:约 19 分钟

基于Flink构建用户实时基础行为工程

导读

Hi,小伙伴们!今天我给大家介绍下基于 Flink 构建用户实时基础行为工程的相关实践,包括 Flink 相关的技术点和基础行为实时工程的业务。


Flink 是目前 Qunar 主推的实时数据处理开源平台,用于替代 SparkStreaming。如果你们使用 Flink 也是和我们之前一样,不知道如何使用我们的 Flink 实时计算平台,或者不知道该怎样合理利用其 Features 去更好构建我们的工程,再或者你想了解每天处理超过 12 亿条实时数据,数据实时性达到秒级,QPS 可支持 10 万的用户实时基础行为工程的技术实现,你在后面应该能找到你的答案。


Flink 简介

Apache Flink 是一个面向数据流处理和批量数据处理的分布式的开源计算框架,能够支持流处理和批处理两种应用类型。有着低延迟、Exactly-once 保证,而批处理需要支持高吞吐、高效处理的特点。


Flink 是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的;批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。这与 sparkstreaming 不同,sparkstreaming 是将流处理视为无限个有界的批处理(microbatch)。


1 Flink 特点

(1)有状态计算的 Exactly-once 语义。状态是指 flink 能够维护数据在时序上的聚类和聚合,同时它的 checkpoint 机制可以方便快速的做出失败重试;


(2)支持带有事件时间(event time)语义的流处理和窗口处理。事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下;


(3)支持高度灵活的窗口(window)操作。支持基于 time、count、session,以及 data-driven 的窗口操作,能很好的对现实环境中的创建的数据进行建模;


(4)轻量的容错处理(fault tolerance)。 它使得系统既能保持高的吞吐率又能保证 exactly-once 的一致性。通过轻量的 state snapshots 实现;


(5)支持高吞吐、低延迟、高性能的流处理;


(6)支持 savepoints 机制(一般手动触发)。即可以将应用的运行状态保存下来;在升级应用或者处理历史数据是能够做到无状态丢失和最小停机时间;


(7)支持大规模的集群模式,支持 yarn、Mesos。可运行在成千上万的节点上;


(8)支持具有 Backpressure 功能的持续流模型;


(9)Flink 在 JVM 内部实现了自己的内存管理,包括完善的内存架构和 OOM error prevention;


(10)支持迭代计算;


(11)支持程序自动优化:避免特定情况下 Shuffle、排序等昂贵操作,中间结果进行缓存。


2 Flink 分布式 runtime


(1)JobManager 主要工作是协调分布式系统的运行。比如协调各个任务的执行时间,管理 checkpoint 和协调异常状态的恢复等。


(2)TaskManager 是任务的真正执行者,包括数据流的缓存和交换等操作。


(3)client 不是 Flink Runtime 的一部分,也不参与任务的真正执行,只是用来启动 Job 时生成执行计划并交给 JobManager。


3 Flink 流式(DataStream)编程模型

3.1 编程抽象层级


最底层为有状态的流,通过处理函数进入 DadaStream 的 API 处理层。DataStream API 层也叫核心 API 层,一般大部分编程工作都集中于此,包括业务处理,聚合,关联等逻辑操作。Table API 层和 SQL 层其实是可以提供基于 schema 的 SQL 查询形式,目前较少使用。


3.2 流式编程


Flink 编程模型中三大元素分别是 Source,Operator 和 Sink。Flink 的流起始于 source,经过 Transformation Operator 对流进行处理,最后在 Sink 进行持久化。


用户实时基础行为工程简介

1 用户基础行为工程架构


首先适用 Flink 订阅各个业务线日志的 Kafka 集群 Topic,实时数据进入 Flink 集群中运行的各个业务线对应的 Job 进行数据清洗,为保证实时性和增加系统吞吐量,直接按照业务线为 Key 存入 redis 中。Server 端按照 Gid 和 Username 从 redis 中取出此用户所有业务线的所有行为并与离线数据合并,通过 dubbo 接口返回给客户端。为了减小服务的压力,数据的截取、解压缩等耗 CPU 资源的操作都在客户端进行。


2 基础用户行为工程的意义

目前我们提供的服务是提供一个用户 100 天内的实时行为,包括了机票、酒店、火车票、门票、度假、车票、大搜等业务线的搜索、点击、收藏、订单、预定等行为,数据源为 Hotdog 日志,Kylin 日志,业务线日志等。为了补充度假,门票和景点的商品信息,使用了 skuvacationinfo,skuticketinfo,skusightinfo 这三个 SKU 库。


基于用户行为,我们可以做精细化留存评估,让留存数据更有价值和指导意义。也可以进行质量评估,需要基于用户行为并且贴合业务去评估,比如某个景点的搜索点击行为在同类中较高,那我们就可以认为此景点为优质景点从而增加此景点的推荐权重等。用户行为也可以用作产品分析,用数据量化产品核心功能,让产品迭代排期更科学,部门配合更高效。用户行为还可以更好实现用户分群、用户分层等精准营销。


现在基础用户行为工程已经服务于首页所有的推荐场景。用户实时行为服务已经广泛用于定向广告、首页预制词、单品推荐、目的地推荐等多个场景。我们在个性化推荐场景中进行了测试,使用实时行为服务比使用 T-1 日行为数据,点击率提升 20%。


DataStream 的典型算子(operator)使用举例

1 使用 Filter 对流进行过滤

某些时候一种日志流中包含了许多种不同行为类型的日志,但业务处理时我们只需要对一种行为类型的日志进行清洗,这时我们可以使用 DataStream 的 Filter 来对数据流进行过滤。


dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0;}});
复制代码


2 使用 Split 和 select 对流进行拆分

实际业务中经常会遇到一种日志包含了多种不同的业务或者行为,但我们想将不同的业务分流后分别处理,这就使用到了分流 split 算子。


SplitStream<Integer> split = someDataStream.split( new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) {        List<String> output = new ArrayList<String>(); if (value % 2 == 0) {            output.add("even");        } else {            output.add("odd");        } return output;    }});
复制代码


分流过后,再使用 select 算子进行拆分。


SplitStream<Integer> split;DataStream<Integer> even = split.select("even");DataStream<Integer> odd = split.select("odd");DataStream<Integer> all = split.select("even","odd");
复制代码


逻辑非常的清晰,实现非常简单。


3 使用 Join 实现双流的聚合

当我们要实现两个数据流中的数据关联的时候,我们可能想到使用 redis 等缓存中间件对中间数据进行缓存,幸运的是 Flink 的算子中已经提供了两条流进行关联的操作 Join。Join 操作和 SQL 的 Join 道理是一样的,需要用 where 指定 Join 的字段和用 equalTo 指定 Join 的条件,最后使用 apply 对 Join 成功的结果进行处理。节约了中间件的资源。


dataStream.join(otherStream)    .where(<key selector>).equalTo(<key selector>)    .window(TumblingEventTimeWindows.of(Time.seconds(3)))    .apply { ... }
复制代码


容错策略

1 异常重试策略

特殊情况下(如遇上无法解析的数据或者出现未知异常)导致任务执行失败,Flink 会根据自己的 checkpoints 来进行自动重启恢复。


Flink 的 checkpointing 机制会存储状态的一致性快照,配置了不同的状态存储策略,checkpoints 就会保存在不同的地方,比如 JM 的内存,文件系统或是数据库。


当前我们设置 5 秒触发一次 checkpoint 保存,为了节约集群内存资源我们选择保存的位置为 HDFS。


重启的策略支持自定义,集群的重启策略可以通过 flink-conf.yaml 的 restart-strategy 来进行集群级别的控制,也可以在 Job 级别进行设置,分为固定延迟重启策略(Fixed Delay Restart Strategy)和失败率重启策略(Failure rate Restart Strategy)。


固定延迟重启策略会尝试一个给定的次数来重启 Job,如果超过了最大的重启次数,Job 最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。


失败率重启策略在 Job 失败后会重启,但是超过失败率后,Job 会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。


当前我们使用 Job 级别的设置,重启 1 次,间隔 10 秒。


2 停机恢复策略

任务有更新时,Flink 版本升级时,系统升级或系统迁移等情况下,需要停掉 running 状态的 Job,此时如何保证正在处理的数据不会丢失?这就用到了 Flink 的另一个特性 Savepoint。


不同于 checkpoint 的自动触发机制,Savepoint 是手动触发的。Savepoint 为全局一致性快照,可以保存数据源 offset,operator 操作状态等信息。


(1)停止 job 并保存 savepoit


./bin/flink cancel -s [savepointDirectory] <jobID>
复制代码


(2)启动 job 并开启 savepoint


./bin/flink run -s <savepointPath> ...
复制代码


(3)使用 WEBUI 启动 Job 并输入 savepoint 路径



3 监控告警

Flink 提供了 Metrics,相当于我们的 Qmonitor 来对各项指标进行监控,API 丰富且使用简单。监控的指标可以投射到 Watcher 上面。目前我们所有的任务包括了“单位时间接收的数据”、“单位时间处理失败的数据”、“数据从产生到进入 Flink 的延时”、“数据在 Flink 处理的实际时间”、“单位时间持久化成功率”等监控。


可用性

1 HA

目前实时处理使用的是公司 Flink 集群,4 台 taskmanager,16 个 slot。DB 使用的是 mysql-mmm 高可用方式。Redis 使用 10 个实例的集群。Server 使用 8 台虚机做的 NG。没有单点问题。


2 使用 Flink Backpressure 应对流量洪峰

Flink 自带背压感知功能。使我们不用手动去缓存过剩的消息,Flink 会自动控制消费速度。其实现方式印证了“最简单的办法往往最有效”这个道理。Flink 使用分布式阻塞队列来作为有界缓冲区。如同 Java 里通用的阻塞队列跟处理线程进行连接一样,一旦队列达到容量上限,一个相对较慢的接受者将拖慢发送者。



(1)记录“A”进入 Flink,然后被 Task 1 处理;


(2)Task 1 处理后的结果被序列化进缓冲区;


(3)task 2 从缓冲区内读取一些数据,缓冲区内将有更多的空间;


(4)如果 task 2 处理的较慢,task1 的缓存区将很快填满。发送速度随之下降。


我们可以通过 WEBUI 来查看 Flink 各个 operator 的背压状态。



3 使用 EventTime、Window 和 WaterMark 处理无序流量(数据延时)

实际处理实时数据的过程中,由于日志收集和传递发送过程中,难免会在时间上乱序,就导致在处理前后有依赖性或者关联性的数据的时候出现问题。Flink 可以使用 EventTime 和 WaterMark 优雅的处理无序流量问题。


3.1 Time

在 flink 中元素可以设置 3 种不同的时间模型:-Processing time 此时间为元素进入 operator 后被赋予的当前算子所在服务器的本地时间戳。简单说就是算子时间。-Event time 此时间为元素真正的产生时间(例如日志内容中的时间戳),所以通常情况下需要我们从原始的日志内容中提取出来。-Ingestion time 此时间的赋值是在元素从 source 发出,刚进入 operator 时此时的服务器本地时间戳。



从上面的简介中可以看出,如果我们想让保证元素的顺序与其最初产生的顺序一致,我们需要使用 EventTime 时间模型。


3.2 Window

Window 是无限数据流处理的核心,Window 将一个无限的 stream 拆分成有限大小的缓存区,我们在这些缓存区中对数据进行处理和计算。是一个相对比较好理解的概念。Window 分为滚动窗口、滑动窗口、会话窗口、全局窗口等。可以根据业务需求去选择使用。由于篇幅原因,这里暂时不详细介绍各自的原理和使用。


3.3 Watermark

首先介绍一下 watermark 的由来。当我们使用 window 去处理 EventTime 的乱序流时,难免会遇到延迟的元素,但我们又不想无限期的等下去,所以我们想要这么一种方式去告诉 window 停止等待,马上进入计算,watermark 就是做此工作的。


Watermark 是衡量 EventTime 的流进度的一种方式。我们可以把水印视为 flink 插入流中的一个元素,它也拥有一个时间戳,只是这个元素不会像普通元素那样被做逻辑处理。



以上图为例,W(11)和 W(17) 是两个 watermark 分别携带的时间戳为 11 和 17,当算子处理到 W(11)时,首先它先识别出这是一个 watermark 对象,然后它会知道时间戳小于 11 的元素已经不会再进入此 window 了,于是触发当前 window 进行计算并将 11 缓存到自己的状态中。


3.4 编程实现

对 EventTime、Window 和 WaterMark 的概念的介绍完后,我们来了解下实践中这三者是如何进行配合来处理乱序流的。


3.4.1 首先开启 EventTime 时间模型

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
复制代码


我们需要告诉 Flink 我们需要使用 EventTime 的元素时间模型。


3.4.2 设置 window

ds.window(SlidingEventTimeWindows.of(Time.minutes(2), Time.minutes(1)))
复制代码


这里我们例子中选用的滑动窗口,根据业务调整窗口的大小和窗口滑动的距离。


3.4.3 设置 watermark

设置 watermark 有两种方式。


(1)AssignerWithPeriodicWatermarks:定时更新的 watermark。


(2)AssignerWithPunctuatedWatermarks:每个元素到来都要设置一个 watermark。


虽然后者更精准,但是大数据量的情况下会影响性能,一般使用前者。


        static AssignerWithPeriodicWatermarks assigner = new AssignerWithPeriodicWatermarks<String>() { private final long maxTimeLag = 60000;//60 secs @Nullable  @Override public Watermark getCurrentWatermark() { //设置允许延后60秒  return new Watermark(System.currentTimeMillis() - maxTimeLag);       } @Override public long extractTimestamp(String logStr, long l) {            JsonObject jsonObject =                  (JsonObject) new JsonParser().parse(logStr).getAsJsonObject(); return jsonObject.get("timestamp").getAsLong();        }    };}
复制代码


重写 getCurrentWatermark()来设置生成 watermark 的方式。重写 extractTimestamp()来提取元素的 eventtime。在 WEBUI 上可以实时查看 watermark 状态。



4 性能扩展

当业务扩展时,只需要申请新的 taskmanager,扩容 Redis 实例,申请虚机,然后提交新的 Job 即可,水平扩展非常简单方便,完全不影响在运行的业务。


作者简介

孙赵宏,2018 年 4 月加入去哪儿网,后端大数据研发工程师,目前在大住宿事业部/公共技术中心负责用户基础行为数据工程的研发。是一个贱贱的 geek,想象力和技术同样重要!工匠精神编码,谦卑态度学习,勇敢品质创新!


2019-04-18 00:086515

评论

发布
暂无评论
发现更多内容

A/B测试助力游戏业务增长

字节跳动数据平台

游戏开发 游戏 ab测试 游戏运营

AIOps落地五大原则(二):价值路线

BizSeer必示科技

数据资产管理

奔向架构师

数据资产 数据管理 6月月更

百度安全再次亮相高性能计算国际顶会SC 2022 — 采用Fuzzing技术防护高性能计算静默数据损坏安全风险

百度安全

百度安全 百度安全实验室 高性能计算国际顶会 SC 2022 Fuzzing技术防护

AWS的运营管理类服务

冯亮

云计算 AWS

新课上线 | 每次 5 分钟,轻松玩转阿里云容器服务!

阿里巴巴云原生

阿里云 云原生 容器服务

JVM调优简要思想及简单案例-新生代回收算法

zarmnosaj

6月月更

数据库每日一题---第18天:每天的领导和合伙人

知心宝贝

数据库 大数据 前端 后端 6月月更

Android 修改系统音量及监听

yechaoa

android 6月月更 AudioManager

vue导航路由

小恺

6月月更

【网络进阶】网络问题排查实例集锦(实战经验分享)

dvlinker

Linux 服务器 网络知识 网络进阶

链上智能合约Dapp系统开发部署搭建

薇電13242772558

区块链 智能合约

携手腾竞体育后,英特尔IMC如何加速电竞生态正循环?

科技之家

flutter系列之:flutter中的builder

程序那些事

flutter 程序那些事 6月月更

「微服务的细节」—— 周期性注册 or 一次性注册

袁世超

微服务

【网络入门】详解常用的基础网络知识(面试常考内容)

dvlinker

网络知识 TCP与UDP 心跳机制与丢包重传 网络命令 抓包分析

uni-app进阶之创建组件/原生渲染【day9】

黎燃

6月月更

小红书严打买卖账号及刷量作弊行为:必须维护平台的公信力

石头IT视角

服务治理的工作内容

阿泽🧸

微服务 6月月更

喜讯!云效度量能力获信通院先进级评估

阿里云云效

云计算 阿里云 DevOps 研发效能 研发

攻防演练中红队的内网横向扩展

穿过生命散发芬芳

6月月更 攻防演练

客户案例|观测云助力合思信息升级新一代可观测平台

观测云

浅谈融云即时通讯服务「日志优化」

融云 RongCloud

稳住了,别抖!—— 看GetX 的Worker如何防抖

岛上码农

flutter ios 前端 安卓开发 6月月更

对讲功能在远程办公中的应用 | 社区征文

Changing Lin

初夏征文

统一日志

卢卡多多

日志 6月月更

一问带你彻底了解JVM-Java虚拟机内存区域详解

派大星

JVM

从华为WeAutomate数字机器人论坛,看政企领域的“政务新智理”

王吉伟频道

RPA 数字化转型 华为WeAutomate 政务新智理 数字政府

Larix真正的去中心化借贷平台,并开启double Mining活动

鳄鱼视界

透过华为军团看科技之变(四):互动媒体(音乐)

脑极体

基于Flink构建用户实时基础行为工程_大数据_孙赵宏_InfoQ精选文章