【锁定直播】字节、华为云、阿里云等技术专家讨论如何将大模型接入 AIOps 解决实际问题,戳>>> 了解详情
写点什么

详解 Kubernetes Job 和 CronJob 的实现原理

  • 2019-12-03
  • 本文字数:6274 字

    阅读完需:约 21 分钟

详解 Kubernetes Job 和 CronJob 的实现原理

之前介绍了 Kubernetes 中用于长期提供服务的 ReplicaSetDeploymentStatefulSetDaemonSet 等资源,但是作为一个容器编排引擎,任务和定时任务的支持是一个必须要支持的功能。


Kubernetes 中使用 Job 和 CronJob 两个资源分别提供了一次性任务和定时任务的特性,这两种对象也使用控制器模型来实现资源的管理,我们在这篇文章种就会介绍它们的实现原理。

概述

Kubernetes 中的 Job 可以创建并且保证一定数量 Pod 的成功停止,当 Job 持有的一个 Pod 对象成功完成任务之后,Job 就会记录这一次 Pod 的成功运行;当一定数量的 Pod 的任务执行结束之后,当前的 Job 就会将它自己的状态标记成结束。



上述图片中展示了一个 spec.completions=3 的任务的状态随着 Pod 的成功执行而更新和迁移状态的过程,从图中我们能比较清楚的看到 Job 和 Pod 之间的关系,假设我们有一个用于计算圆周率的如下任务:


YAML


apiVersion: batch/v1kind: Jobmetadata:  name: pispec:  completions: 10  parallelism: 5  template:    spec:      containers:      - name: pi        image: perl        command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]      restartPolicy: Never  backoffLimit: 4
复制代码


当我们在 Kubernetes 中创建上述任务时,使用 kubectl 能够观测到以下的信息:


Bash


$ k apply -f job.yamljob.batch/pi created
$ kubectl get job --watchNAME COMPLETIONS DURATION AGEpi 0/10 1s 1spi 1/10 36s 36spi 2/10 46s 46spi 3/10 54s 54spi 4/10 60s 60spi 5/10 65s 65spi 6/10 90s 90spi 7/10 99s 99spi 8/10 104s 104spi 9/10 107s 107spi 10/10 109s 109s
复制代码


由于任务 pi 在配置时指定了 spec.completions=10,所以当前的任务需要等待 10 个 Pod 的成功执行,另一个比较重要的 spec.parallelism=5 表示最多有多少个并发执行的任务,如果 spec.parallelism=1 那么所有的任务都会依次顺序执行,只有前一个任务执行成功时,后一个任务才会开始工作。



每一个 Job 对象都会持有一个或者多个 Pod,而每一个 CronJob 就会持有多个 Job 对象,CronJob 能够按照时间对任务进行调度,它与 crontab 非常相似,我们可以使用 Cron 格式快速指定任务的调度时间:


YAML


apiVersion: batch/v1beta1kind: CronJobmetadata:  name: pispec:  schedule: "*/1 * * * *"  jobTemplate:    spec:      completions: 2      parallelism: 1      template:        spec:          containers:          - name: pi            image: perl            command: ["perl",  "-Mbignum=bpi", "-wle", "print bpi(2000)"]          restartPolicy: OnFailure
复制代码


上述的 CronJob 对象被创建之后,每分钟都会创建一个新的 Job 对象,所有的 CronJob 创建的任务都会带有调度时的时间戳,例如:pi-1551660600 和 pi-1551660660 两个任务:


Bash


$ k get cronjob --watchNAME   SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGEpi    */1 * * * *    False     0        <none>          3spi    */1 * * * *    False     1        1s              7s
$ k get job --watchNAME COMPLETIONS DURATION AGEpi-1551660600 0/3 0s 0spi-1551660600 1/3 16s 16spi-1551660600 2/3 31s 31spi-1551660600 3/3 44s 44spi-1551660660 0/3 1spi-1551660660 0/3 1s 1spi-1551660660 1/3 14s 14spi-1551660660 2/3 28s 28spi-1551660660 3/3 42s 43s
复制代码


