AICon 上海站|日程100%上线,解锁Al未来! 了解详情
写点什么

实时计算框架 Flink 在教育行业的应用实践(下)

  • 2019-11-07
  • 本文字数:3118 字

    阅读完需:约 10 分钟

实时计算框架 Flink 在教育行业的应用实践(下)

1.3.2 Spark 基于 Structured Streaming 的实现

Spark 发送数据到 Kafka,及最后的执行分析计划,与 Flink 无区别,不再展开。下面简述差异点。

1. 编写 Spark 任务分析代码

(1)构建 SparkSession


如果需要使用 Spark 的 Structured Streaming 组件,首先需要创建 SparkSession 实例,代码如下所示:


val sparkConf = new SparkConf()  .setAppName("StreamingAnalysis")  .set("spark.local.dir", "F:\\temp")  .set("spark.default.parallelism", "3")  .set("spark.sql.shuffle.partitions", "3")  .set("spark.executor.instances", "3")
val spark = SparkSession .builder .config(sparkConf) .getOrCreate()
复制代码


(2)从 Kafka 读取答题数据


接下来,从 Kafka 中实时读取答题数,并生成 streaming-DataSet 实例,代码如下所示:


val inputDataFrame1 = spark  .readStream  .format("kafka")  .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")  .option("subscribe", "test_topic_learning_1")  .load()
复制代码


(3)进行 JSON 解析


从 Kafka 读取到数据后,进行 JSON 解析,并封装到 Answer 实例中,代码如下所示:


val keyValueDataset1 = inputDataFrame1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val answerDS = keyValueDataset1.map(t => { val gson = new Gson() val answer = gson.fromJson(t._2, classOf[Answer]) answer})
复制代码


其中 Answer 为 Scala 样例类,代码结构如下所示:


case class Answer(student_id: String,                  textbook_id: String,                  grade_id: String,                  subject_id: String,                  chapter_id: String,                  question_id: String,                  score: Int,                  answer_time: String,                  ts: Timestamp) extends Serializable
复制代码


(4)创建临时视图


创建临时视图代码如下所示:


answerDS.createTempView("t_answer")
复制代码


(5)进行任务分析


仅以需求 1(统计题目被作答频次)为例,编写代码如下所示:


  • 实时:统计题目被作答频次


//实时:统计题目被作答频次val result1 = spark.sql(  """SELECT    |  question_id, COUNT(1) AS frequency    |FROM    |  t_answer    |GROUP BY    |  question_id  """.stripMargin).toJSON
复制代码


(6)实时输出分析结果


仅以需求 1 为例,输出到 Kafka 的代码如下所示:


result1.writeStream .outputMode("update") .trigger(Trigger.ProcessingTime(0)) .format("kafka") .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092") .option("topic", "test_topic_learning_2") .option("checkpointLocation", "./checkpoint_chapter11_1") .start()
复制代码

1.3.3 使用 UFlink SQL 加速开发

通过上文可以发现,无论基于 Flink 还是 Spark 通过编写代码实现数据分析任务时,都需要编写大量的代码,并且在生产集群上运行时,需要打包程序,然后提交打包后生成的 Jar 文件到集群上运行。


为了简化开发者的工作量,不少开发者开始致力于 SQL 模块的封装,希望能够实现只写 SQL 语句,就完成类似上述的需求。UFlink SQL 即是 UCloud 为简化计算模型、降低用户使用实时计算 UFlink 产品门槛而推出的一套符合 SQL 语义的开发套件。通过 UFlink SQL 模块可以快速完成这一工作,实践如下。

1. 创建 UKafka 集群

在 UCloud 控制台 UKafka 创建页,选择配置并设置相关阈值,创建 UKafka 集群。



提示:此处暂且忽略在 Kafka 集群中创建 Topic 的操作。

2. 创建 UFlink 集群

  • 在 UCloud 控制台 UFlink 创建页,选择配置和运行模式,创建一个 Flink 集群。



  • 完成创建


3. 编写 SQL 语句

完成之后,只需要在工作空间中创建如下形式的 SQL 语句,即可完成上述 3 个需求分析任务。


(1)创建数据源表


创建数据源表,本质上就是为 Flink 当前上下文环境执行 addSource 操作,SQL 语句如下:


CREATE TABLE t_answer(    student_id VARCHAR,    textbook_id VARCHAR,    grade_id VARCHAR,    subject_id VARCHAR,    chapter_id VARCHAR,    question_id VARCHAR,    score INT,    answer_time VARCHAR,    ts TIMESTAMP )WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_1',    groupId = 'group_consumer_learning_test01',    parallelism ='3' );
复制代码


(2)创建结果表


创建结果表,本质上就是为 Flink 当前上下文环境执行 addSink 操作,SQL 语句如下:


