使用 spark-submit 提交用户应用程序

阅读数:18 2019 年 12 月 11 日 15:38

使用 spark-submit 提交用户应用程序

Francisco Oliveira AWS _ 专业服务团队的顾问 _ 客户开始大数据之旅时,通常会需要一些关于如何将用户应用程序提交到
Amazon EMR
Spark 方面的指导。例如,客户会需要以下方面的指导:如何设定可用于其应用程序的内存和计算资源的大小,以及适用于其使用案例的最佳资源分配模型。在本文中,我将介绍如何通过设置
spark-submit 标志来控制提交到在 EMR 上运行的 Spark 应用程序可用的内存和计算资源。我还会探讨在哪些情况下使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。

Spark 执行模型

从较高维度看,每个 Spark 应用程序都有一个 driver 驱动程序,它以任务形式为集群多个节点上运行的 executor 执行程序分配工作。

使用 spark-submit 提交用户应用程序

Driver 驱动程序是一种应用程序代码,用于定义应用于数据集的转换和动作。driver 驱动程序的核心是实例化的 SparkContext 类的对象。该对象允许 driver 驱动程序获取到集群的连接、请求资源、将应用程序操作拆分为任务,以及在 executor 执行程序中安排和启动任务。

Executor 执行程序不仅执行驱动 driver 程序发送的任务,还会在本地存储数据。随着 executor 执行程序不断创建和销毁(请参阅后文的“启用 executor 执行程序的动态资源分配”部分),它们通过 driver 驱动程序注册和注销。Driver 驱动程序和 executor 执行程序直接进行通信。

要执行应用程序,driver 驱动程序会组织要在作业中完成的工作。每个作业分为多个阶段,每个阶段由一组并行运行的独立任务组成。任务是 Spark 中的最小工作单元,用于在不同的分区执行相同的代码。

Spark 编程模型

弹性分布式数据集 (RDD) 是 Spark 中的一个重要抽象层。这一抽象层是执行内存中计算的关键。RDD 是分布在集群节点之间的只读和不可变数据分区集合。Spark 中的分区允许并行执行数据子集。Spark 应用程序会创建 RDD 并对 RDD 执行操作。尽管 Spark 会自动对 RDD 进行分区,但您也可以自己设置分区数。

RDD 支持两种类型的操作:转换(transform)和动作 (action)。转换是生成新 RDD 的操作,而动作是在数据集上运行转换后,将数据写入外部存储或将值返回 driver 驱动程序的操作。常见的转换包括按照键进行筛选、排序和分组。常见的动作包括收集任务结果并将其发送给 driver 驱动程序、保存 RDD 或计算 RDD 中的元素数。

spark-submit

在集群上启动应用程序的常用方法是使用 spark-submit 脚本。该脚本提供了几个标志,可让您控制应用程序使用的资源。

设置 spark-submit 标志是向在 driver 驱动程序中实例化的 SparkContext 对象动态提供配置的一种方法。当您创建集群和在应用程序中进行硬编码(但不建议这么做)时,spark-submit 还可以读取您使用 EMR 配置选项设置的 conf/spark-defaults.conf) 文件中设置的配置值 _。_ 更改 conf/spark-defaults.conf 的另一种方法是使用 –conf prop = value 标志。我将介绍 spark-submit 标志和在 spark-defaults.conf 文件中使用的属性名称以及 –conf 标志。

在 EMR 上运行的 Spark 应用程序

提交到 EMR 上的 Spark 的任何应用程序都会在 YARN 上运行,且每个 Spark executor 执行程序都会作为 YARN 容器运行。在 YARN 上运行时,driver 驱动程序可以在集群中的某个 YARN 容器中运行(集群模式),也可以在 spark-submit 进程中本地运行(客户端模式)。

在集群模式下运行时,driver 驱动程序在 ApplicationMaster 上运行,ApplicationMaster 根据应用程序所需资源向 YARN ResourceManager 提交 YARN 容器请求。一个简化的和高维度的应用程序提交流程示意图如下所示。

使用 spark-submit 提交用户应用程序

在客户端模式下运行时,driver 驱动程序在 ApplicationMaster 外部运行,它运行在提交应用程序的机器上 的 _spark-submit_ 脚本进程中。

