详尽干货!从源码角度看 Golang 的调度(上)

阅读数:337 2019 年 9 月 18 日 10:31

详尽干货!从源码角度看 Golang 的调度(上)

本文由于篇幅过长,分文上下两篇,此篇为上篇。

本文主要从源码角度针对 Go 调度相关进行分析,从进程的启动,到调度循环分析,再到分析几个常见 runtime 下的场景可以清晰的了解调度过程。本文仅关注 linux 系统下的逻辑。代码版本参考 Go1.9.2。

阅读索引

  • 1. 简单概念

    • 1.1 调度器的三个抽象概念:G、M、P

    • 1.2 调度的大致轮廓

  • 2. 进程启动时都做了什么

    • 2.1 runtime.osinit(SB) 方法针对系统环境的初始化

    • 2.2 runtime.schedinit(SB) 调度相关的一些初始化

    • 2.3 runtime·mainPC(SB) 启动监控任务

  • 3. 调度循环都做了什么

    • 3.1 调度器如何开启调度循环

    • 3.2 调度器如何进行调度循环

    • 3.3 多个线程下如何调度

  • 4. 调度循环中如何让出 CPU

    • 4.1 执行完成让出 CPU

    • 4.2 主动让出 CPU

    • 4.3 抢占让出 CPU

    • 4.4 系统调用让出 CPU

  • 5. 待执行 G 的来源

    • 5.1 go func 创建 G

    • 5.2 epoll 来源

  • 6. 看几个主动让出 CPU 的场景

    • 6.1 time.Sleep

    • 6.2 sync.Mutex

    • 6.3 channel

调度器的三个抽象概念:G、M、P

  • G:代表一个 goroutine,每个 goroutine 都有自己独立的栈存放当前的运行内存及状态。可以把一个 G 当做一个任务。

  • M: 代表内核线程 (Pthread),它本身就与一个内核线程进行绑定,goroutine 运行在 M 上。

  • P:代表一个处理器,可以认为一个“有运行任务”的 P 占了一个 CPU 线程的资源,且只要处于调度的时候就有 P。

注:内核线程和 CPU 线程的区别,在系统里可以有上万个内核线程,但 CPU 线程并没有那么多,CPU 线程也就是 Top 命令里看到的 CPU0、CPU1、CPU2…的数量。

三者关系大致如下图:

详尽干货!从源码角度看 Golang 的调度(上)

图 1、图 2 代表 2 个有运行任务时的状态。M 与一个内核线程绑定,可运行的 goroutine 列表存放到 P 里面,然后占用了一个 CPU 线程来运行。

图 3 代表没有运行任务时的状态,M 依然与一个内核线程绑定,由于没有运行任务因此不占用 CPU 线程,同时也不占用 P。

调度的大致轮廓

详尽干货!从源码角度看 Golang 的调度(上)

图中表述了由 go func 触发的调度。先创建 M 通过 M 启动调度循环,然后调度循环过程中获取 G 来执行,执行过程中遇到图中 running G 后面几个 case 再次进入下一循环。

下面从程序启动、调度循环、G 的来源三个角度分析调度的实现。

进程启动时都做了什么?

下面先看一段程序启动的代码

复制代码
// runtime/asm_amd64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
...... 此处省略 N 多代码......
ok:
// set the per-goroutine and per-mach "registers"
get_tls(BX) // 将 g0 放到 tls(thread local storage) 里
LEAQ runtime·g0(SB), CX
MOVQ CX, g(BX)
LEAQ runtime·m0(SB), AX
// save m->g0 = g0 // 将全局 M0 与全局 G0 绑定
MOVQ CX, m_g0(AX)
// save m0 to g0->m
MOVQ AX, g_m(CX)
CLD // convention is D is always left cleared
CALL runtime·check(SB)
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB) // 解析命令行参数
CALL runtime·osinit(SB) // 只初始化了 CPU 核数
CALL runtime·schedinit(SB) // 内存分配器、栈、P、GC 回收器等初始化
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX //
PUSHQ AX
PUSHQ $0 // arg size
CALL runtime·newproc(SB) // 创建一个新的 G 来启动 runtime.main
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB) // 启动 M0, 开始等待空闲 G, 正式进入调度循环
MOVL $0xf1, 0xf1 // crash
RET