CREATE TABLE t_result1(    question_id VARCHAR,    frequency INT)WITH(    type ='kafka11',    bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092',    zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka',    topic ='test_topic_learning_2',    parallelism ='3');
CREATE TABLE t_result2( grade_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_3', parallelism ='3');
CREATE TABLE t_result3( subject_id VARCHAR, question_id VARCHAR, frequency INT)WITH( type ='kafka11', bootstrapServers ='ukafka-mqacnjxk-kafka001:9092,ukafka-mqacnjxk-kafka002:9092,ukafka-mqacnjxk-kafka003:9092', zookeeperQuorum ='ukafka-mqacnjxk-kafka001:2181/ukafka', topic ='test_topic_learning_4', parallelism ='3');
复制代码


(3)执行查询计划


最后,执行查询计划,并向结果表中插入查询结果,SQL 语句形式如下:


INSERT INTO    t_result1  SELECT      question_id, COUNT(1) AS frequency    FROM      t_answer    GROUP BY      question_id;
INSERT INTO t_result2 SELECT grade_id, COUNT(1) AS frequency FROM t_answer GROUP BY grade_id;
INSERT INTO t_result3 SELECT subject_id, question_id, COUNT(1) AS frequency FROM t_answer GROUP BY subject_id, question_id;
复制代码


SQL 语句编写完毕后,将其直接粘贴到 UFlink 前端页面对话框中,并提交任务,即可快速完成上述 3 个需求。如下图所示:


1.3.4. UFlink SQL 支持多流 JOIN

Flink、Spark 目前都支持多流 JOIN,即 stream-stream join,并且也都支持 Watermark 处理延迟数据,以上特性均可以在 SQL 中体现,得益于此,UFlink SQL 也同样支持纯 SQL 环境下进行 JOIN 操作、维表 JOIN 操作、自定义函数操作、JSON 数组解析、嵌套 JSON 解析等。更多细节欢迎大家参考 UFlink SQL 相关案例展示https://docs.ucloud.cn/analysis/uflink/dev/sql

1.4 总结

UFlink 基于 Apache Flink 构建,除 100%兼容开源外,也在不断推出 UFlink SQL 等模块,从而提高开发效率,降低使用门槛,在性能、可靠性、易用性上为用户创造价值。 今年 8 月新推出的 Flink 1.9.0,大规模变更了 Flink 架构,能够更好地处理批、流任务,同时引入全新的 SQL 类型系统和更强大的 SQL 式任务编程。UFlink 预计将于 10 月底支持 Flink 1.9.0,敬请期待。


本文转载自公众号 UCloud 技术(ID:ucloud_tech)。


原文链接:


https://mp.weixin.qq.com/s/JFcANUK_Vfa7ZMXnn7sruQ


2019-11-07 23:44903

评论

发布
暂无评论
发现更多内容

RTE2021 实时互联网大会参会感想

轻口味

1024我在现场 10月月更

[架构实战营] 模块一作业

张祥

架构实战营

开源应用中心 | KodBox快捷高效的私有云在线文档管理系统

开源技术

3面蚂蚁,一路过关斩将 成功拿到offer定级P6,大厂面试雀食有点难!

进击的王小二

java面试 大厂面试 阿里巴巴面经总结 java

华为云企业级Redis:助力VMALL打造先进特征平台

华为云数据库小助手

GaussDB GaussDB ( for Redis ) 华为云数据库

硝烟弥漫的安全战场,只等一位超级英雄登场

白洞计划

从芯片公司到VR,字节跳动为了元宇宙加码布局

海比研究院

以“有用”为圆心:重新认识智慧城市的“高手之路”

脑极体

kotlin实现接口,已开源下载

android 程序员 移动开发

华为全球首发《全光自动驾驶网络白皮书》,助力打造品质联接新体验

企业系统太多?WorkPlus让工作事半功倍

BeeWorks

打造价值交付体系,企业 CIO 如何应对 DevOps 命题?

BoCloud博云

DevOps 云原生

GrowingIO 数据安全实践

GrowingIO技术专栏

隐私保护 数据安全 隐私安全 数据安全法

kotlin库,大佬带你看源码

android 程序员 移动开发

kotlin开发网站,字节跳动大神讲座

android 程序员 移动开发

Python爬虫实战 | 利用多线程爬取 LOL 高清壁纸

JackTian

Python 程序员 爬虫 后端 实战

喜大普奔!BFE 控制平面正式开源发布!

百度开发者中心

负载均衡 云原生 Go 语言 开源技术

来,肝了这份网络安全学习计划无敌

网络安全学海

网络安全 信息安全 渗透测试 WEB安全 学习安全

自定义View:resolveSizeAndState方法

Changing Lin

10月月更

Android 音视频 - EGL 源码解析以及 C++ 实现

声网

android 音视频 OpenGL ES

5面阿里斩获offer(Java岗),原来阿里面试官总喜欢问这种问题

进击的王小二

Java java面试 大厂面试

技术干货 | 闲鱼:一个优秀的 Push 平台,需要经历怎样的前世今生

蚂蚁集团移动开发平台 mPaaS

消息推送 push mPaaS

从区块链到元宇宙 Metaverse

devpoint

区块链 元宇宙 10月月更

ironSource 斩获 2021 年度鲸鸣奖三大重量级奖项

各位Oracle DBA们,你们期待的在线实训环境终于来了

墨天轮

MySQL 数据库 oracle redis 实训

EMQ 映云科技5G 边缘计算工业解决方案获中国移动创客马拉松大赛三等奖

EMQ映云科技

5G 物联网 边缘计算 移动互联网

产业数字化的思考

Geek_vidmje

面试官:如何防止 Java 源码被反编译?我竟然答不上来。。

Java 编程 程序员 架构 面试

端智能研发核心套件:MNN 工作台深度剖析

阿里巴巴终端技术

深度学习 ios android 移动端 端智能

EMQ 在2021电力人工智能大会:稳健数据基础设施架构支撑电力数字化发展

EMQ映云科技

人工智能 物联网 电力 mqtt

揭秘!探访百度AI反诈第一线

脑极体

实时计算框架 Flink 在教育行业的应用实践(下)_文化 & 方法_刘景泽_InfoQ精选文章