AIGC革命已来,如何在企业场景落地?如何选择模型、怎样应用RAG、需要哪些组织流程配套? 了解详情
写点什么

浅谈 Kubernetes Scheduling-Framework 插件的实现

  • 2020-03-21
  • 本文字数:8583 字

    阅读完需:约 28 分钟

浅谈 Kubernetes Scheduling-Framework 插件的实现

最近几个月一直在研究 Kubernetes 的 scheduling-framework 调度框架,发现还是十分有意思的,我自己也实现了一个基于 scheduling-framework 调度框架的自定义调度器,希望感兴趣的同学一起学习:https://github.com/NJUPT-ISL/Yoda-Scheduler

Scheduling-framework 调度框架

Kubernetes 的 scheduling-framework 调度框架(以下简称调度框架)是针对当前 Kubernetes 调度器的增强,它不同于之前的 scheduler-extender,用户可以编写多个插件,这些插件可以在调度的不同阶段作为原有调度器的扩展,并且这些插件会和 Kubernetes 原有的调度器源代码会一起编译到调度程序中。

调度框架设计目标

  • 增强 Kubernetes 原有调度器的可扩展性。

  • 通过将调度程序的某些功能移至插件,可以简化调度程序核心。

  • 调度框架中可设置多个扩展点。

  • 调度框架通过插件机制来接收插件结果,并根据接收到的结果继续或中止。

  • 提出一种处理错误并将其与插件进行通信的机制。

Proposal

调度框架在 Kubernetes 调度器中定义了很多 Go 的接口和 Go API,用于用户设计插件使用。这些用户设计的插件将会被添加到调度程序中,并在编译时包含在内。可以通过配置调度程序的 ComponentConfig 将允许启用、禁用和重新排序插件。自定义调度程序可以“ 在树外 ” 编写其插件并编译包含其自己的插件的调度程序二进制文件。

调度周期和绑定周期

调度器调度一个 Pod 的过程分为两个阶段:调度周期绑定周期


在调度周期中,调度器会为 Pod 选择一个最合适它运行的节点,然后调度过程将进入绑定周期。


在绑定周期中,调度器会检测调度周期中选中的那个“最合适的节点”是不是真的可以让这个 Pod 稳定的运行(比如检测 PV、检测是否有端口冲突等),或者需不需要做一些初始化操作(比如设置这个节点上的 FPGA 板子的状态、设置 GPU 显卡的驱动版本、CUDA 的版本等)。

扩展点

Kubernetes 调度框架在调度周期和绑定周期都为我们提供了丰富的扩展点,这些扩展点可以“插上”我们自己设计的调度插件,一个插件可以在多个扩展点注册以执行更复杂或有状态的任务,实现我们想要的调度功能:



下面阐述下各个扩展点可以实现的功能。

Sort 排序

排序扩展点,由于调度器是按照 FIFO 的顺序调度 Pod 的,因此当队列里出现多个等待调度的 Pod 时,可以对这些 Pod 的先后顺序进行排序,把我们想要的 Pod(可能优先级比较高)往出队方向移动,让它可以更快地被调度。


目前的 Sort 扩展点只能启用一个,不可以启用多个 Sort 扩展插件。


我们可以看下 Sort 的接口,代码位于 Kubernetes 项目的 /pkg/scheduler/framework/interface.go 中:


type QueueSortPlugin interface {    Plugin    // Less are used to sort pods in the scheduling queue.    Less(*PodInfo, *PodInfo) bool}
复制代码


也就是只需要实现 Less 方法即可,比如如下的实现:


func Less(podInfo1, podInfo2 *framework.PodInfo) bool {    return GetPodPriority(podInfo1) > GetPodPriority(podInfo2)}
复制代码

Pre-filter 预过滤

该扩展点用于预处理有关 Pod 的信息,或检查集群或 Pod 必须满足的某些条件。预过滤器插件应实现 PreFilter 函数,如果 PreFilter 返回错误,则调度周期将中止。注意,在每个调度周期中,只会调用一次 PreFilter。


Pre-filter 插件可以选择实现 PreFilterExtensions 接口,这个接口定义了 AddPodRemovePod 方法以增量方式修改其预处理信息。


type PreFilterPlugin interface {    Plugin    PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) *Status    PreFilterExtensions() PreFilterExtensions}
复制代码


