Spark on K8s :提升集群资源利用率的方法我也有

阅读数:431 2019 年 11 月 20 日 19:29

Spark on K8s :提升集群资源利用率的方法我也有

external-shuffle-service 是 Spark 里一个重要的特性,有了它后,executor 可以在不同的 stage 阶段动态改变数量,大大提升集群资源利用率。但是这个特性当前在 K8s 上并不能很好的运行。让我们来看看,在 K8s 上实现这个 external shuffle service 特性的最新进展吧。

如果你想在 Kubernetes 集群中运行 Spark 任务,那么你可能会对:如何在 K8s 上运行 external-shuffle-service 感兴趣。把 Driver 和 Executor 都当做容器,丢到 K8s 上(K8s 集群则把他们当做一般的容器,和其他业务类 app 一样对待),这种模式,可以使得集群资源池归一,避免 Spark 一个资源池,业务类(K8s)集群一个资源池。提升整体资源利用率,统一维护也降低运维成本。这也是 Spark 官方在 2.3 版本后为什么要支持 Spark on K8s 的主要驱动力。

external-shuffle-service 作用

如果想要 executor 数量可以动态变化,就需要依赖 external-shuffle-service 功能(因为在 K8s 集群中,容器启动关闭很方便。所以非常希望 executor 数量可以动态调整,提升资源利用率)。

原因是在 shuffle 过程中,一个 executor 会到另一个 executor 那里取数据。如果一个 executor 节点挂掉了,那么它也就无法处理其他 executor 发过来的 shuffle 的数据读取请求,之前生成的数据也就都没有意义了。

为了解决“取 shuffle 数据”,和“目标 executor 是否运行”分开的问题。Spark 引入了 external-shuffle-service 服务。相当于先把 shuffle 数据暂存到 external-shuffle-service,然后大家去 external-shuffle-service 那里取就行了(类似于中介)。

更多运行原理请见

原来怎么部署

在原 Spark 框架中,external-shuffle-service 是部署在每个节点上的。

Spark on K8s :提升集群资源利用率的方法我也有

图片来自

(1)executor 告诉 external-shuffle-service 数据存放在哪里,(2) external-shuffle-service 记下来,供别人查询。所以问题的关键是,数据放“哪里”支持哪些格式呢?我们看(1)里面通知是结构是这样的:

复制代码
public class RegisterExecutor extends BlockTransferMessage {
public final String appId; // spark application id
public final String execId; // executor id
public final ExecutorShuffleInfo executorInfo; // 《== 文件路径
}

可以看出,关键在 “在哪里” 要看(2):

复制代码
public class ExecutorShuffleInfo implements Encodable {
public final String\[\] localDirs; // 《== 第一级目录列表
public final int subDirsPerLocalDir; // 第二级目录列表
public final String shuffleManager; // shuffleManager 的类型,目前只有一种类型 SortShuffleManager
}

可以看出,shuffle 数据 “在哪里” 只能支持 HostPath(本地路径)。

问题的关键来了:executor 容器跑在 K8s 节点上面,external-shuffle-service 跑在另一个容器里,要想共享相同 Path 文件,那就必须使用节点路径(k8s-hostpath);要用 Hostpath 还得拥有节点的所有权,这个对于多用户共享的 K8s 集群来说,权限不安全,数据未隔离。

在 K8s 上如何解决

Spark 的 external-shuffle-service 要怎么在 K8s 上运行,这是个问题。

[Spark 社区关于这个有个讨论:]

这个文档主要是表达当前 external-shuffle-service 的实现有缺点:

  1. 多个 Spark 应用共用一个 external-shuffle-service,如果 external-shuffle-service 出问题,多个 Spark 应用都受影响,即隔离性差。
  2. 一个节点一个 external-shuffle-service,导致不同节点间压力不均衡。同时如果节点挂了,external-shuffle-service 也就没了,这个节点上面的所有 executor 都受影响,可靠性差。
  3. 在当前较火热的 Docker 容器环境下,executor 写入的 shuffle 数据(在一个容器内)。不一定就能被 external-shuffle-service 读取到(在另一个容器内)。因为有些 K8s 集群中,管理员出于安全考虑,会强制隔离不同用户的容器,禁止任何共享。

所以提出了改进方向:即 executor 保存 shuffle 数据时,不限定非得是保存在本地 Path 中。

具体实现方案可以有多种:

  1. 保存 shuffle 数据时,通过 external-shuffle-service 上传的方式。
  2. external-shuffle-service 支持 shuffle 数据为远端 uri 地址,而不仅仅是主机路径。
  3. 由 Driver 来维护所有的 shuffle 数据信息,取消 external-shuffle-service 组件。
  4. 将 shuffle 数据保存到分布式存储中。
  5. 将 shuffle 数据上传到 external-shuffle-service,然后由 Driver 跟踪文件路径。

总体思路就是:以前 external-shuffle-service 是本地写,远程读,调整为远程写,远程读

其实要在 K8s 上实现 executor 数量动态调整(dynamic resource allocation),还有另一条小路(即不通过 external-shuffle-service 的方式)。并且这条路已经实现了,在如下 PR 里

实现原理:

当发现 executor 里面是 shuffle 数据没有用了,则可以删除该 executor。如果这个 executor 里面的 shuffle 数据,还会被其他 Jop 读取,那么就保持这个 executor 存活着不被删除。从而实现 executor 数量可以动态调整。

缺点:

可以看出,这种方式其实是缓兵之计。

  • 删除部分暂时不被使用 executor,但是必须保留那些还会被使用的 executor。所以动态效果并不是最优的。
  • 一个 executor 也许最近不被使用,被删除了。但是后续其他 Stage 又有可能去访问那个 shuffle 数据。结果发现找不到(被动态删除嘛),这个时候又得重新计算,浪费性能。

PR 里面的讨论也表示这是无法用来完整替代 external-shuffle-service 的。

社区路标

通过上面的分析,可以基本了解在 K8s 上面跑 external-shuffle-service 的困难和思路。

所以要达到目的的路径为:

  • external-shuffle-service 支持远端保存 shuffle 数据
  • executor 和 external-shuffle-service 共享云端 shuffle 数据
  • executor 数量可以动态调整,不影响功能
  • 在 K8s 上支持了 executor 数量动态调整(dynamic resource allocation)。

Spark 的规划是在 3.0.0 版本提供完整能力, 让我们期待 Spark on K8s 越来越 6 吧。

Spark on K8s :提升集群资源利用率的方法我也有

添加小助手微信,加入【容器魔方】技术社群。
Spark on K8s :提升集群资源利用率的方法我也有

评论

发布