【AICon】探索八个行业创新案例,教你在教育、金融、医疗、法律等领域实践大模型技术! >>> 了解详情
写点什么

使用 spark-submit 提交用户应用程序

  • 2019-12-11
  • 本文字数:5027 字

    阅读完需:约 16 分钟

使用 spark-submit 提交用户应用程序

Francisco Oliveira AWS _专业服务团队的顾问_客户开始大数据之旅时,通常会需要一些关于如何将用户应用程序提交到


Amazon EMR


Spark 方面的指导。例如,客户会需要以下方面的指导:如何设定可用于其应用程序的内存和计算资源的大小,以及适用于其使用案例的最佳资源分配模型。在本文中,我将介绍如何通过设置


spark-submit 标志来控制提交到在 EMR 上运行的 Spark 应用程序可用的内存和计算资源。我还会探讨在哪些情况下使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。

Spark 执行模型

从较高维度看,每个 Spark 应用程序都有一个 driver 驱动程序,它以任务形式为集群多个节点上运行的 executor 执行程序分配工作。



Driver 驱动程序是一种应用程序代码,用于定义应用于数据集的转换和动作。driver 驱动程序的核心是实例化的 SparkContext 类的对象。该对象允许 driver 驱动程序获取到集群的连接、请求资源、将应用程序操作拆分为任务,以及在 executor 执行程序中安排和启动任务。


Executor 执行程序不仅执行驱动 driver 程序发送的任务,还会在本地存储数据。随着 executor 执行程序不断创建和销毁(请参阅后文的“启用 executor 执行程序的动态资源分配”部分),它们通过 driver 驱动程序注册和注销。Driver 驱动程序和 executor 执行程序直接进行通信。


要执行应用程序,driver 驱动程序会组织要在作业中完成的工作。每个作业分为多个阶段,每个阶段由一组并行运行的独立任务组成。任务是 Spark 中的最小工作单元,用于在不同的分区执行相同的代码。

Spark 编程模型

弹性分布式数据集 (RDD) 是 Spark 中的一个重要抽象层。这一抽象层是执行内存中计算的关键。RDD 是分布在集群节点之间的只读和不可变数据分区集合。Spark 中的分区允许并行执行数据子集。Spark 应用程序会创建 RDD 并对 RDD 执行操作。尽管 Spark 会自动对 RDD 进行分区,但您也可以自己设置分区数。


RDD 支持两种类型的操作:转换(transform)和动作(action)。转换是生成新 RDD 的操作,而动作是在数据集上运行转换后,将数据写入外部存储或将值返回 driver 驱动程序的操作。常见的转换包括按照键进行筛选、排序和分组。常见的动作包括收集任务结果并将其发送给 driver 驱动程序、保存 RDD 或计算 RDD 中的元素数。

spark-submit

在集群上启动应用程序的常用方法是使用 spark-submit 脚本。该脚本提供了几个标志,可让您控制应用程序使用的资源。


设置 spark-submit 标志是向在 driver 驱动程序中实例化的 SparkContext 对象动态提供配置的一种方法。当您创建集群和在应用程序中进行硬编码(但不建议这么做)时,spark-submit 还可以读取您使用 EMR 配置选项设置的 conf/spark-defaults.conf) 文件中设置的配置值_。_更改 conf/spark-defaults.conf 的另一种方法是使用 –conf prop = value 标志。我将介绍 spark-submit 标志和在 spark-defaults.conf 文件中使用的属性名称以及 –conf 标志。

在 EMR 上运行的 Spark 应用程序

提交到 EMR 上的 Spark 的任何应用程序都会在 YARN 上运行,且每个 Spark executor 执行程序都会作为 YARN 容器运行。在 YARN 上运行时,driver 驱动程序可以在集群中的某个 YARN 容器中运行(集群模式),也可以在 spark-submit 进程中本地运行(客户端模式)。