这里的 CycleState ,表示调度的上下文,其实是一个 map 的封装,结构体内部通过读写锁实现了并发安全,开发者可以通过 CycleState 来实现多个调度插件直接的数据传递,也就是多个插件可以共享状态或通过此机制进行通信。


 type CycleState struct {     mx      sync.RWMutex     storage map[StateKey]StateData     recordFrameworkMetrics bool// 该值为 true, 则调度框架会记录此次调度周期的数据 }
复制代码


这里的 StateKey 是 string 类型,StateData 是一个接口类型:


 type StateData interface {     // Clone is an interface to make a copy of StateData. For performance reasons,     // clone should make shallow copies for members (e.g., slices or maps) that are not     // impacted by PreFilter's optional AddPod/RemovePod methods.     Clone() StateData }
复制代码


我们可以做一个简单的接口实现,来实现 StateData:


 type Data struct {     Value int64 }
func (s *Data) Clone() framework.StateData { c := &Data{ Value: s.Value, } return c }
复制代码


那么当插件在该扩展点想传递数据时就可以使用如下类似的代码实现数据的传递:


 Max := Data{Value: 0} state.Lock() state.Write(framework.StateKey("Max"), &Max) defer state.Unlock()
复制代码

Filter 过滤

用于过滤不能满足当前被调度 Pod 运行需求的节点。对于每个节点,调度程序将按配置的顺序调用该类插件。如果有任何过滤器插件将节点标记为不可行,则不会为该节点调用其余插件。可以同时评估节点,并且在同一调度周期中可以多次调用 Filter 插件。这块其实是调度器会启动多个 go 协程以实现对多个节点并发调用 filter,来提高过滤效率。过滤插件其实类似于上一代 Kubernetes 调度器中的预选环节,即 Predicates。


我们看下接口定义:


type FilterPlugin interface {    Plugin    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *Status}
复制代码


我们可以对应的实现,比如我这里需要做 GPU 的调度,我需要检查每个节点的 GPU 是否满足 Pod 的运行要求:


func (y *Yoda) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, node *nodeinfo.NodeInfo) *framework.Status {    klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, node.Node().Name)  // 检查节点 GPU 的健康状态    if ok, msg := filter.CheckGPUHealth(node); ok {    // 节点的 GPU 是否符合Pod 运行等级        if !filter.PodFitsLevel(pod, node) {            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Level Not Fit")        }    // 节点的 GPU 显存是否符合 Pod 运行        if !filter.PodFitsMemory(pod, node) {            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Memory Not Fit")        }    // 节点的 GPU 数量是否符合 Pod 运行        if !filter.PodFitsNumber(pod, node) {            return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+" GPU Number Not Fit")        }        return framework.NewStatus(framework.Success, "")    } else {        return framework.NewStatus(framework.Unschedulable, "Node:"+node.Node().Name+msg)    }}
复制代码

Pre-Score 预打分 (v1alpha1 版本称为 Post-Filter)

注意:Pre-Score 从 v1alpha2 开始可用。


该扩展点将使用 通过 Filter 阶段的节点列表 来调用插件。插件可以使用此数据来更新内部状态或生成日志、指标。比如可以通过该扩展点收集各个节点中性能指标,所有节点中最大的内存的节点,性能最好的 CPU 节点等。


我们继续来看接口里长什么样子(我这里是 v1alpha1):


type PostFilterPlugin interface {    Plugin    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status}
复制代码


针对这个扩展点,通过传递的参数可以看出,接口传入了节点的切片,因此开发者可以通过启动多个并发协程来获取数据,并且可以把这些数据存在 CycleState 中,给之后的插件扩展点使用:


func (y *Yoda) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {    klog.V(3).Infof("collect info for scheduling  pod: %v", pod.Name)    return collection.ParallelCollection(collection.Workers, state, nodes, filteredNodesStatuses)}
复制代码


并发这块我们也可以参考 1.13 调度器中经常使用的经典并发模型:


func ParallelCollection(workers int, state *framework.CycleState, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {    var (        stop <-chan struct{}        mx   sync.RWMutex        msg  = ""    )  // 数据存入管道    pieces := len(Sum)    toProcess := make(chan string, pieces)    for _, v := range Sum {        toProcess <- v    }    close(toProcess)  // 并发协程数限制    if pieces < workers {        workers = pieces    }    wg := sync.WaitGroup{}    wg.Add(workers)    for i := 0; i < workers; i++ {        go func() {      // 协程消费管道数据            for value := range toProcess {                select {                case <-stop:                    return                default:          // state 并发安全,调用的时候可以不用加锁                    if re := CollectMaxValue(value, state, nodes, filteredNodesStatuses); !re.IsSuccess() {                        klog.V(3).Infof(re.Message())                        mx.Lock()            // message非并发安全,加锁                        msg += re.Message()                        mx.Unlock()                    }                }            }            wg.Done()        }()    }    wg.Wait()    if msg != "" {        return framework.NewStatus(framework.Error, msg)    }    return framework.NewStatus(framework.Success, "")}
复制代码

Score 打分

Score 扩展点和上一代的调度器的优选流程很像,它分为两个阶段:


  1. 第一阶段称为 “打分”,用于对已通过过滤阶段的节点进行排名。调度程序将为 Score 每个节点调用每个计分插件。

  2. 第二阶段是 “归一化”,用于在调度程序计算节点的最终排名之前修改分数,可以不实现, 但是需要保证 Score 插件的输出必须是 [MinNodeScore,MaxNodeScore][0-100]) 范围内的整数 。如果不是,则调度器会报错,你需要实现 NormalizeScore 来保证最后的得分范围。如果不实现 NormalizeScore,则 Score 的输出必须在此范围内。调度程序将根据配置的插件权重合并所有插件的节点分数。


看看接口的定义:


type ScorePlugin interface {    Plugin    // Score is called on each filtered node. It must return success and an integer    // indicating the rank of the node. All scoring plugins must return success or    // the pod will be rejected.    Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)    // ScoreExtensions returns a ScoreExtensions interface if it implements one, or nil if does not.    ScoreExtensions() ScoreExtensions}
复制代码


我们也可以做如下简单的实现:


func (y *Yoda) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {    nodeInfo, err := y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)    if err != nil {        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))    }    s, err := score.Score(state, nodeInfo)    if err != nil {        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Score Node Error: %v", err))    }    klog.V(3).Infof("node : %v yoda-score: %v",nodeName,s)    return s, framework.NewStatus(framework.Success, "")}
复制代码


如果最后的分数不在范围内,我们可能需要实现 NormalizeScore 函数做进一步处理:


func (y *Yoda) NormalizeScore(ctx context.Context, state *framework.CycleState, p *v1.Pod, scores framework.NodeScoreList) *framework.Status {    var (        highest int64 = 0    )    // 归一化     for i, nodeScore := range scores {        scores[i].Score = nodeScore.Score * framework.MaxNodeScore / highest    }    return framework.NewStatus(framework.Success, "")}
复制代码

Reserve 保留

为给定的 Pod 保留节点上的资源时,维护运行时状态的插件可以应实现此扩展点,以由调度程序通知。这是在调度程序实际将 Pod 绑定到 Node 之前发生的,它的存在是为了防止在调度程序等待绑定成功时发生争用情况。


type ReservePlugin interface {    Plugin    // Reserve is called by the scheduling framework when the scheduler cache is    // updated.    Reserve(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) *Status}
复制代码


这里和上面的 Score 类似,函数并没有提供 nodeInfo 接口,我们可以通过调用 handle.SnapshotSharedLister 来获取节点的信息。


nodeInfo, err := y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
复制代码


那么以上就是调度周期的插件与实现,其实绑定周期的插件实现和上述的方法也都类似,实现相关的函数即可。

插件注册

每个插件必须定义一个构造函数,并将其添加到硬编码的注册表中。


type PluginFactory = func(runtime.Unknown, FrameworkHandle) (Plugin, error)
type Registry map[string]PluginFactory
func NewRegistry() Registry { return Registry{ fooplugin.Name: fooplugin.New, barplugin.Name: barplugin.New, // New plugins are registered here. }}
复制代码


那么在编译的时候,编译器会将我们的插件和调度源码一起编译成我们的自定义调度器。


在声明插件的时候也需要实现构造函数和对应的方法:


type Yoda struct {    args   *Args    handle framework.FrameworkHandle}
func (y *Yoda) Name() string { return Name}
func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) { args := &Args{} if err := framework.DecodeInto(configuration, args); err != nil { return nil, err } klog.V(3).Infof("get plugin config args: %+v", args) return &Yoda{ args: args, handle: f, }, nil}
复制代码

编译小技巧

由于最终的调度器还是以容器的方式运行的,我们可以写一个 Makefile 来简化编译流程:


all: local
local: GOOS=linux GOARCH=amd64 go build -o=my-scheduler ./cmd/scheduler
build: sudo docker build --no-cache . -t registry.cn-hangzhou.aliyuncs.com/my/scheduler
push: sudo docker push registry.cn-hangzhou.aliyuncs.com/my/scheduler
format: sudo gofmt -l -w .clean: sudo rm -f my-scheduler
复制代码


编写调度器的 Dockerfile:


FROM debian:stretch-slim
WORKDIR /
COPY my-scheduler /usr/local/bin
CMD ["my-scheduler"]
复制代码


那么编译 -> 构建就可以三步走了:


  • 编译

  • 构建镜像

  • 上传镜像

自定义调度器的配置

首先需要设置一个 ConfigMap ,用于存放调度器的配置文件:


apiVersion: v1kind: ConfigMapmetadata:  name: scheduler-config  namespace: kube-systemdata:  scheduler-config.yaml: |    apiVersion: kubescheduler.config.k8s.io/v1alpha1    kind: KubeSchedulerConfiguration    schedulerName: yoda-scheduler    leaderElection:      leaderElect: true      lockObjectName: yoda-scheduler      lockObjectNamespace: kube-system    plugins:      queueSort:        enabled:          - name: "yoda"      filter:        enabled:        - name: "yoda"      score:        enabled:        - name: "yoda"      postFilter:        enabled:        - name: "yoda"    pluginConfig:    - name: "yoda"      args: {"master": "master", "kubeconfig": "kubeconfig"}
复制代码


这里主要需要修改的就是 schedulerName 字段的调度器名称和 plugins 字段中各个扩展点的插件名称,enable 才能保证该扩展点运行了你的插件。


接着为调度器创建 RBAC:


kind: ClusterRoleapiVersion: rbac.authorization.k8s.io/v1metadata:  name: yoda-crrules:  - apiGroups:      - ""    resources:      - endpoints      - events    verbs:      - create      - get      - update  - apiGroups:      - ""    resourceNames:      - yoda-scheduler    resources:      - endpoints    verbs:      - delete      - get      - patch      - update  - apiGroups:      - ""    resources:      - nodes    verbs:      - get      - list      - watch  - apiGroups:      - ""    resources:      - pods    verbs:      - delete      - get      - list      - watch      - update  - apiGroups:      - ""    resources:      - bindings      - pods/binding    verbs:      - create  - apiGroups:      - ""    resources:      - pods/status    verbs:      - patch      - update  - apiGroups:      - ""    resources:      - replicationcontrollers      - services    verbs:      - get      - list      - watch  - apiGroups:      - apps      - extensions    resources:      - replicasets    verbs:      - get      - list      - watch  - apiGroups:      - apps    resources:      - statefulsets    verbs:      - get      - list      - watch  - apiGroups:      - policy    resources:      - poddisruptionbudgets    verbs:      - get      - list      - watch  - apiGroups:      - ""    resources:      - persistentvolumeclaims      - persistentvolumes    verbs:      - get      - list      - watch  - apiGroups:      - ""    resources:      - configmaps    verbs:      - get      - list      - watch  - apiGroups:      - "storage.k8s.io"    resources:      - storageclasses      - csinodes    verbs:      - watch      - list      - get  - apiGroups:      - "coordination.k8s.io"    resources:      - leases    verbs:      - create      - get      - list      - update  - apiGroups:      - "events.k8s.io"    resources:      - events    verbs:      - create      - patch      - update---apiVersion: v1kind: ServiceAccountmetadata:  name: yoda-sa  namespace: kube-system---kind: ClusterRoleBindingapiVersion: rbac.authorization.k8s.io/v1metadata:  name: yoda-crb  namespace: kube-systemroleRef:  apiGroup: rbac.authorization.k8s.io  kind: ClusterRole  name: yoda-crsubjects:  - kind: ServiceAccount    name: yoda-sa    namespace: kube-system
复制代码


最后配置调度器的 Deployment:


apiVersion: apps/v1kind: Deploymentmetadata:  name: yoda-scheduler  namespace: kube-system  labels:    component: yoda-schedulerspec:  replicas: 1  selector:    matchLabels:      component: yoda-scheduler  template:    metadata:      labels:        component: yoda-scheduler    spec:      serviceAccount: yoda-sa      priorityClassName: system-cluster-critical      volumes:        - name: scheduler-config          configMap:            name: scheduler-config      containers:        - name: yoda-scheduler          image: registry.cn-hangzhou.aliyuncs.com/geekcloud/yoda-scheduler          imagePullPolicy: Always          args:            - yoda-scheduler            - --config=/scheduler/scheduler-config.yaml            - --v=3          resources:            requests:              cpu: "50m"          volumeMounts:            - name: scheduler-config              mountPath: /scheduler
复制代码


随着云计算技术的不断发展,Kubernetes scheduler 也在根据各种复杂的需求不断进化,未来也会涌现更多各种各样的丰富的、支持不同功能的调度器在不同的生产环境中发挥着更多强劲的作用,一起期待吧!

作者介绍

李俊江


Kubernetes & istio member


南京邮电大学物联网学院研究生,热衷于 Kubernetes 与云原生相关技术。


微信:FUNKY-STARS 欢迎交流!

参考

  • Scheduling Framework

  • enhancements/624

  • scheduler-framework-sample

  • Kubernetes 1.13 源码分析

致谢

感谢 Scheduler-SIG Leader HuangWei 大佬在 kubecon 2018 的 Q&A 和指导!


感谢张磊、车漾大佬在 kubecon 2018 的分享和讨论!


作者介绍


本文转载自公众号 ServiceMesher(ID:ServiceMesher)。


原文链接


https://mp.weixin.qq.com/s/If-Y72rnZxXfQvEjd1UsFQ


2020-03-21 14:003963

评论

发布
暂无评论
发现更多内容

借生态力量助力人工智能发展 英特尔这些年做了哪些事?

E科讯

融云参编中国信通院「办公即时通信研究报告」,并入选「典型行业案例」

融云 RongCloud

PaaS 即时通讯 办公 信息 融云

软件测试/测试开发丨Web自动化测试-高级定位CSS

测试人

CSS 程序员 软件测试 自动化测试 测试开发

公用事业财务共享如何建,看南京水务立标杆

用友BIP

财务共享

全球化财务如何通过全球核算助力跨国业务的稳定增长和持续发展?

用友BIP

中企出海 全球化财务

自动驾驶研发解决方案,百度智能云市场份额第一!

Baidu AICLOUD

自动驾驶 AI 大底座

网络性能问题排查思路

蓝胖子的编程梦

TCP 网络 问题排查 问题定位 问题解析

PAG动效框架源码笔记 (四)渲染框架

olinone

ios android 动画 移动 特效

神笔马良看了都鼓掌

百度Geek说

人工智能 AIGC 企业号 5 月 PK 榜

2022 Kube-OVN开源社区年度报告

York

开源 云原生 k8s 容器网络 cni

强!PCB“金手指”从设计到生产全流程

华秋PCB

工具 PCB 连接器 PCB设计 金手指

JWT真的安全吗?如何解决该问题

做梦都在改BUG

JWT

央企财务共享建设路径四大趋势洞察

用友BIP

财务共享

国内一流地方金融控股集团-长投控股怎样建设财务共享平台?

用友BIP

财务共享

业务高速增长,如祺出行如何用腾讯云消息队列 RocketMQ 应对挑战

科技热闻

2023上海国际嵌入式展 | 如何通过生命周期管理工具创建完整、可追溯的嵌入式开发

龙智—DevSecOps解决方案

嵌入式系统 嵌入式开发 上海国际嵌入式展

NGINX Service Mesh 中的 mTLS 架构

NGINX开源社区

nginx Service Mesh

“源生无限,同行致远”,加速迈向智能世界

说山水

架构训练营模块二作业

Geek_3d7c4d

SpringCloud 中 Zuul 网关原理及其配置

做梦都在改BUG

Java Spring Cloud 网关 Zuul

优秀的屏幕取色软件:ColorSnapper2激活版

真大的脸盆

Mac Mac 软件 屏幕取色器

复盘的价值是什么?

老张

复盘 复盘归因

解读与用户一起“跳动”的开源实时监控工具 HertzBeat

华为云开发者联盟

后端 开发 华为云 华为云开发者联盟 企业号 5 月 PK 榜

淬体归元,运营商资源域元数据管理

鲸品堂

大数据 管理 元数据 企业号 5 月 PK 榜

DB-GPT: Github 两周2.6k star 数据库领域的GPT来了~

csunny

GPT autogpt LLMs

代码级质量技术之基本框架介绍

百度Geek说

单元测试 开发语言 C++ 企业号 5 月 PK 榜

平行云X火山引擎:探索XR观展的极致体验

火山引擎边缘云

XR 火山引擎 实时云渲染 平行云 火山引擎边缘云

伊利与腾讯云 CODING 的「水乳交融」启示录

CODING DevOps

DevOps 最佳实践 数字化转型 CODING

MaxCompute 物化视图智能推荐最佳实践

阿里云大数据AI技术

大数据 算力加速 智能推荐 企业号 5 月 PK 榜

合合信息商业大数据技术为农商行打造数字化转型专项方案

合合技术团队

数据挖掘 大数据 金融

OIDC & OAuth2.0 协议及其授权模式详解|认证协议最佳实践系列【1】

Authing

身份认证 OAuth 2.0 单点登录 OIDC

浅谈 Kubernetes Scheduling-Framework 插件的实现_移动_李俊江_InfoQ精选文章