“AI 技术+人才”如何成为企业增长新引擎?戳此了解>>> 了解详情
写点什么

使用 spark-submit 提交用户应用程序

  • 2019-12-11
  • 本文字数:5027 字

    阅读完需:约 16 分钟

使用 spark-submit 提交用户应用程序

Francisco Oliveira AWS _专业服务团队的顾问_客户开始大数据之旅时,通常会需要一些关于如何将用户应用程序提交到


Amazon EMR


Spark 方面的指导。例如,客户会需要以下方面的指导:如何设定可用于其应用程序的内存和计算资源的大小,以及适用于其使用案例的最佳资源分配模型。在本文中,我将介绍如何通过设置


spark-submit 标志来控制提交到在 EMR 上运行的 Spark 应用程序可用的内存和计算资源。我还会探讨在哪些情况下使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。

Spark 执行模型

从较高维度看,每个 Spark 应用程序都有一个 driver 驱动程序,它以任务形式为集群多个节点上运行的 executor 执行程序分配工作。



Driver 驱动程序是一种应用程序代码,用于定义应用于数据集的转换和动作。driver 驱动程序的核心是实例化的 SparkContext 类的对象。该对象允许 driver 驱动程序获取到集群的连接、请求资源、将应用程序操作拆分为任务,以及在 executor 执行程序中安排和启动任务。


Executor 执行程序不仅执行驱动 driver 程序发送的任务,还会在本地存储数据。随着 executor 执行程序不断创建和销毁(请参阅后文的“启用 executor 执行程序的动态资源分配”部分),它们通过 driver 驱动程序注册和注销。Driver 驱动程序和 executor 执行程序直接进行通信。


要执行应用程序,driver 驱动程序会组织要在作业中完成的工作。每个作业分为多个阶段,每个阶段由一组并行运行的独立任务组成。任务是 Spark 中的最小工作单元,用于在不同的分区执行相同的代码。

Spark 编程模型

弹性分布式数据集 (RDD) 是 Spark 中的一个重要抽象层。这一抽象层是执行内存中计算的关键。RDD 是分布在集群节点之间的只读和不可变数据分区集合。Spark 中的分区允许并行执行数据子集。Spark 应用程序会创建 RDD 并对 RDD 执行操作。尽管 Spark 会自动对 RDD 进行分区,但您也可以自己设置分区数。


RDD 支持两种类型的操作:转换(transform)和动作(action)。转换是生成新 RDD 的操作,而动作是在数据集上运行转换后,将数据写入外部存储或将值返回 driver 驱动程序的操作。常见的转换包括按照键进行筛选、排序和分组。常见的动作包括收集任务结果并将其发送给 driver 驱动程序、保存 RDD 或计算 RDD 中的元素数。

spark-submit

在集群上启动应用程序的常用方法是使用 spark-submit 脚本。该脚本提供了几个标志,可让您控制应用程序使用的资源。


设置 spark-submit 标志是向在 driver 驱动程序中实例化的 SparkContext 对象动态提供配置的一种方法。当您创建集群和在应用程序中进行硬编码(但不建议这么做)时,spark-submit 还可以读取您使用 EMR 配置选项设置的 conf/spark-defaults.conf) 文件中设置的配置值_。_更改 conf/spark-defaults.conf 的另一种方法是使用 –conf prop = value 标志。我将介绍 spark-submit 标志和在 spark-defaults.conf 文件中使用的属性名称以及 –conf 标志。

在 EMR 上运行的 Spark 应用程序

提交到 EMR 上的 Spark 的任何应用程序都会在 YARN 上运行,且每个 Spark executor 执行程序都会作为 YARN 容器运行。在 YARN 上运行时,driver 驱动程序可以在集群中的某个 YARN 容器中运行(集群模式),也可以在 spark-submit 进程中本地运行(客户端模式)。


在集群模式下运行时,driver 驱动程序在 ApplicationMaster 上运行,ApplicationMaster 根据应用程序所需资源向 YARN ResourceManager 提交 YARN 容器请求。一个简化的和高维度的应用程序提交流程示意图如下所示。