CronJob 中保存的任务其实是有上限的,spec.successfulJobsHistoryLimitspec.failedJobsHistoryLimit 分别记录了能够保存的成功或者失败的任务上限,超过这个上限的任务都会被删除,默认情况下这两个属性分别为 spec.successfulJobsHistoryLimit=3spec.failedJobsHistoryLimit=1

任务

Job 遵循 Kubernetes 的控制器模式进行设计,在发生需要监听的事件时,Informer 就会调用控制器中的回调将需要处理的资源 Job 加入队列,而控制器持有的工作协程就会处理这些任务。



用于处理 Job 资源的 JobController 控制器会监听 Pod 和 Job 这两个资源的变更事件,而资源的同步还是需要运行 syncJob 方法。

同步

syncJobJobController 中主要用于同步 Job 资源的方法,这个方法的主要作用就是对资源进行同步,它的大体架构就是先获取一些当前同步的基本信息,然后调用 manageJob 方法管理 Job 对应的 Pod 对象,最后计算出处于 activefailedsucceed 三种状态的 Pod 数量并更新 Job 的状态:


Go


func (jm *JobController) syncJob(key string) (bool, error) {  ns, name, _ := cache.SplitMetaNamespaceKey(key)  sharedJob, _ := jm.jobLister.Jobs(ns).Get(name)  job := *sharedJob
jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
pods, _ := jm.getPodsForJob(&job)
activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) succeeded, failed := getStatus(pods) conditions := len(job.Status.Conditions)
复制代码


这一部分代码会从 apiserver 中该名字对应的 Job 对象,然后获取 Job 对应的 Pod 数组并根据计算不同状态 Pod 的数量,为之后状态的更新和比对做准备。


Go


var manageJobErr error  if jobNeedsSync && job.DeletionTimestamp == nil {    active, manageJobErr = jm.manageJob(activePods, succeeded, &job)  }
复制代码


准备工作完成之后,会调用 manageJob 方法,该方法主要负责管理 Job 持有的一系列 Pod 的运行,它最终会返回目前集群中当前 Job 对应的活跃任务的数量。


Go


completions := succeeded  complete := false  if job.Spec.Completions == nil {    if succeeded > 0 && active == 0 {      complete = true    }  } else {    if completions >= *job.Spec.Completions {      complete = true    }  }  if complete {    job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))    now := metav1.Now()    job.Status.CompletionTime = &now  }
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions { job.Status.Active = active job.Status.Succeeded = succeeded job.Status.Failed = failed
jm.updateHandler(&job) }
return forget, manageJobErr}
复制代码


最后的这段代码会将 Job 规格中设置的 spec.completions 和已经完成的任务数量进行比对,确认当前的 Job 是否已经结束运行,如果任务已经结束运行就会更新当前 Job 的完成时间,同时当 JobController 发现有一些状态没有正确同步时,也会调用 updateHandler 更新资源的状态。

并行执行

Pod 的创建和删除都是由 manageJob 这个方法负责的,这个方法根据 Job 的 spec.parallelism 配置对目前集群中的节点进行创建和删除。



如果当前正在执行的活跃节点数量超过了 spec.parallelism,那么就会按照创建的升删除多余的任务,删除任务时会使用 DeletePod 方法:


Go