在启动过程里主要做了这三个事情 (这里只跟调度相关的):

  • 初始化固定数量的 P

  • 创建一个新的 G 来启动 runtime.main, 也就是 runtime 下的 main 方法

  • 创建全局 M0、全局 G0,启动 M0 进入第一个调度循环

M0 是什么?程序里会启动多个 M,第一个启动的叫 M0。

G0 是什么?G 分三种,第一种是执行用户任务的叫做 G,第二种执行 runtime 下调度工作的叫 G0,每个 M 都绑定一个 G0。第三种则是启动 runtime.main 用到的 G。写程序接触到的基本都是第一种

我们按照顺序看是怎么完成上面三个事情的。

runtime.osinit(SB) 方法针对系统环境的初始化

这里实质只做了一件事情,就是获取 CPU 的线程数,也就是 Top 命令里看到的 CPU0、CPU1、CPU2…的数量。

复制代码
// runtime/os_linux.go
func osinit() {
ncpu = getproccount()
}

runtime.schedinit(SB) 调度相关的一些初始化

复制代码
// runtime/proc.go
// 设置最大 M 数量
sched.maxmcount = 10000
// 初始化当前 M, 即全局 M0
mcommoninit(_g_.m)
// 查看应该启动的 P 数量,默认为 cpu core 数.
// 如果设置了环境变量 GOMAXPROCS 则以环境变量为准, 最大不得超过 _MaxGomaxprocs(1024)
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procs > _MaxGomaxprocs {
procs = _MaxGomaxprocs
}
// 调整 P 数量,此时由于是初始化阶段,所以 P 都是新建的
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}

这里 sched.maxmcount 设置了 M 最大的数量,而 M 代表的是系统内核线程,因此可以认为一个进程最大只能启动 10000 个系统线程。

procresize 初始化 P 的数量,procs 参数为初始化的数量,而在初始化之前先做数量的判断,默认是 ncpu(与 CPU 核数相等)。也可以通过环境变量 GOMAXPROCS 来控制 P 的数量。_MaxGomaxprocs 控制了最大的 P 数量只能是 1024。

有些人在进程初始化的时候经常用到 runtime.GOMAXPROCS() 方法,其实也是调用的 procresize 方法重新设置了最大 CPU 使用数量。

runtime·mainPC(SB) 启动监控任务

复制代码
// runtime/proc.go
// The main goroutine.
func main() {
......
// 启动后台监控
systemstack(func() {
newm(sysmon, nil)
})
......
}

在 runtime 下会启动一个全程运行的监控任务,该任务用于标记抢占执行过长时间的 G,以及检测 epoll 里面是否有可执行的 G。下面会详细说到。

最后 runtime·mstart(SB) 启动调度循环

前面都是各种初始化操作,在这里开启了调度器的第一个调度循环。(这里启动的 M 就是 M0)

下面来围绕 G、M、P 三个概念介绍 Goroutine 调度循环的运作流程。

调度循环都做了什么

详尽干货!从源码角度看 Golang 的调度(上)

图 1 代表 M 启动的过程,把 M 跟一个 P 绑定再一起。在程序初始化的过程中说到在进程启动的最后一步启动了第一个 M(即 M0),这个 M 从全局的空闲 P 列表里拿到一个 P,然后与其绑定。而 P 里面有 2 个管理 G 的链表 (runq 存储等待运行的 G 列表,gfree 存储空闲的 G 列表),M 启动后等待可执行的 G。

图 2 代表创建 G 的过程。创建完一个 G 先扔到当前 P 的 runq 待运行队列里。在图 3 的执行过程里,M 从绑定的 P 的 runq 列表里获取一个 G 来执行。当执行完成后,图 4 的流程里把 G 仍到 gfree 队列里。注意此时 G 并没有销毁 (只重置了 G 的栈以及状态),当再次创建 G 的时候优先从 gfree 列表里获取,这样就起到了复用 G 的作用,避免反复与系统交互创建内存。

M 即启动后处于一个自循环状态,执行完一个 G 之后继续执行下一个 G,反复上面的图 2~ 图 4 过程。当第一个 M 正在繁忙而又有新的 G 需要执行时,会再开启一个 M 来执行。

下面详细看下调度循环的实现。

调度器如何开启调度循环

先看一下 M 的启动过程(M0 启动是个特殊的启动过程,也是第一个启动的 M,由汇编实现的初始化后启动,而后续的 M 创建以及启动则是 Go 代码实现)。

