NVIDIA 初创加速计划,免费加速您的创业启动 了解详情
写点什么

Golang 并发编程与定时器

  • 2019-12-03
  • 本文字数:7053 字

    阅读完需:约 23 分钟

Golang 并发编程与定时器

5.3 定时器

对于任何一个正在运行的应用,如何获取准确的绝对时间都非常重要,但是在一个分布式系统中我们很难保证各个节点上绝对时间的一致性,哪怕通过 NTP 这种标准的对时协议也只能把时间的误差控制在毫秒级,所以相对时间在一个分布式系统中显得更为重要,我们在这一节中就会介绍 Go 语言中的定时器以及它在并发编程中起到什么样的作用。


绝对时间一定不会是完全准确的,它对于一个运行中的分布式系统其实没有太多指导意义,但是由于相对时间的计算不依赖于外部的系统,所以它的计算可以做的比较准确,我们在这一节中就会介绍 Go 语言中用于计算相对时间的定时器的实现原理。

__1. 结构

timer 就是 Golang 定时器的内部表示,每一个 timer 其实都存储在堆中,tb 就是用于存储当前定时器的桶,而 i 是当前定时器在堆中的索引,我们可以通过这两个变量找到当前定时器在堆中的位置:


type timer struct {    tb *timersBucket    i  int
when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr}
复制代码


when 表示当前定时器(Timer)被唤醒的时间,而 period 表示两次被唤醒的间隔,每当定时器被唤醒时都会调用 f(args, now) 函数并传入 args 和当前时间作为参数。然而这里的 timer 作为一个私有结构体其实只是定时器的运行时表示,time 包对外暴露的定时器使用了如下所示的结构体:


type Timer struct {    C <-chan Time    r runtimeTimer}
复制代码


Timer 定时器必须通过 NewTimer 或者 AfterFunc 函数进行创建,其中的 runtimeTimer 其实就是上面介绍的 timer 结构体,当定时器失效时,失效的时间就会被发送给当前定时器持有的 Channel C,订阅管道中消息的 Goroutine 就会收到当前定时器失效的时间。


time 包中,除了 timerTimer 两个分别用于表示运行时定时器和对外暴露的 API 之外,timersBucket 这个用于存储定时器的结构体也非常重要,它会存储一个处理器上的全部定时器,不过如果当前机器的核数超过了 64 核,也就是机器上的处理器 P 的个数超过了 64 个,多个处理器上的定时器就可能存储在同一个桶中:


type timersBucket struct {    lock         mutex    gp           *g    created      bool    sleeping     bool    rescheduling bool    sleepUntil   int64    waitnote     note    t            []*timer}
复制代码


每一个 timersBucket 中的 t 就是用于存储定时器指针的切片,每一个运行的 Go 语言程序都会在内存中存储着 64 个桶,这些桶中都存储定时器的信息:



每一个桶持有的 timer 切片其实都是一个最小堆,这个最小堆会按照 timer 应该触发的时间对它们进行排序,最小堆最上面的定时器就是最近需要被唤醒的 timer,我们会在下面展开介绍定时器的创建和触发过程。

__2. 工作原理

既然我们已经介绍了定时器的数据结构,接下来我们就可以开始分析它的常见操作以及工作原理了,在这一节中我们将介绍定时器的创建、触发、time.Sleep 与定时器的关系以及计时器 Ticker 的实现原理。

__2.1. 创建

time 包对外提供了两种创建定时器的方法,第一种方法就是 NewTimer 接口,这个接口会创建一个用于通知触发时间的 Channel、调用 startTimer 方法并返回一个创建指向 Timer 结构体的指针:


func NewTimer(d Duration) *Timer {    c := make(chan Time, 1)    t := &Timer{        C: c,        r: runtimeTimer{            when: when(d),            f:    sendTime,            arg:  c,        },    }    startTimer(&t.r)    return t}
复制代码


另一个用于创建 Timer 的方法 AfterFunc 其实也提供了非常相似的结构,与 NewTimer 方法不同的是该方法没有创建一个用于通知触发时间的 Channel,它只会在定时器到期时调用传入的方法:


func AfterFunc(d Duration, f func()) *Timer {    t := &Timer{        r: runtimeTimer{            when: when(d),            f:    goFunc,            arg:  f,        },    }    startTimer(&t.r)    return t}
复制代码


startTimer 基本上就是创建定时器的入口了,所有定时器的创建和重启基本上都需要调用该函数:


func startTimer(t *timer) {    addtimer(t)}
func addtimer(t *timer) { tb := t.assignBucket() tb.addtimerLocked(t)}
复制代码


它会调用 addTimer 函数,这个函数总共做了两件事情,首先通过 assignBucket 方法为当前定时器选择一个 timersBucket 桶,我们会根据当前 Goroutine 所在处理器 P 的 id 选择一个合适的桶,随后调用 addTimerLocked 方法将当前定时器加入桶中:


func (tb *timersBucket) addtimerLocked(t *timer) bool {    t.i = len(tb.t)    tb.t = append(tb.t, t)    if !siftupTimer(tb.t, t.i) {        return false    }    if t.i == 0 {        if tb.sleeping && tb.sleepUntil > t.when {            tb.sleeping = false            notewakeup(&tb.waitnote)        }        if tb.rescheduling {            tb.rescheduling = false            goready(tb.gp, 0)        }        if !tb.created {            tb.created = true            go timerproc(tb)        }    }    return true}
复制代码


addtimerLocked 会先将最新加入的定时器加到队列的末尾,随后调用 siftupTimer 将当前定时器与四叉树(或者四叉堆)中的父节点进行比较,保证父节点的到期时间一定小于子节点:



这个四叉树只能保证父节点的到期时间大于子节点,这对于我们来说其实也足够了,因为我们只关心即将被触发的计数器,如果当前定时器是第一个被加入四叉树的定时器,我们还会通过 go timerproc(tb) 启动一个 Goroutine 用于处理当前树中的定时器,这也是处理定时器的核心方法。

__2.2. 触发

定时器的触发都是由 timerproc 中的一个双层 for 循环控制的,外层的 for 循环主要负责对当前 Goroutine 进行控制,它不仅会负责锁的获取和释放,还会在合适的时机触发当前 Goroutine 的休眠:


func timerproc(tb *timersBucket) {    tb.gp = getg()    for {        tb.sleeping = false        now := nanotime()        delta := int64(-1)
// inner loop
if delta < 0 { tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) notetsleepg(&tb.waitnote, delta) }}
复制代码


如果距离下一个定时器被唤醒的时间小于 0,当前的 timerproc 就会将 rescheduling 标记设置成 true 并立刻陷入休眠,这其实也意味着当前 timerproc 中不包含任何待处理的定时器,当我们再向该 timerBucket 加入定时器时就会重新唤醒 timerproc Goroutine。


在其他情况下,也就是下一次计数器的响应时间是 now + delta 时,timerproc 中的外层循环会通过 notesleepg 将当前 Goroutine 陷入休眠。


func notetsleepg(n *note, ns int64) bool {    gp := getg()    if gp == gp.m.g0 {        throw("notetsleepg on g0")    }    semacreate(gp.m)    entersyscallblock()    ok := notetsleep_internal(n, ns, nil, 0)    exitsyscall()    return ok}
复制代码


该函数会先获取当前的 Goroutine 并在当前的『CPU 上』创建一个信号量,随后在 entersyscallblockexitsyscall 之间执行系统调用让当前的 Goroutine 陷入休眠并在 ns 纳秒后返回。


内部循环的主要作用就是触发已经到期的定时器,在这个内部循环中,我们会按照以下的流程对当前桶中的定时器进行处理:


  1. 如果桶中不包含任何定时器就会直接返回并陷入休眠等待定时器加入当前桶;

  2. 如果四叉树最上面的定时器还没有到期会通过 notetsleepg 方法陷入休眠等待最近定时器的到期;

  3. 如果四叉树最上面的定时器已经到期;

  4. 当定时器的 period > 0 就会设置下一次会触发定时器的时间并将当前定时器向下移动到对应的位置;

  5. 当定时器的 period <= 0 就会将当前定时器从四叉树中移除;

  6. 在每次循环的最后都会从定时器中取出定时器中的函数、参数和序列号并调用函数触发该计数器;