在集群模式下运行时,driver 驱动程序在 ApplicationMaster 上运行,ApplicationMaster 根据应用程序所需资源向 YARN ResourceManager 提交 YARN 容器请求。一个简化的和高维度的应用程序提交流程示意图如下所示。



在客户端模式下运行时,driver 驱动程序在 ApplicationMaster 外部运行,它运行在提交应用程序的机器上 的_spark-submit_ 脚本进程中。


设置 driver 驱动程序位置

借助 spark-submit,可以使用 –deploy-mode 标志选择 driver 驱动程序的位置。


如果希望进行调试并快速查看应用程序的输出时,可以选择以客户端模式提交应用程序。对于生产环境的应用程序,最佳实践是在集群模式下运行应用程序。此模式可确保在应用程序执行期间 driver 驱动程序始终可用。但是,如果您使用客户端模式,并且是在 EMR 集群外部(例如,在本地的笔记本电脑上)提交应用程序,需要注意 driver 驱动程序是在 EMR 集群外部运行,driver 驱动程序与 executor 执行程序之间的通信延迟会增加。

设置 driver 驱动程序资源

Driver 驱动程序的大小取决于它执行的计算量,以及它从 executor 执行程序收集的数据量。在集群模式下运行 driver 驱动程序时,spark-submit 为您提供了相关选项,可控制 driver 驱动程序使用的核心数 (–driver-cores) 和内存 (–driver-memory)。在客户端模式下,driver 驱动程序内存的默认值为 1024MB 和 1 个核心。

设置 executor 核心数和执行程序数

Executor 执行程序核心数_(__–executor-cores_ 或 spark.executor.cores)确定了每个 executor 执行程序可以并行执行的任务数。最佳实践是:为操作系统保留 1 个核心,为每个 executor 执行程序保留大约 4-5 个核心。请求的核心数受配置属性 yarn.nodemanager.resource.cpu-vcores 限制,该属性可以控制在一个节点中运行的所有 YARN 容器可用的核心数,该属性可以在 yarn-site.xml 文件中设置。


可以使用以下公式计算每个节点的 executor 执行程序数:


每个节点的__executor 执行程序数 = (节点的核心数 为操作系统保留的 1 个核心)__/ 每个__executor 执行程序的任务数


Spark 作业的 executor 执行程序_(__—_num-executorsspark.executor.instances)总数为:


executor 执行程序总数 = 每个节点的 executor 执行程序数 * 实例数 – 1。

设置 executor 执行程序的内存

每个 executor 执行程序容器的内存空间都分为两个主要区域:Spark executor 执行程序内存和 overhead 内存。



请注意:可以分配给 executor 执行程序容器的最大内存取决于 yarn-site.xml 中可用的 yarn.nodemanager.resource.memory-mb 属性。executor 执行程序内存(–executor-memoryspark.executor.memory)确定了每个 executor 执行程序进程可以使用的内存量。overhead 内存 (spark.yarn.executor.memoryOverHead) 是堆外内存,会自动添加到 executor 执行程序内存,它的默认值是 executorMemory * 0.10


Executor 执行程序内存将用于存储和执行目的堆的各个部分进行了统一。如果超出使用量,这两部分可以彼此借用空间。相关属性是 spark.memory.fractionspark.memory.storageFraction。有关更多信息,请参阅 Spark 1.6 中的统一内存管理白皮书。


可以使用以下公式计算每个 executor 执行程序的内存:


每个 executor 执行程序的内存 = 节点上最大容器大小 / 每个节点的 executor 执行程序数

spark-submit 示例

为了向您展示如何设置之前介绍的标志,我会提交一个 wordcount 示例应用程序,然后使用 Spark history server 获取执行的视图。


首先,以 EMR 步骤的形式,向现有集群提交一个修改版的 word count 示例应用程序。代码如下所示:


Bash