func (jm *JobController) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {  active := int32(len(activePods))  parallelism := *job.Spec.Parallelism
if active > parallelism { diff := active - parallelism sort.Sort(controller.ActivePods(activePods))
active -= diff wait := sync.WaitGroup{} wait.Add(int(diff)) for i := int32(0); i < diff; i++ { go func(ix int32) { defer wait.Done() jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job) }(i) } wait.Wait()
复制代码


当正在活跃的节点数量小于 spec.parallelism 时,我们就会根据当前未完成的任务数和并行度计算出最大可以处于活跃的 Pod 个数 wantActive 以及与当前的活跃 Pod 数相差的 diff


Go


} else if active < parallelism {    wantActive := int32(0)    if job.Spec.Completions == nil {      if succeeded > 0 {        wantActive = active      } else {        wantActive = parallelism      }    } else {      wantActive = *job.Spec.Completions - succeeded      if wantActive > parallelism {        wantActive = parallelism      }    }    diff := wantActive - active    if diff < 0 {      diff = 0    }
active += diff
复制代码


在方法的最后就会以批量的方式并行创建 Pod,所有 Pod 的创建都是通过 CreatePodsWithControllerRef 方法在 Goroutine 中执行的。



这里会使用 WaitGroup 等待每个 Batch 中创建的结果返回才会执行下一个 Batch,Batch 的大小是从 1 开始指数增加的,以冷启动的方式避免首次创建的任务过多造成失败:


Go


wait := sync.WaitGroup{}    for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {      wait.Add(int(batchSize))      for i := int32(0); i < batchSize; i++ {        go func() {          defer wait.Done()          jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))        }()      }      wait.Wait()      diff -= batchSize    }  }
return active, nil}
复制代码


通过对 manageJob 的分析,我们其实能够看出这个方法就是根据规格中的配置对 Pod 进行管理,它在较多并行时删除 Pod,较少并行时创建 Pod,也算是一个简单的资源利用和调度机制,代码非常直白并且容易理解,不需要花太大的篇幅。

定时任务

用于管理 CronJob 资源的 CronJobController 虽然也使用了控制器模式,但是它的实现与其他的控制器不太一样,他没有从 Informer 中接受其他消息变动的通知,而是直接访问 apiserver 中的数据:



从 apiserver 中获取了 Job 和 CronJob 的信息之后就会调用 JobControl 向 apiserver 发起请求创建新的 Job 资源,这一个过程都是由 CronJobControllersyncAll 方法驱动的,我们接下来就来介绍这一方法的实现。

同步

syncAll 方法会从 apiserver 中取出所有的 Job 和 CronJob 对象,然后通过 groupJobsByParent 将任务按照 spec.ownerReferences 进行分类并遍历去同步所有的 CronJob:


Go


func (jm *CronJobController) syncAll() {  jl, _ := jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(metav1.ListOptions{})  js := jl.Items
sjl, _ := jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}) sjs := sjl.Items
jobsBySj := groupJobsByParent(js)
for _, sj := range sjs { syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.recorder) cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.recorder) }}
复制代码


syncOne 就是用于同步单个 CronJob 对象的方法,这个方法会首先遍历全部的 Job 对象,只保留正在运行的活跃对象并更新 CronJob 的状态:


Go


func syncOne(sj *batchv1beta1.CronJob, js []batchv1.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, recorder record.EventRecorder) {  childrenJobs := make(map[types.UID]bool)  for _, j := range js {    childrenJobs[j.ObjectMeta.UID] = true    found := inActiveList(*sj, j.ObjectMeta.UID)    if found && IsJobFinished(&j) {      deleteFromActiveList(sj, j.ObjectMeta.UID)    }  }
for _, j := range sj.Status.Active { if found := childrenJobs[j.UID]; !found { deleteFromActiveList(sj, j.UID) } }
updatedSJ, _ := sjc.UpdateStatus(sj) *sj = *updatedSJ
复制代码


随后的 getRecentUnmetScheduleTimes 方法会根据 CronJob 的调度配置 spec.schedule 和上一次执行任务的时间计算出我们缺失的任务次数。


Go


times, _ := getRecentUnmetScheduleTimes(*sj, now)  if len(times) == 0 {    return  }
scheduledTime := times[len(times)-1] if sj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(sj.Status.Active) > 0 { return } if sj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range sj.Status.Active { job, _ := jc.GetJob(j.Namespace, j.Name) if !deleteJob(sj, job, jc, recorder) { return } } }
复制代码


如果现在需要调度新的任务,但是当前已经存在活跃的任务,就会根据并发策略的配置执行不同的操作:


  1. 使用 ForbidConcurrent 策略跳过这一次任务的调度直接返回;

  2. 使用 ReplaceConcurrent 策略获取并删除全部活跃的任务,通过创建新的 Pod 替换这些正在执行的活跃 Pod;


Go


