写点什么

使用 spark-submit 提交用户应用程序

  • 2019-12-11
  • 本文字数:5027 字

    阅读完需:约 16 分钟

使用 spark-submit 提交用户应用程序

Francisco Oliveira AWS _专业服务团队的顾问_客户开始大数据之旅时,通常会需要一些关于如何将用户应用程序提交到


Amazon EMR


Spark 方面的指导。例如,客户会需要以下方面的指导:如何设定可用于其应用程序的内存和计算资源的大小,以及适用于其使用案例的最佳资源分配模型。在本文中,我将介绍如何通过设置


spark-submit 标志来控制提交到在 EMR 上运行的 Spark 应用程序可用的内存和计算资源。我还会探讨在哪些情况下使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。

Spark 执行模型

从较高维度看,每个 Spark 应用程序都有一个 driver 驱动程序,它以任务形式为集群多个节点上运行的 executor 执行程序分配工作。



Driver 驱动程序是一种应用程序代码,用于定义应用于数据集的转换和动作。driver 驱动程序的核心是实例化的 SparkContext 类的对象。该对象允许 driver 驱动程序获取到集群的连接、请求资源、将应用程序操作拆分为任务,以及在 executor 执行程序中安排和启动任务。


Executor 执行程序不仅执行驱动 driver 程序发送的任务,还会在本地存储数据。随着 executor 执行程序不断创建和销毁(请参阅后文的“启用 executor 执行程序的动态资源分配”部分),它们通过 driver 驱动程序注册和注销。Driver 驱动程序和 executor 执行程序直接进行通信。


要执行应用程序,driver 驱动程序会组织要在作业中完成的工作。每个作业分为多个阶段,每个阶段由一组并行运行的独立任务组成。任务是 Spark 中的最小工作单元,用于在不同的分区执行相同的代码。

Spark 编程模型

弹性分布式数据集 (RDD) 是 Spark 中的一个重要抽象层。这一抽象层是执行内存中计算的关键。RDD 是分布在集群节点之间的只读和不可变数据分区集合。Spark 中的分区允许并行执行数据子集。Spark 应用程序会创建 RDD 并对 RDD 执行操作。尽管 Spark 会自动对 RDD 进行分区,但您也可以自己设置分区数。


RDD 支持两种类型的操作:转换(transform)和动作(action)。转换是生成新 RDD 的操作,而动作是在数据集上运行转换后,将数据写入外部存储或将值返回 driver 驱动程序的操作。常见的转换包括按照键进行筛选、排序和分组。常见的动作包括收集任务结果并将其发送给 driver 驱动程序、保存 RDD 或计算 RDD 中的元素数。

spark-submit

在集群上启动应用程序的常用方法是使用 spark-submit 脚本。该脚本提供了几个标志,可让您控制应用程序使用的资源。


设置 spark-submit 标志是向在 driver 驱动程序中实例化的 SparkContext 对象动态提供配置的一种方法。当您创建集群和在应用程序中进行硬编码(但不建议这么做)时,spark-submit 还可以读取您使用 EMR 配置选项设置的 conf/spark-defaults.conf) 文件中设置的配置值_。_更改 conf/spark-defaults.conf 的另一种方法是使用 –conf prop = value 标志。我将介绍 spark-submit 标志和在 spark-defaults.conf 文件中使用的属性名称以及 –conf 标志。

在 EMR 上运行的 Spark 应用程序

提交到 EMR 上的 Spark 的任何应用程序都会在 YARN 上运行,且每个 Spark executor 执行程序都会作为 YARN 容器运行。在 YARN 上运行时,driver 驱动程序可以在集群中的某个 YARN 容器中运行(集群模式),也可以在 spark-submit 进程中本地运行(客户端模式)。


在集群模式下运行时,driver 驱动程序在 ApplicationMaster 上运行,ApplicationMaster 根据应用程序所需资源向 YARN ResourceManager 提交 YARN 容器请求。一个简化的和高维度的应用程序提交流程示意图如下所示。



在客户端模式下运行时,driver 驱动程序在 ApplicationMaster 外部运行,它运行在提交应用程序的机器上 的_spark-submit_ 脚本进程中。


设置 driver 驱动程序位置