在客户端模式下运行时,driver 驱动程序在 ApplicationMaster 外部运行,它运行在提交应用程序的机器上 的_spark-submit_ 脚本进程中。


设置 driver 驱动程序位置

借助 spark-submit,可以使用 –deploy-mode 标志选择 driver 驱动程序的位置。


如果希望进行调试并快速查看应用程序的输出时,可以选择以客户端模式提交应用程序。对于生产环境的应用程序,最佳实践是在集群模式下运行应用程序。此模式可确保在应用程序执行期间 driver 驱动程序始终可用。但是,如果您使用客户端模式,并且是在 EMR 集群外部(例如,在本地的笔记本电脑上)提交应用程序,需要注意 driver 驱动程序是在 EMR 集群外部运行,driver 驱动程序与 executor 执行程序之间的通信延迟会增加。

设置 driver 驱动程序资源

Driver 驱动程序的大小取决于它执行的计算量,以及它从 executor 执行程序收集的数据量。在集群模式下运行 driver 驱动程序时,spark-submit 为您提供了相关选项,可控制 driver 驱动程序使用的核心数 (–driver-cores) 和内存 (–driver-memory)。在客户端模式下,driver 驱动程序内存的默认值为 1024MB 和 1 个核心。

设置 executor 核心数和执行程序数

Executor 执行程序核心数_(__–executor-cores_ 或 spark.executor.cores)确定了每个 executor 执行程序可以并行执行的任务数。最佳实践是:为操作系统保留 1 个核心,为每个 executor 执行程序保留大约 4-5 个核心。请求的核心数受配置属性 yarn.nodemanager.resource.cpu-vcores 限制,该属性可以控制在一个节点中运行的所有 YARN 容器可用的核心数,该属性可以在 yarn-site.xml 文件中设置。


可以使用以下公式计算每个节点的 executor 执行程序数:


每个节点的__executor 执行程序数 = (节点的核心数 为操作系统保留的 1 个核心)__/ 每个__executor 执行程序的任务数


Spark 作业的 executor 执行程序_(__—_num-executorsspark.executor.instances)总数为:


executor 执行程序总数 = 每个节点的 executor 执行程序数 * 实例数 – 1。

设置 executor 执行程序的内存

每个 executor 执行程序容器的内存空间都分为两个主要区域:Spark executor 执行程序内存和 overhead 内存。



请注意:可以分配给 executor 执行程序容器的最大内存取决于 yarn-site.xml 中可用的 yarn.nodemanager.resource.memory-mb 属性。executor 执行程序内存(–executor-memoryspark.executor.memory)确定了每个 executor 执行程序进程可以使用的内存量。overhead 内存 (spark.yarn.executor.memoryOverHead) 是堆外内存,会自动添加到 executor 执行程序内存,它的默认值是 executorMemory * 0.10


Executor 执行程序内存将用于存储和执行目的堆的各个部分进行了统一。如果超出使用量,这两部分可以彼此借用空间。相关属性是 spark.memory.fractionspark.memory.storageFraction。有关更多信息,请参阅 Spark 1.6 中的统一内存管理白皮书。


可以使用以下公式计算每个 executor 执行程序的内存:


每个 executor 执行程序的内存 = 节点上最大容器大小 / 每个节点的 executor 执行程序数

spark-submit 示例

为了向您展示如何设置之前介绍的标志,我会提交一个 wordcount 示例应用程序,然后使用 Spark history server 获取执行的视图。


首先,以 EMR 步骤的形式,向现有集群提交一个修改版的 word count 示例应用程序。代码如下所示:


Bash


from __future__ import print_functionfrom pyspark import SparkContextimport sysif __name__ == "__main__":    if len(sys.argv) != 3:        print("Usage: wordcount  ", file=sys.stderr)        exit(-1)    sc = SparkContext(appName="WordCount")    text_file = sc.textFile(sys.argv[1])    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)    counts.saveAsTextFile(sys.argv[2])    sc.stop()
复制代码