from __future__ import print_functionfrom pyspark import SparkContextimport sysif __name__ == "__main__":    if len(sys.argv) != 3:        print("Usage: wordcount  ", file=sys.stderr)        exit(-1)    sc = SparkContext(appName="WordCount")    text_file = sc.textFile(sys.argv[1])    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)    counts.saveAsTextFile(sys.argv[2])    sc.stop()
复制代码


集群具有 6 个 m3.2xlarge 核心实例和 1 个主实例,每个实例有 8 个 vCPU 和 30GB 内存_。_此实例类型 yarn.nodemanager.resource.memory-mb 默认值是 23GB。


根据上述介绍的公式,spark-submit 命令如下所示:


Bash


spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g –conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/
复制代码


通过以下命令以 EMR 步骤的形式提交应用程序:


Bash


aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE
复制代码


请注意:还可通过步骤定义设置属性 spark.yarn.submit.waitAppCompletion。当此属性设置为 false 时,客户端提交应用程序然后就可以退出,不需要等待应用程序完成。此设置可让您提交多个应用程序由集群同时执行,但是它仅在集群模式下可用。


使用 –driver-memory–driver-cores 的默认值,因为示例应用程序将直接写入 Amazon S3,并且 driver 驱动程序没有从 executor 执行程序接收任何数据。

启用 executor****执行程序的动态资源分配

YARN 上的 Spark 可以动态扩展和缩减 executor 执行程序的数量。该功能非常有用,在同时处理多个应用程序时可以释放空闲 executor 执行程序,让某个应用程序可以按需请求其他 executor 执行程序。


要启用该功能,请参阅 EMR 文档,了解相关步骤。