for {            if len(tb.t) == 0 {                delta = -1                break            }            t := tb.t[0]            delta = t.when - now            if delta > 0 {                break            }            ok := true            if t.period > 0 {                t.when += t.period * (1 + -delta/t.period)                if !siftdownTimer(tb.t, 0) {                    ok = false                }            } else {                last := len(tb.t) - 1                if last > 0 {                    tb.t[0] = tb.t[last]                    tb.t[0].i = 0                }                tb.t[last] = nil                tb.t = tb.t[:last]                if last > 0 {                    if !siftdownTimer(tb.t, 0) {                        ok = false                    }                }                t.i = -1 // mark as removed            }            f := t.f            arg := t.arg            seq := t.seq            f(arg, seq)        }
复制代码


使用 NewTimer 创建的定时器,传入的函数时 sendTime,它会将当前时间发送到定时器持有的 Channel 中,而使用 AfterFunc 创建的定时器,在内层循环中调用的函数就会是调用方传入的函数了。

__2.3. 休眠

如果你使用过一段时间的 Go 语言,你一定在项目中使用过 time 包中的 Sleep 方法让当前的 Goroutine 陷入休眠以等待某些条件的完成或者触发一些定时任务,time.Sleep 就是通过如下所示的 timeSleep 方法完成的:


func timeSleep(ns int64) {    if ns <= 0 {        return    }
gp := getg() t := gp.timer if t == nil { t = new(timer) gp.timer = t } *t = timer{} t.when = nanotime() + ns t.f = goroutineReady t.arg = gp tb := t.assignBucket() lock(&tb.lock) if !tb.addtimerLocked(t) { unlock(&tb.lock) badTimer() } goparkunlock(&tb.lock, waitReasonSleep, traceEvGoSleep, 2)}
复制代码


timeSleep 会创建一个新的 timer 结构体,在初始化的过程中我们会传入当前 Goroutine 应该被唤醒的时间以及唤醒时需要调用的函数 goroutineReady,随后会调用 goparkunlock 将当前 Goroutine 陷入休眠状态,当定时器到期时也会调用 goroutineReady 方法唤醒当前的 Goroutine:


func goroutineReady(arg interface{}, seq uintptr) {    goready(arg.(*g), 0)}
复制代码


time.Sleep 方法其实只是创建了一个会在到期时唤醒当前 Goroutine 的定时器并通过 goparkunlock 将当前的协程陷入休眠状态等待定时器触发的唤醒。

__2.4. Ticker

除了只用于一次的定时器(Timer)之外,Go 语言的 time 包中还提供了用于多次通知的 Ticker 计时器,计时器中包含了一个用于接受通知的 Channel 和一个定时器,这两个字段共同组成了用于连续多次触发事件的计时器:


type Ticker struct {    C <-chan Time // The channel on which the ticks are delivered.    r runtimeTimer}
复制代码


想要在 Go 语言中创建一个计时器只有两种方法,一种是使用 NewTicker 方法显示地创建Ticker 计时器指针,另一种可以直接通过 Tick 方法获取一个会定期发送消息的 Channel:


func NewTicker(d Duration) *Ticker {    if d <= 0 {        panic(errors.New("non-positive interval for NewTicker"))    }    c := make(chan Time, 1)    t := &Ticker{        C: c,        r: runtimeTimer{            when:   when(d),            period: int64(d),            f:      sendTime,            arg:    c,        },    }    startTimer(&t.r)    return t}
func Tick(d Duration) <-chan Time { if d <= 0 { return nil } return NewTicker(d).C}
复制代码


Tick 其实也只是对 NewTicker 的简单封装,从实现上我们就能看出来它其实就是调用了 NewTicker 获取了计时器并返回了计时器中 Channel,两个创建计时器的方法的实现都并不复杂而且费容易理解,所以在这里也就不详细展开介绍了。


需要注意的是每一个 NewTicker 方法开启的计时器都需要在不需要使用时调用 Stop 进行关闭,如果不显示调用 Stop 方法,创建的计时器就没有办法被垃圾回收,而通过 Tick 创建的计时器由于只对外提供了 Channel,所以是一定没有办法关闭的,我们一定要谨慎使用这一接口创建计时器。

__3. 性能分析

定时器在内部使用四叉树的方式进行实现和存储,当我们在生产环境中使用定时器进行毫秒级别的计时时,在高并发的场景下会有比较明显的性能问题,我们可以通过实验测试一下定时器在高并发时的性能,假设我们有以下的代码:


func runTimers(count int) {    durationCh := make(chan time.Duration, count)
wg := sync.WaitGroup{} wg.Add(count) for i := 0; i < count; i++ { go func() { startedAt := time.Now() time.AfterFunc(10*time.Millisecond, func() { defer wg.Done() durationCh <- time.Since(startedAt) }) }()
} wg.Wait()
close(durationCh)
durations := []time.Duration{} totalDuration := 0 * time.Millisecond for duration := range durationCh { durations = append(durations, duration) totalDuration += duration } averageDuration := totalDuration / time.Duration(count) sort.Slice(durations, func(i, j int) bool { return durations[i] < durations[j] })
fmt.Printf("run %v timers with average=%v, pct50=%v, pct99=%v\n", count, averageDuration, durations[count/2], durations[int(float64(count)*0.99)])}
复制代码


完整的性能测试代码可以在 benchmark_timers.go 中找到,需要注意的是:由于机器和性能的不同,多次运行测试可能会有不一样的结果。


这段代码开了 N 个 Goroutine 并在每一个 Goroutine 中运行一个定时器,我们会在定时器到期时将开始计时到定时器到期所用的时间加入 Channel 并用于之后的统计,在函数的最后我们会计算出 N 个 Goroutine 中定时器到期时间的平均数、50 分位数和 99 分位数:


$ go test ./... -v=== RUN   TestTimersrun 1000 timers with average=10.367111ms, pct50=10.234219ms, pct99=10.913219msrun 2000 timers with average=10.431598ms, pct50=10.37367ms, pct99=11.025823msrun 5000 timers with average=11.873773ms, pct50=11.986249ms, pct99=12.673725msrun 10000 timers with average=11.954716ms, pct50=12.313613ms, pct99=13.507858msrun 20000 timers with average=11.456237ms, pct50=10.625529ms, pct99=25.246254msrun 50000 timers with average=21.223818ms, pct50=14.792982ms, pct99=34.250143msrun 100000 timers with average=36.010924ms, pct50=31.794761ms, pct99=128.089527msrun 500000 timers with average=176.676498ms, pct50=138.238588ms, pct99=676.967558ms--- PASS: TestTimers (1.21s)
复制代码


我们将上述代码输出的结果绘制成如下图所示的折线图,其中横轴是并行定时器的个数,纵轴表示定时器从开始到触发时间的差值,三个不同的线分别表示时间的平均值、50 分位数和 99 分位数:



虽然测试的数据可能有一些误差,但是从图中我们也能得出一些跟定时器性能和现象有关的结论:


  • 定时器触发的时间一定会晚于创建时传入的时间,假设定时器需要等待 10ms 触发,那它触发的时间一定是晚于 10ms 的;

  • 当并发的定时器数量达到 5000 时,定时器的平均误差达到了 ~18%,99 分位数上的误差达到了 ~26%;

  • 并发定时器的数量超过 5000 之后,定时器的误差就变得非常明显,不能有效、准确地完成计时任务;


这其实也是因为定时器从开始到触发的时间间隔非常短,当我们将计时的时间改到 100ms 时就会发现性能问题有比较明显的改善:



哪怕并行运行了 10w 个定时器,99 分位数的误差也只有 ~12%,我们其实能够发现 Go 语言标准库中的定时器在计时时间较短并且并发较高时有着非常明显的问题,所以在一些性能非常敏感的基础服务中使用定时器一定要非常注意 —— 它可能达不到我们预期的效果。


不过哪怕我们不主动使用定时器,而是使用 context.WithDeadline 这种方法,由于它底层也会使用定时器实现,所以仍然会受到影响。

__4. 总结

Go 语言的定时器在并发编程起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了计时器、休眠等接口能够帮助我们在 Go 语言程序中更好地处理过期和超时等问题。


标准库中的定时器在大多数情况下是能够正常工作并且高效完成任务的,但是在遇到极端情况或者性能敏感场景时,它可能没有办法胜任,而在 10ms 的这个粒度下,作者在社区中也没有找到能够使用的定时器实现,一些使用时间轮算法的开源库也不能很好地完成这个任务。

__5. Reference

__6. 其他

__6.1. 关于图片和转载

本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/golang/concurrency/golang-timer.html


2019-12-03 15:091907

评论

发布
暂无评论
发现更多内容

2022-04-27:用go语言重写ffmpeg的remuxing.c示例。

福大大架构师每日一题

golang ffmpeg

如何在 Linux 中查看正在运行的进程?这三个命令轻松实现!

wljslmz

Linux 三周年连更

性能大PK count(*)、count(1)和count(列)

架构精进之路

MySQL 数据库 后端 innodb 三周年连更

数据标注,优化模型辅助标注、Label 库管理|ModelWhale 版本更新

ModelWhale

云计算 编程 模型 数据标注 数据门户

龙蜥开发者说:亲历从基础设施构建到系统质量保障,龙蜥未来可期 | 第 19 期

OpenAnolis小助手

Linux 开源 sig 龙蜥开发者说 联通数科

中国厨房更净一步:一场科技“下凡”带来的方太式浪漫

脑极体

Django笔记十八之save函数的继承操作和指定字段更新等实例方法

Hunter熊

Python django save auto_increment primary key

在oracle apex中的Dialog的做法(第一部门):

back_wang

oracle dialog oracle ebs ebs

ChatGPT 在 Python WEB 的Prompt项目分享

Marvin Ma

Python 开发 ChatGPT

ChatGPT-4!又双叒叕写了一本量化交易的书(附下载)

量化投资与机器学习

机器学习 量化投资 ChatGPT4 对冲基金 Quant

和ChatGPT结对完成VS CODE插件项目分享

Marvin Ma

vscode 插件 插件开发 ChatGPT

Go 语言中没有枚举类型,但是我们可以这样做

陈明勇

Go golang 枚举 Enum 三周年连更

NCCL源码解析③:机器内拓扑分析

OneFlow

Go的内存模型:如何保证并发读写的顺序性?

Jack

轻量级协作任务管理看板

顿顿顿

敏捷开发 任务管理 敏捷开发管理工具 看板工具 scrum工具

【源码分析】【seata】at 模式分布式事务 -xid隐式传递

如果晴天

源码分析 分布式事务 seata spring-cloud Seata框架

ZincSearch 一款 Elasticsearch 的轻量级替代品

宇宙之一粟

Go 三周年连更 ZincSearch

华为阅读联手20余家头部内容平台,共建数字化阅读

最新动态

2023 年要避免的 17 个致命的网站设计错误

海拥(haiyong.site)

三周年连更

eBPF的发展演进---从石器时代到成为神(四)

统信软件

操作系统 Linux Kenel

Apache Doris 1.2.4 Release 版本正式发布|版本通告

SelectDB

数据库 大数据 数据分析 Doris 联邦查询和分析

场景篇-ChatGPT帮我搭建博客网站并自动写博客!

Marvin Ma

博客 ChatGPT

Java 中的 null 到底是什么?

Java架构历程

Java 三周年连更

iOS MachineLearning 系列(8)—— 图片热区分析

珲少

音视频八股文(7)-- 音频aac adts

福大大架构师每日一题

音视频 流媒体

挑战 30 天学完 Python:Day13 列表推导式和Lambda

MegaQi

挑战30天学完Python 三周年连更

2023移动端技术探索

轻口味

android 移动端 行业趋势 三周年连更

人工智能训练数据集:误区、挑战与应对方法

来自四九城儿

一文读懂低代码和零代码两者的区别

这我可不懂

低代码 应用开发 JNPF

《爱在黎明破晓前 | 我有话要说》

后台技术汇

三周年连更

场景篇-ChatGPT帮我实现发送公众号推文

Marvin Ma

微信公众号 代码生成 ChatGPT

Golang 并发编程与定时器_文化 & 方法_Draveness_InfoQ精选文章