集群具有 6 个 m3.2xlarge 核心实例和 1 个主实例,每个实例有 8 个 vCPU 和 30GB 内存_。_此实例类型 yarn.nodemanager.resource.memory-mb 默认值是 23GB。


根据上述介绍的公式,spark-submit 命令如下所示:


Bash


spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g –conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/
复制代码


通过以下命令以 EMR 步骤的形式提交应用程序:


Bash


aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE
复制代码


请注意:还可通过步骤定义设置属性 spark.yarn.submit.waitAppCompletion。当此属性设置为 false 时,客户端提交应用程序然后就可以退出,不需要等待应用程序完成。此设置可让您提交多个应用程序由集群同时执行,但是它仅在集群模式下可用。


使用 –driver-memory–driver-cores 的默认值,因为示例应用程序将直接写入 Amazon S3,并且 driver 驱动程序没有从 executor 执行程序接收任何数据。

启用 executor****执行程序的动态资源分配

YARN 上的 Spark 可以动态扩展和缩减 executor 执行程序的数量。该功能非常有用,在同时处理多个应用程序时可以释放空闲 executor 执行程序,让某个应用程序可以按需请求其他 executor 执行程序。


要启用该功能,请参阅 EMR 文档,了解相关步骤。


Spark 具有以下属性,可为动态分配机制提供精细控制:


  • executor 执行程序的初始数量 (spark.dynamicAllocation.initalExecutors)

  • 应用程序使用的最小 executor 执行程序数 (spark.dynamicAllocation.minExecutors)

  • 可以请求的最大 executor 执行程序数 (spark.dynamicAllocation.maxExecutors)

  • 删除空闲 executor 执行程序的时间 (sparkdynamicAllocation.executorIdleTime)

  • 请求新 executor 执行程序处理等待任务的时间(spark.dynamicAllocation.schedulerBacklogTimeoutspark.dynamicAllocation.sustainedSchedulerBacklogTimeout

使用最大资源分配自动配置****executor 执行程序

EMR 提供了一个能够自动配置这个属性的选项,以实现整个集群资源利用率最大化。如果集群一次只处理一个应用程序,此配置选项会非常有用。如果您希望同时运行多个应用程序,请不要使用该选项。


要启用该配置选项,请参阅 EMR 文档,了解相关步骤。


在集群创建期间设置此配置选项后,EMR 会自动更新 spark-defaults.conf 文件,来控制 executor 执行程序计算和内存资源的属性,如下所示:


  • spark.executor.memory = (yarn.scheduler.maximum-allocation-mb – 1g) -spark.yarn.executor.memoryOverhead

  • spark.executor.instances = [将此设置为核心节点的初始数量加上任务节点数]

  • spark.executor.cores = yarn.nodemanager.resource.cpu-vcores

  • spark.default.parallelism = spark.executor.instances * spark.executor.cores

查看并行任务视图

您可以通过 EMR 控制台访问 Spark history server UI。它提供了有关应用程序性能和行为的有用信息。您可以查看计划的阶段和任务列表、检索有关 executor 执行程序的信息、获取内存使用情况摘要,以及检索提交到 SparkContext 对象的配置。在本文中,我将介绍如何将以上示例中使用的 spark-submit 脚本设置的标志转换为可视化工具。


要访问 Spark history server,请启用 SOCKS 代理,然后在连接下选择 Spark History Server



对于 Completed applications,请选择唯一可选的条目,然后展开事件时间轴,如下所示。Spark 根据_–num-executors_ 标志中定义的要求添加了 5 个 executor 执行程序。



然后,导航到阶段详细信息,您就可以看到每个 executor 执行程序并行运行的任务数了,该值与 –executor-cores 标志的值相同。


小结

本文向您介绍了如何使用 spark-submit 标志向集群提交应用程序。我们详细阐述了如何控制 driver 驱动程序运行位置、如何设置分配给 driver 驱动程序和 executor 执行程序的资源以及 executor 执行程序数量。此外,我们还介绍了在哪些情况下需使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。


如果您有任何问题或建议,欢迎留言。


—————————-


相关内容


使用支持 S3 的笔记本通过 Amazon EMR 上的 Spark 运行外部 Zeppelin 实例



想要了解有关大数据或流数据的更多信息? 请查看我们的大数据流数据教学页。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/submitting-user-applications-with-spark-submit/


2019-12-11 15:381042

评论

发布
暂无评论
发现更多内容

一文搞懂深度信念网络!DBN概念介绍与Pytorch实战

EquatorCoco

架构 网络 PyTorch

京东商品评论数据接口(JD.item_review)丨京东API接口

tbapi

京东商品评论数据接口 京东商品评价接口 京东商品评论API 京东商品评价API 京东评论API

多功能矢量图编辑器:Boxy SVG最新激活版

胖墩儿不胖y

Mac软件 矢量图编辑器 矢量图编辑

Markdown写作和笔记管理 MWeb Pro激活中文版

mac大玩家j

Mac软件推荐 写作软件

苹果Mac版交互式原型设计 Axure RP 8 汉化激活版

mac大玩家j

Mac软件 原型设计工具 交互原型设计

面向Java应用网络流的非侵入可观测指标采集联合方案 – Sermant & Gopher

华为云开源

微服务 Java、 sermant

开启安全功能 ES 集群就安全了吗?

极限实验室

console 集群

软件测试/测试开发|一文带你了解Python列表操作

霍格沃兹测试开发学社

AI数字人互动大屏技术的新趋势!

青否数字人

数字人

好用的视频下载和转换器:YT Saver 中文直装版

胖墩儿不胖y

视频处理 Mac软件 视频处理工具

几分钟时间,“数字人”诞生!

青否数字人

数字人

基于扁平化BOM的全业务应用领先实践,提升离散制造行业运营效率

用友BIP

智能制造

用友BIP全球司库助力陕西建工控股集团打造世界一流司库体系

用友BIP

全球司库

技术文档指南:版本说明、网站文案、FAQ、案例研究与内容优化

小万哥

程序人生 软件工程 后端开发 技术写作 文档指南

破解直播带货最大难题,青否数字人怎么做到的!

青否数字人

数字人

逻辑回归算法是什么呢?

小齐写代码

干货|EasyMR 基于 Kubernetes 应用的监控实践

袋鼠云数栈

大数据 Kubernetes 云原生 可观测性 Promtheus

【介绍篇】Supabase与Firebase的关系和区别

张文平

数据库 云服务 Baas Supabase firebase

营销创意素材如何秒级智能生成?即时创意白皮书来了!

京东科技开发者

实时数仓投放主备链路Diff测试工具落地实践

得物技术

AI 数据

玩转数据世界:跨工作空间的安全授权与高效查询

观测云

数据可视化 数据授权

JAVA开发工具Eclipse和MyEclipse

小魏写代码

【原理篇】Supabase应用开发为什么要配置RLS

张文平

权限 PgSQL Supabase 访问权限 Postgrest

Microsoft 365 (原office365) Mac版 v16.80正式破解版下载

南屿

Office Microsoft 365 office许可证 office365破解版

程序员会不会被人工智能取代?

ZA技术社区

程序员 #人工智能

【原理篇】Supabase关联查询:内联、外联及外键约束

张文平

外键 Function 关联查询 Supabase Postgrest

商智C店H5性能优化实战

京东科技开发者

前端

2024-01-03:用go语言,给你两个长度为 n 下标从 0 开始的整数数组 cost 和 time, 分别表示给 n 堵不同的墙刷油漆需要的开销和时间。你有两名油漆匠, 一位需要 付费 的油漆匠

福大大架构师每日一题

福大大架构师每日一题

2024你好!

鲸品堂

DAPP算力挖矿系统开发丨详情开发

l8l259l3365

软件测试/测试开发丨Python 列表

测试人

软件测试 测试开发

使用 spark-submit 提交用户应用程序_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章