Spark 具有以下属性,可为动态分配机制提供精细控制:


  • executor 执行程序的初始数量 (spark.dynamicAllocation.initalExecutors)

  • 应用程序使用的最小 executor 执行程序数 (spark.dynamicAllocation.minExecutors)

  • 可以请求的最大 executor 执行程序数 (spark.dynamicAllocation.maxExecutors)

  • 删除空闲 executor 执行程序的时间 (sparkdynamicAllocation.executorIdleTime)

  • 请求新 executor 执行程序处理等待任务的时间(spark.dynamicAllocation.schedulerBacklogTimeoutspark.dynamicAllocation.sustainedSchedulerBacklogTimeout

使用最大资源分配自动配置****executor 执行程序

EMR 提供了一个能够自动配置这个属性的选项,以实现整个集群资源利用率最大化。如果集群一次只处理一个应用程序,此配置选项会非常有用。如果您希望同时运行多个应用程序,请不要使用该选项。


要启用该配置选项,请参阅 EMR 文档,了解相关步骤。


在集群创建期间设置此配置选项后,EMR 会自动更新 spark-defaults.conf 文件,来控制 executor 执行程序计算和内存资源的属性,如下所示:


  • spark.executor.memory = (yarn.scheduler.maximum-allocation-mb – 1g) -spark.yarn.executor.memoryOverhead

  • spark.executor.instances = [将此设置为核心节点的初始数量加上任务节点数]

  • spark.executor.cores = yarn.nodemanager.resource.cpu-vcores

  • spark.default.parallelism = spark.executor.instances * spark.executor.cores

查看并行任务视图

您可以通过 EMR 控制台访问 Spark history server UI。它提供了有关应用程序性能和行为的有用信息。您可以查看计划的阶段和任务列表、检索有关 executor 执行程序的信息、获取内存使用情况摘要,以及检索提交到 SparkContext 对象的配置。在本文中,我将介绍如何将以上示例中使用的 spark-submit 脚本设置的标志转换为可视化工具。


要访问 Spark history server,请启用 SOCKS 代理,然后在连接下选择 Spark History Server



对于 Completed applications,请选择唯一可选的条目,然后展开事件时间轴,如下所示。Spark 根据_–num-executors_ 标志中定义的要求添加了 5 个 executor 执行程序。



然后,导航到阶段详细信息,您就可以看到每个 executor 执行程序并行运行的任务数了,该值与 –executor-cores 标志的值相同。


小结

本文向您介绍了如何使用 spark-submit 标志向集群提交应用程序。我们详细阐述了如何控制 driver 驱动程序运行位置、如何设置分配给 driver 驱动程序和 executor 执行程序的资源以及 executor 执行程序数量。此外,我们还介绍了在哪些情况下需使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。


如果您有任何问题或建议,欢迎留言。


—————————-


相关内容


使用支持 S3 的笔记本通过 Amazon EMR 上的 Spark 运行外部 Zeppelin 实例



想要了解有关大数据或流数据的更多信息? 请查看我们的大数据流数据教学页。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/submitting-user-applications-with-spark-submit/


2019-12-11 15:381057

评论

发布
暂无评论
发现更多内容

使用LiteOS Studio图形化查看LiteOS在STM32上运行的奥秘

华为云开发者联盟

LiteOS 脚本 语言

一周信创舆情观察(11.30~12.6)

统小信uos

如何判断一个区块链项目是否优质?

CECBC

开源

三分钟看懂新一代.Net Core3.1工作流引擎平台

Philips

敏捷开发 工作流

构师训练营第八周学习笔记

李日盛

笔记

让垃圾分类开发“极快致简”的好物件,零基础的开发小白也能轻松驾驭它!

华为云开发者联盟

数据 分类

华为云亮相QCon2020深圳站,带你体会大厂的云原生玩法与秘诀

华为云开发者联盟

专家 华为云 深圳

多国探路数字货币

CECBC

数字货币

分布式事务框架 seata-golang 通信模型详解

阿里巴巴云原生

数据库 微服务 云原生 Go 语言

海阔天空的游戏出海,HMS生态提供的风帆与通路

脑极体

跨专业零基础校招拿到网易18K*13薪Java岗offer全过程复盘总结

Java架构师迁哥

Redis为什么用跳表而不用平衡树?

Java架构师迁哥

架构师训练营第三周课后作业

万有引力

我哭了!Centos6停止更新只能切换7,哪些习惯也需要切换

小Q

Java Linux centos 学习 面试

开除AI伦理学家,谷歌如何从“不作恶”到“不宽容”?

脑极体

什么是802.11ax(Wi-Fi 6)

Serverless 如何落地?揭秘阿里核心业务大规模落地实现

阿里巴巴云原生

阿里巴巴 阿里云 Serverless 开发者 云原生

云上的移动性能测试平台

移动研发平台EMAS

阿里云 测试 移动研发平台

我哭了!Centos6停止更新只能切换7,哪些习惯也需要切换

996小迁

Java 架构 面试 Centos6

Spark-submit执行流程,了解一下

华为云开发者联盟

spark 技术 流程

官方活动 | 盘点2020有奖征文

InfoQ写作社区官方

盘点2020 热门活动

训练营第八周作业

大脸猫

极客大学架构师训练营

最简单的Go Dockerfile编写姿势,没有之一!

万俊峰Kevin

Docker Dockerfile Go 语言

装机必备:借用IDM实现百度云高速下载

懒得勤快

LeetCode题解:515. 在每个树行中找最大值,DFS,JavaScript,详细注释

Lee Chen

算法 大前端 LeetCode

滴滴DoKit-功能介绍之文件同步助手

工具 文件 DoKit

训练营第八周总结

大脸猫

极客大学架构师训练营

《迅雷链精品课》第十二课:PoW共识算法

迅雷链

区块链

区块链加速产业革命,打造畜禽养殖业发展新途径

CECBC

养殖业

【得物技术】MySQL多表关联同步到ES的实践

得物技术

MySQL 原理 配置 ES 多表join

Singleton手绘

raox

极客大学架构师训练营

使用 spark-submit 提交用户应用程序_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章