借助 spark-submit,可以使用 –deploy-mode 标志选择 driver 驱动程序的位置。


如果希望进行调试并快速查看应用程序的输出时,可以选择以客户端模式提交应用程序。对于生产环境的应用程序,最佳实践是在集群模式下运行应用程序。此模式可确保在应用程序执行期间 driver 驱动程序始终可用。但是,如果您使用客户端模式,并且是在 EMR 集群外部(例如,在本地的笔记本电脑上)提交应用程序,需要注意 driver 驱动程序是在 EMR 集群外部运行,driver 驱动程序与 executor 执行程序之间的通信延迟会增加。

设置 driver 驱动程序资源

Driver 驱动程序的大小取决于它执行的计算量,以及它从 executor 执行程序收集的数据量。在集群模式下运行 driver 驱动程序时,spark-submit 为您提供了相关选项,可控制 driver 驱动程序使用的核心数 (–driver-cores) 和内存 (–driver-memory)。在客户端模式下,driver 驱动程序内存的默认值为 1024MB 和 1 个核心。

设置 executor 核心数和执行程序数

Executor 执行程序核心数_(__–executor-cores_ 或 spark.executor.cores)确定了每个 executor 执行程序可以并行执行的任务数。最佳实践是:为操作系统保留 1 个核心,为每个 executor 执行程序保留大约 4-5 个核心。请求的核心数受配置属性 yarn.nodemanager.resource.cpu-vcores 限制,该属性可以控制在一个节点中运行的所有 YARN 容器可用的核心数,该属性可以在 yarn-site.xml 文件中设置。


可以使用以下公式计算每个节点的 executor 执行程序数:


每个节点的__executor 执行程序数 = (节点的核心数 为操作系统保留的 1 个核心)__/ 每个__executor 执行程序的任务数


Spark 作业的 executor 执行程序_(__—_num-executorsspark.executor.instances)总数为:


executor 执行程序总数 = 每个节点的 executor 执行程序数 * 实例数 – 1。

设置 executor 执行程序的内存

每个 executor 执行程序容器的内存空间都分为两个主要区域:Spark executor 执行程序内存和 overhead 内存。



请注意:可以分配给 executor 执行程序容器的最大内存取决于 yarn-site.xml 中可用的 yarn.nodemanager.resource.memory-mb 属性。executor 执行程序内存(–executor-memoryspark.executor.memory)确定了每个 executor 执行程序进程可以使用的内存量。overhead 内存 (spark.yarn.executor.memoryOverHead) 是堆外内存,会自动添加到 executor 执行程序内存,它的默认值是 executorMemory * 0.10


Executor 执行程序内存将用于存储和执行目的堆的各个部分进行了统一。如果超出使用量,这两部分可以彼此借用空间。相关属性是 spark.memory.fractionspark.memory.storageFraction。有关更多信息,请参阅 Spark 1.6 中的统一内存管理白皮书。


可以使用以下公式计算每个 executor 执行程序的内存:


每个 executor 执行程序的内存 = 节点上最大容器大小 / 每个节点的 executor 执行程序数

spark-submit 示例

为了向您展示如何设置之前介绍的标志,我会提交一个 wordcount 示例应用程序,然后使用 Spark history server 获取执行的视图。


首先,以 EMR 步骤的形式,向现有集群提交一个修改版的 word count 示例应用程序。代码如下所示:


Bash


from __future__ import print_functionfrom pyspark import SparkContextimport sysif __name__ == "__main__":    if len(sys.argv) != 3:        print("Usage: wordcount  ", file=sys.stderr)        exit(-1)    sc = SparkContext(appName="WordCount")    text_file = sc.textFile(sys.argv[1])    counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)    counts.saveAsTextFile(sys.argv[2])    sc.stop()
复制代码


集群具有 6 个 m3.2xlarge 核心实例和 1 个主实例,每个实例有 8 个 vCPU 和 30GB 内存_。_此实例类型 yarn.nodemanager.resource.memory-mb 默认值是 23GB。


根据上述介绍的公式,spark-submit 命令如下所示:


Bash


spark-submit --deploy-mode cluster --master yarn --num-executors 5 --executor-cores 5 --executor-memory 20g –conf spark.yarn.submit.waitAppCompletion=false wordcount.py s3://inputbucket/input.txt s3://outputbucket/
复制代码