复制代码
// runtime/proc.go
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
// 从空闲 P 里获取一个
_p_ = pidleget()
......
}
// 获取一个空闲的 m
mp := mget()
unlock(&sched.lock)
// 如果没有空闲 M,则 new 一个
if mp == nil {
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
return
}
......
// 唤醒 M
notewakeup(&mp.park)
}
func newm(fn func(), _p_ *p) {
// 创建一个 M 对象, 且与 P 关联
mp := allocm(_p_, fn)
// 暂存 P
mp.nextp.set(_p_)
mp.sigmask = initSigmask
......
execLock.rlock() // Prevent process clone.
// 创建系统内核线程
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
execLock.runlock()
}
// runtime/os_linux.go
func newosproc(mp *m, stk unsafe.Pointer) {
// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
}
func allocm(_p_ *p, fn func()) *m {
......
mp := new(m)
mp.mstartfn = fn // 设置启动函数
mcommoninit(mp) // 初始化 m
// 创建 g0
// In case of cgo or Solaris, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
// 把新创建的 g0 与 M 做关联
mp.g0.m = mp
......
return mp
}
func mstart() {
......
mstart1()
}
func mstart1() {
......
// 进入调度循环 (阻塞不返回)
schedule()
}

非 M0 的启动首先从 startm 方法开始启动,要进行调度工作必须有调度处理器 P,因此先从空闲的 P 链表里获取一个 P,在 newm 方法创建一个 M 与 P 绑定。

newm 方法中通过 newosproc 新建一个内核线程,并把内核线程与 M 以及 mstart 方法进行关联,这样内核线程执行时就可以找到 M 并且找到启动调度循环的方法。最后 schedule 启动调度循环

allocm 方法中创建 M 的同时创建了一个 G 与自己关联,这个 G 就是我们在上面说到的 g0。为什么 M 要关联一个 g0?因为 runtime 下执行一个 G 也需要用到栈空间来完成调度工作,而拥有执行栈的地方只有 G,因此需要为每个执行线程里配置一个 g0。

调度器如何进行调度循环

调用 schedule 进入调度器的调度循环后,在这个方法里永远不再返回。下面看下实现。

复制代码
// runtime/proc.go
func schedule() {
_g_ := getg()
// 进入 gc MarkWorker 工作模式
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// 每处理 n 个任务就去全局队列获取 G 任务, 确保公平
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 从 P 本地获取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
// 从其它地方获取 G, 如果获取不到则沉睡 M,并且阻塞在这里,直到 M 被再次使用
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
......
// 执行找到的 G
execute(gp, inheritTime)
}
// 从 P 本地获取一个可运行的 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// If there's a runnext, it's the next G to run.
// 优先从 runnext 里获取一个 G,如果没有则从 runq 里获取
for {
next := _p_.runnext
if next == 0 {
break
}
if _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
}
// 从队头获取
for {
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := _p_.runqtail
if t == h {
return nil, false
}
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
return gp, false
}
}
}
// 从其它地方获取 G
func findrunnable() (gp *g, inheritTime bool) {
......
// 从本地队列获取
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局队列获取
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从 epoll 里取
if netpollinited() && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { // non-blocking
......
return gp, false
}
}
......
// 尝试 4 次从别的 P 偷
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 在这里开始针对 P 进行偷取操作
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
}
// 尝试从全局 runq 中获取 G
// 在 "sched.runqsize/gomaxprocs + 1"、"max"、"len(_p_.runq))/2" 三个数字中取最小的数字作为获取的 G 数量
func globrunqget(_p_ *p, max int32) *g {
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
if max > 0 && n > max {
n = max
}
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
if sched.runqsize == 0 {
sched.runqtail = 0
}
gp := sched.runqhead.ptr()
sched.runqhead = gp.schedlink
n--
for ; n > 0; n-- {
gp1 := sched.runqhead.ptr()
sched.runqhead = gp1.schedlink
runqput(_p_, gp1, false) // 放到本地 P 里
}
return gp
}

schedule 中首先尝试从 P 本地队列中获取 (runqget) 一个可执行的 G,如果没有则从其它地方获取 (findrunnable), 最终通过 execute 方法执行 G。

runqget 先通过 runnext 拿到待运行 G, 没有的话,再从 runq 里面取。

findrunnable 从全局队列、epoll、别的 P 里获取。(后面会扩展分析实现)