jobReq, _ := getJobFromTemplate(sj, scheduledTime)  jobResp, _ := jc.CreateJob(sj.Namespace, jobReq)
ref,_ := getRef(jobResp) sj.Status.Active = append(sj.Status.Active, *ref) sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} sjc.UpdateStatus(sj)
return}
复制代码


在方法的最后,它会从 CronJob 的 spec.jobTemplate 中拿到创建 Job 使用的模板并调用 JobControlCreateJob 向 apiserver 发起创建任务的 HTTP 请求,接下来的操作就都是由 JobController 负责了。

总结

Job 作为 Kubernetes 中用于处理任务的资源,与其他的资源没有太多的区别,它也使用 Kubernetes 中常见的控制器模式,监听 Informer 中的事件并运行 syncHandler 同步任务


而 CronJob 由于其功能的特殊性,每隔 10s 会从 apiserver 中取出资源并进行检查是否应该触发调度创建新的资源,需要注意的是 CronJob 并不能保证在准确的目标时间执行,执行会有一定程度的滞后。


两个控制器的实现都比较清晰,只是边界条件比较多,分析其实现原理时一定要多注意。

相关文章

Referenece


**本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/kubernetes-job-cronjob


2019-12-03 15:12944

评论

发布
暂无评论
发现更多内容

FX影视特效3D动画渲染工具SideFX Houdini安装破解教程

Rose

mac/win Animate 2021新功能 (An 2021中文直装版安装)

Rose

QT项目第一弹-自定义日志输出

springIce

日志 qt

DJ必备:djay - DJ 应用&混音器 (djay pro Ai 激活版mac下载)

Rose

深入理解 Java 变量类型、声明及应用

小万哥

Java 程序人生 编程语言 软件工程 后端开发

5G-A华彩开局,风流还看北京城

脑极体

通信

Minitab Express数据分析适合什么人群?minitab express mac破解资源

Rose

Nodejs - 9步开启JWT身份验证

南城FE

JavaScript 前端 nodejs JWT

听GPT 讲Rust Cargo源代码(5)

fliter

Databend Stream 的设计与实现 | Data Infra 第 18 期

Databend

AI for Science,开启智能科学时代!

脑极体

AI

reallusion卡通动画师最新下载 Cartoon Animator mac破解中文版

Rose

GreatSQL荣获2023 InfoQ“技术生态构建奖” 助力行业变革之路

GreatSQL

10000+AI绘画关键词-涵盖Mid和StableDiffusion

Geek_bbbdb0

Endurance for Mac 专业苹果mac电脑 电池续航提升工具

Rose

灵伴科技(Rokid)借助 Knative 实现 AI 应用云原生 Serverless 化

阿里巴巴云原生

阿里云 云原生 云原生容器

《计算机程序的构造和解释(原书第2版)》PDF

程序员李木子

SwitchResX for Mac v4.13.3 正式版 自定义苹果电脑分辨率

Rose

数据科学家的IDE:JetBrains DataSpell for mac v2023.3.3中文激活版

影影绰绰一往直前

PHP 增量代码规范 PHPCS 通过极狐 GitLab CI 平滑落地

极狐GitLab

Total Video Converter Pro超级转霸 mac破解版 视频格式转换

Rose

MacBooster 8 mac版:一站式系统清理维护工具

Rose

幻兽帕鲁服务器搭建攻略:阿里云平台快速上手指南

全栈若城

【亿级数据专题】「分布式消息引擎」 盘点本年度我们探索服务的HA高可用解决方案

洛神灬殇

分布式 高可用 ha 优化技术 2024年第二十九篇文章

文心一言 VS 讯飞星火 VS chatgpt (192)-- 算法导论14.2 2题

福大大架构师每日一题

福大大架构师每日一题

Splunk Enterprise for Mac(数据分析管理工具) v9.2.0激活版

影影绰绰一往直前

听GPT 讲Rust Cargo源代码(6)

fliter

照片放大工具 Topaz Gigapixel AI for Mac v7.0.1激活版

影影绰绰一往直前

详解 Kubernetes Job 和 CronJob 的实现原理_文化 & 方法_Draveness_InfoQ精选文章