使用 spark-submit 提交用户应用程序

设置driver 驱动程序位置

借助 spark-submit,可以使用 –deploy-mode 标志选择 driver 驱动程序的位置。

如果希望进行调试并快速查看应用程序的输出时,可以选择以客户端模式提交应用程序。对于生产环境的应用程序,最佳实践是在集群模式下运行应用程序。此模式可确保在应用程序执行期间 driver 驱动程序始终可用。但是,如果您使用客户端模式,并且是在 EMR 集群外部(例如,在本地的笔记本电脑上)提交应用程序,需要注意 driver 驱动程序是在 EMR 集群外部运行,driver 驱动程序与 executor 执行程序之间的通信延迟会增加。

设置driver 驱动程序资源

Driver 驱动程序的大小取决于它执行的计算量,以及它从 executor 执行程序收集的数据量。在集群模式下运行 driver 驱动程序时,spark-submit 为您提供了相关选项,可控制 driver 驱动程序使用的核心数 (–driver-cores) 和内存 (–driver-memory)。在客户端模式下,driver 驱动程序内存的默认值为 1024MB 和 1 个核心。

设置executor 核心数和执行程序数

Executor 执行程序核心数 _(__–executor-cores_ 或 spark.executor.cores)确定了每个 executor 执行程序可以并行执行的任务数。最佳实践是:为操作系统保留 1 个核心,为每个 executor 执行程序保留大约 4-5 个核心。请求的核心数受配置属性 yarn.nodemanager.resource.cpu-vcores 限制,该属性可以控制在一个节点中运行的所有 YARN 容器可用的核心数,该属性可以在 yarn-site.xml 文件中设置。

可以使用以下公式计算每个节点的 executor 执行程序数:

每个节点的 __executor 执行程序数 = (节点的核心数 为操作系统保留的 1 个核心)__/ 每个 __executor 执行程序的任务数

Spark 作业的 executor 执行程序 _(__—_num-executorsspark.executor.instances)总数为:

executor 执行程序总数 = 每个节点的 executor 执行程序数 * 实例数 – 1。

设置executor 执行程序的内存

每个 executor 执行程序容器的内存空间都分为两个主要区域:Spark executor 执行程序内存和 overhead 内存。

使用 spark-submit 提交用户应用程序

请注意:可以分配给 executor 执行程序容器的最大内存取决于 yarn-site.xml 中可用的 yarn.nodemanager.resource.memory-mb 属性。executor 执行程序内存(–executor-memoryspark.executor.memory)确定了每个 executor 执行程序进程可以使用的内存量。overhead 内存 (spark.yarn.executor.memoryOverHead) 是堆外内存,会自动添加到 executor 执行程序内存,它的默认值是 executorMemory * 0.10

Executor 执行程序内存将用于存储和执行目的堆的各个部分进行了统一。如果超出使用量,这两部分可以彼此借用空间。相关属性是 spark.memory.fractionspark.memory.storageFraction。有关更多信息,请参阅 Spark 1.6 中的统一内存管理白皮书。

可以使用以下公式计算每个 executor 执行程序的内存:

每个 executor 执行程序的内存 = 节点上最大容器大小 / 每个节点的 executor 执行程序数

spark-submit 示例

为了向您展示如何设置之前介绍的标志,我会提交一个 wordcount 示例应用程序,然后使用 Spark history server 获取执行的视图。

首先,以 EMR 步骤的形式,向现有集群提交一个修改版的 word count 示例应用程序。代码如下所示:

Bash

复制代码
from __future__ import print_function
from pyspark import SparkContext
import sys
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: wordcount ", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="WordCount")
text_file = sc.textFile(sys.argv[1])
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile(sys.argv[2])
sc.stop()

集群具有 6 个 m3.2xlarge 核心实例和 1 个主实例,每个实例有 8 个 vCPU 和 30GB 内存 _。_此实例类型 yarn.nodemanager.resource.memory-mb 默认值是 23GB。

根据上述介绍的公式,spark-submit 命令如下所示:

Bash

复制代码
spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g –conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/

通过以下命令以 EMR 步骤的形式提交应用程序:

Bash

复制代码
aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE

请注意:还可通过步骤定义设置属性 spark.yarn.submit.waitAppCompletion。当此属性设置为 false 时,客户端提交应用程序然后就可以退出,不需要等待应用程序完成。此设置可让您提交多个应用程序由集群同时执行,但是它仅在集群模式下可用。

使用 –driver-memory–driver-cores 的默认值,因为示例应用程序将直接写入 Amazon S3,并且 driver 驱动程序没有从 executor 执行程序接收任何数据。

启用 executor**** 执行程序的动态资源分配

YARN 上的 Spark 可以动态扩展和缩减 executor 执行程序的数量。该功能非常有用,在同时处理多个应用程序时可以释放空闲 executor 执行程序,让某个应用程序可以按需请求其他 executor 执行程序。

要启用该功能,请参阅 EMR 文档,了解相关步骤。

Spark 具有以下属性,可为动态分配机制提供精细控制:

  • executor 执行程序的初始数量 (spark.dynamicAllocation.initalExecutors)
  • 应用程序使用的最小 executor 执行程序数 (spark.dynamicAllocation.minExecutors)
  • 可以请求的最大 executor 执行程序数 (spark.dynamicAllocation.maxExecutors)
  • 删除空闲 executor 执行程序的时间 (sparkdynamicAllocation.executorIdleTime)
  • 请求新 executor 执行程序处理等待任务的时间(spark.dynamicAllocation.schedulerBacklogTimeoutspark.dynamicAllocation.sustainedSchedulerBacklogTimeout

使用最大资源分配自动配置 ****executor 执行程序

EMR 提供了一个能够自动配置这个属性的选项,以实现整个集群资源利用率最大化。如果集群一次只处理一个应用程序,此配置选项会非常有用。如果您希望同时运行多个应用程序,请不要使用该选项。

要启用该配置选项,请参阅 EMR 文档,了解相关步骤。

在集群创建期间设置此配置选项后,EMR 会自动更新 spark-defaults.conf 文件,来控制 executor 执行程序计算和内存资源的属性,如下所示:

  • spark.executor.memory = (yarn.scheduler.maximum-allocation-mb – 1g) -spark.yarn.executor.memoryOverhead
  • spark.executor.instances = [将此设置为核心节点的初始数量加上任务节点数]
  • spark.executor.cores = yarn.nodemanager.resource.cpu-vcores
  • spark.default.parallelism = spark.executor.instances * spark.executor.cores

查看并行任务视图

您可以通过 EMR 控制台访问 Spark history server UI。它提供了有关应用程序性能和行为的有用信息。您可以查看计划的阶段和任务列表、检索有关 executor 执行程序的信息、获取内存使用情况摘要,以及检索提交到 SparkContext 对象的配置。在本文中,我将介绍如何将以上示例中使用的 spark-submit 脚本设置的标志转换为可视化工具。

要访问 Spark history server,请启用 SOCKS 代理,然后在连接下选择 Spark History Server

使用 spark-submit 提交用户应用程序

对于Completed applications,请选择唯一可选的条目,然后展开事件时间轴,如下所示。Spark 根据 _–num-executors_ 标志中定义的要求添加了 5 个 executor 执行程序。

使用 spark-submit 提交用户应用程序

然后,导航到阶段详细信息,您就可以看到每个 executor 执行程序并行运行的任务数了,该值与 –executor-cores 标志的值相同。

使用 spark-submit 提交用户应用程序

小结

本文向您介绍了如何使用 spark-submit 标志向集群提交应用程序。我们详细阐述了如何控制 driver 驱动程序运行位置、如何设置分配给 driver 驱动程序和 executor 执行程序的资源以及 executor 执行程序数量。此外,我们还介绍了在哪些情况下需使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。

如果您有任何问题或建议,欢迎留言。

—————————-

相关内容

使用支持 S3 的笔记本通过 Amazon EMR 上的 Spark 运行外部 Zeppelin 实例

使用 spark-submit 提交用户应用程序

想要了解有关大数据或流数据的更多信息? 请查看我们的大数据流数据教学页。

本文转载自 AWS 技术博客。

原文链接: https://amazonaws-china.com/cn/blogs/china/submitting-user-applications-with-spark-submit/

欲了解 AWS 的更多信息,请访问【AWS 技术专区】

评论

发布