在调度的开头出还做了一个小优化:每处理一些任务之后,就优先从全局队列里获取任务,以保障公平性,防止由于每个 P 里的 G 过多,而全局队列里的任务一直得不到执行机会。

这里用到了一个关键方法 getg(),runtime 的代码里大量使用该方法,它由汇编实现,该方法就是获取当前运行的 G,具体实现不再这里阐述。

多个线程下如何调度

抛出一个问题:每个 P 里面的 G 执行时间是不可控的,如果多个 P 同时在执行,会不会出现有的 P 里面的 G 执行不完,有的 P 里面几乎没有 G 可执行呢?

这就要从 M 的自循环过程中如何获取 G、归还 G 的行为说起了,先看图:

详尽干货!从源码角度看 Golang 的调度(上)

图中可以看出有两种途径:1. 借助全局队列 sched.runq 作为中介,本地 P 里的 G 太多的话就放全局里,G 太少的话就从全局取。2. 全局列表里没有的话直接从 P1 里偷取 (steal)。(更多 M 在执行的话,同样的原理,这里就只拿 2 个来举例)

第 1 种途径实现如下:

复制代码
// runtime/proc.go
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
// 尝试把 G 添加到 P 的 runnext 节点,这里确保 runnext 只有一个 G,如果之前已经有一个 G 则踢出来放到 runq 里
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// 把老的 g 踢出来,在下面放到 runq 里
gp = oldnext.ptr()
}
retry:
// 如果 _p_.runq 队列不满,则放到队尾就结束了。
// 试想如果不放到队尾而放到队头里会怎样?如果频繁的创建 G 则可能后面的 G 总是不被执行,对后面的 G 不公平
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// 如果队列满了,尝试把 G 和当前 P 里的一部分 runq 放到全局队列
// 因为操作全局需要加锁, 所以名字里带个 slow
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
// 从 runq 头部开始取出一半的 runq 放到临时变量 batch 里
for i := uint32(0); i < n; i++ {
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
// 把要 put 的 g 也放进 batch 去
batch[n] = gp
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// 把取出来的一半 runq 组成链表
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
// 将一半的 runq 放到 global 队列里, 一次多转移一些省得转移频繁
lock(&sched.lock)
globrunqputbatch(batch[0], batch[n], int32(n+1))
unlock(&sched.lock)
return true
}
func globrunqputbatch(ghead *g, gtail *g, n int32) {
gtail.schedlink = 0
if sched.runqtail != 0 {
sched.runqtail.ptr().schedlink.set(ghead)
} else {
sched.runqhead.set(ghead)
}
sched.runqtail.set(gtail)
sched.runqsize += n
}

runqput 方法归还执行完的 G,runq 定义是 runq [256]guintptr,有固定的长度,因此当前 P 里的待运行 G 超过 256 的时候说明过多了,则执行 runqputslow 方法把一半 G 扔给全局 G 链表,globrunqputbatch 连接全局链表的头尾指针。

但可能别的 P 里面并没有超过 256,就不会放到全局 G 链表里,甚至可能一直维持在不到 256 个。这就借助第 2 个途径了:

第 2 种途径实现如下:

复制代码
// runtime/proc.go
// 从其它地方获取 G
func findrunnable() (gp *g, inheritTime bool) {
......
// 尝试 4 次从别的 P 偷
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// 在这里开始针对 P 进行偷取操作
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
}

从别的 P 里面 " 偷取 " 一些 G 过来执行了。runqsteal 方法实现了 " 偷取 " 操作。

复制代码
// runtime/proc.go
// 偷取 P2 一半到本地运行队列,失败则返回 nil
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
t := _p_.runqtail
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)
if n == 0 {
return nil
}
n--
// 返回尾部的一个 G
gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()
if n == 0 {
return gp
}
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(_p_.runq)) {
throw("runqsteal: runq overflow")
}
atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption
return gp
}
// 从 P 里获取一半的 G, 放到 batch 里
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// 计算一半的数量
h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers
t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
......
// 将偷到的任务转移到本地 P 队列里
for i := uint32(0); i < n; i++ {
g := _p_.runq[(h+i)%uint32(len(_p_.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}
}

上面可以看出从别的 P 里面偷 (steal) 了一半,这样就足够运行了。有了“偷取”操作也就充分利用了多线程的资源。

评论

发布
用户头像
👍
2019 年 11 月 10 日 18:29
回复
没有更多了