通过以下命令以 EMR 步骤的形式提交应用程序:


Bash


aws emr add-steps --cluster-id j-xxxxx --steps Type=spark,Name=SparkWordCountApp,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=false,--num-executors,5,--executor-cores,5,--executor-memory,20g,s3://codelocation/wordcount.py,s3://inputbucket/input.txt,s3://outputbucket/],ActionOnFailure=CONTINUE
复制代码


请注意:还可通过步骤定义设置属性 spark.yarn.submit.waitAppCompletion。当此属性设置为 false 时,客户端提交应用程序然后就可以退出,不需要等待应用程序完成。此设置可让您提交多个应用程序由集群同时执行,但是它仅在集群模式下可用。


使用 –driver-memory–driver-cores 的默认值,因为示例应用程序将直接写入 Amazon S3,并且 driver 驱动程序没有从 executor 执行程序接收任何数据。

启用 executor****执行程序的动态资源分配

YARN 上的 Spark 可以动态扩展和缩减 executor 执行程序的数量。该功能非常有用,在同时处理多个应用程序时可以释放空闲 executor 执行程序,让某个应用程序可以按需请求其他 executor 执行程序。


要启用该功能,请参阅 EMR 文档,了解相关步骤。


Spark 具有以下属性,可为动态分配机制提供精细控制:


  • executor 执行程序的初始数量 (spark.dynamicAllocation.initalExecutors)

  • 应用程序使用的最小 executor 执行程序数 (spark.dynamicAllocation.minExecutors)

  • 可以请求的最大 executor 执行程序数 (spark.dynamicAllocation.maxExecutors)

  • 删除空闲 executor 执行程序的时间 (sparkdynamicAllocation.executorIdleTime)

  • 请求新 executor 执行程序处理等待任务的时间(spark.dynamicAllocation.schedulerBacklogTimeoutspark.dynamicAllocation.sustainedSchedulerBacklogTimeout

使用最大资源分配自动配置****executor 执行程序

EMR 提供了一个能够自动配置这个属性的选项,以实现整个集群资源利用率最大化。如果集群一次只处理一个应用程序,此配置选项会非常有用。如果您希望同时运行多个应用程序,请不要使用该选项。


要启用该配置选项,请参阅 EMR 文档,了解相关步骤。


在集群创建期间设置此配置选项后,EMR 会自动更新 spark-defaults.conf 文件,来控制 executor 执行程序计算和内存资源的属性,如下所示:


  • spark.executor.memory = (yarn.scheduler.maximum-allocation-mb – 1g) -spark.yarn.executor.memoryOverhead

  • spark.executor.instances = [将此设置为核心节点的初始数量加上任务节点数]

  • spark.executor.cores = yarn.nodemanager.resource.cpu-vcores

  • spark.default.parallelism = spark.executor.instances * spark.executor.cores

查看并行任务视图

您可以通过 EMR 控制台访问 Spark history server UI。它提供了有关应用程序性能和行为的有用信息。您可以查看计划的阶段和任务列表、检索有关 executor 执行程序的信息、获取内存使用情况摘要,以及检索提交到 SparkContext 对象的配置。在本文中,我将介绍如何将以上示例中使用的 spark-submit 脚本设置的标志转换为可视化工具。


要访问 Spark history server,请启用 SOCKS 代理,然后在连接下选择 Spark History Server



对于 Completed applications,请选择唯一可选的条目,然后展开事件时间轴,如下所示。Spark 根据_–num-executors_ 标志中定义的要求添加了 5 个 executor 执行程序。



然后,导航到阶段详细信息,您就可以看到每个 executor 执行程序并行运行的任务数了,该值与 –executor-cores 标志的值相同。


小结

本文向您介绍了如何使用 spark-submit 标志向集群提交应用程序。我们详细阐述了如何控制 driver 驱动程序运行位置、如何设置分配给 driver 驱动程序和 executor 执行程序的资源以及 executor 执行程序数量。此外,我们还介绍了在哪些情况下需使用 maximumResourceAllocation 配置选项和 executor 执行程序的动态资源分配。


如果您有任何问题或建议,欢迎留言。


—————————-


相关内容


使用支持 S3 的笔记本通过 Amazon EMR 上的 Spark 运行外部 Zeppelin 实例



想要了解有关大数据或流数据的更多信息? 请查看我们的大数据流数据教学页。


本文转载自 AWS 技术博客。


原文链接:https://amazonaws-china.com/cn/blogs/china/submitting-user-applications-with-spark-submit/


2019-12-11 15:381227

评论

发布
暂无评论
发现更多内容

FinClip 与 uniapp:轻应用平台与前端开发框架

FinClip

为什么都是技术合伙人被踢出局?

方云AI研发绩效

团队管理 研发管理 CTO SaaS

企业怎样有效地进行文档管理

小炮

企业 文档管理

中国设计师品牌Le Arome乐欧幕靠什么做到爆款10分钟售罄?

科技大数据

真的是最全的一致性hash环讲解了

Java工程师

Java 架构 分布式 算法 hash

如何在 Zadig 上玩转自动化测试,为业务质量保障提供最大价值

Zadig

云原生 软件测试 CI/CD 软件交付

浅谈Java虚拟机(HotSpot)的内存回收相关细节

CRMEB

机器人流程自动化评估体系全面助力垂直行业智能化转型

王吉伟频道

RPA 机器人流程自动化 信通院

Flink 在众安保险金融业务的应用

Apache Flink

大数据 flink 编程 流计算 实时计算

3月月更中奖名单新鲜出炉!快来看有没有你呀!

InfoQ写作社区官方

3月月更 热门活动

低代码平台常见的安全隐患,J2PaaS低代码平台如何解决?

J2PaaS低代码平台

低代码开发 低代码平台 企业级低代码平台 J2PaaS低代码平台

玩转LiteOS组件:Openexif

华为云开发者联盟

LiteOS Huawei LiteOS Openexif Exif JPEG文件

接口自动化的关键思路和解决方案,本文全讲清楚了

Liam

Jmeter Postman API 测试工具 接口自动化测试

想减少代码量,快设置一个有感知的 Aware Spring Bean

华为云开发者联盟

spring bean Aware 接口

龙蜥开发者说:学无止境的 Linux ,以及我的第一个定制版本发布之路 | 第4期

OpenAnolis小助手

Linux 龙蜥社区 开发者说 宝贵经历

Pulsar—新一代云原生消息平台

中原银行

分布式 pulsar 中原银行 分布式消息

能让程序员涨薪5K的Hystrix核心工作原理,你真的不打算学吗?

Java工程师

Java 程序员 互联网 微服务 科技

结合实际案例谈谈项目管理经验

云智慧AIOps社区

学习 项目管理 pmp 软考 沟通技巧

jackson学习之三:常用API操作

程序员欣宸

4月月更

行云管家荣膺《中国网络安全行业全景图(第九版)》收录

行云管家

网络安全 行云管家 安全牛

【IT运维】国内优秀的IT运维企业有哪些?

行云管家

云计算 运维 网络运维 IT运维

Zadig 基于 OPA 实现 RBAC 和 ABAC 权限管理技术方案详解

Zadig

云原生 CI/CD 软件交付

专访丨用友网络副总裁邹达:如何应对创新型数字化挑战?

YonBuilder低代码开发平台

EMAS隐私合规检测专项服务,从确保形式合规及实质合规规避风险

移动研发平台EMAS

阿里云 开发 数据安全 移动开发 隐私合规

政企上云网络适配复杂,看华为云Stack有妙招

华为云开发者联盟

数据中心 云网络 华为云Stack 政企上云 L3GW服务

为什么企业对私有化部署IM如此青睐有加?

WorkPlus

TASKCTL产品安装常见问题

敏捷调度TASKCTL

分布式 kettle ETL ETL任务 调度任务

在APICloud开发平台使用友盟统计功能教程

YonBuilder低代码开发平台

APP开发 APICloud 友盟

如何以卫语句取代嵌套条件表达式

华为云开发者联盟

条件表达式 卫语句 嵌套条件表达式 代码结构

领域驱动设计(DDD)理论与方法

Java工程师

Java 程序员 互联网 DDD 架构设计

腾讯WeTest微信小程序上线啦!产品资讯一手掌握!

WeTest

使用 spark-submit 提交用户应用程序_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章