0%

【Go语言设计与实现】Timer

准确的时间对于任何一个正在运行的应用非常重要,但是在一个分布式系统中我们很难保证各个节点上绝对时间的一致,哪怕通过 NTP 这种标准的对时协议也只能把各个节点上时间的误差控制在毫秒级,所以准确的相对时间在分布式系统中显得更为重要,本节会分析用于获取相对时间的计时器(Timer)的设计与实现原理。

设计原理

Go 语言从实现计时器到现在经历过很多个版本的迭代,到最新的 1.14 版本为止,计时器的实现分别经历了以下几个过程:

  1. Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护1
  2. Go 1.10 ~ 1.13,全局使用 64 个四叉堆维护全部的计时器,每个处理器(P)创建的计时器会由对应的四叉堆维护2
  3. Go 1.14 版本之后,每个处理器单独管理计时器并通过网络轮询器触发3

我们在这一节会分别介绍计时器在不同版本的不同设计,梳理计时器实现的演进过程。

全局四叉堆

Go 1.10 之前的计时器都使用最小四叉堆实现,所有的计时器都会存储在如下所示的结构体 runtime.timers#093ade 中:

1
2
3
4
5
6
7
8
9
10
var timers struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
sleepUntil int64
waitnote note
t []*timer
}

这个结构体中的字段 t 就是最小四叉堆,创建的所有计时器都会加入到四叉堆中。runtime.timerproc#093ade Goroutine 会运行时间驱动的事件,它会在发生以下事件时会被唤醒:

  • 四叉堆中的计时器到期;
  • 四叉堆中加入了触发时间更早的新计时器;

golang-timer-quadtree

图 6-14 计时器四叉堆

然而全局四叉堆共用的互斥锁对计时器的影响非常大,计时器的各种操作都需要获取全局唯一的互斥锁,这会严重影响计时器的性能4

分片四叉堆

Go 1.10 将全局的四叉堆分割成了 64 个更小的四叉堆5。在理想情况下,四叉堆的数量应该等于处理器的数量,但是这需要实现动态的分配过程,所以经过权衡最终选择初始化 64 个四叉堆,以牺牲内存占用的代价换取性能的提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const timersLen = 64

var timers [timersLen]struct {
timersBucket
}

type timersBucket struct {
lock mutex
gp *g
created bool
sleeping bool
rescheduling bool
sleepUntil int64
waitnote note
t []*timer
}

如果当前机器上的处理器 P 的个数超过了 64,多个处理器上的计时器就可能存储在同一个桶中。每一个计时器桶都由一个运行 runtime.timerproc#76f4fd8 函数的 Goroutine 处理。

golang-timer-bucket

图 6-15 分片计时器桶

将全局计时器分片的方式,虽然能够降低锁的粒度,提高计时器的性能,但是 runtime.timerproc#76f4fd8 造成的处理器和线程之间频繁的上下文切换却成为了影响计时器性能的首要因素6

网络轮询器

在最新版本的实现中,计时器桶已经被移除7,所有的计时器都以最小四叉堆的形式存储在处理器 runtime.p 中。

处理器中的最小四叉堆

处理器 runtime.p 中与计时器相关的有以下字段:

  • timersLock — 用于保护计时器的互斥锁;
  • timers — 存储计时器的最小四叉堆;
  • numTimers — 处理器中的计时器数量;
  • adjustTimers — 处理器中处于 timerModifiedEarlier 状态的计时器数量;
  • deletedTimers — 处理器中处于 timerDeleted 状态的计时器数量;
1
2
3
4
5
6
7
8
9
10
type p struct {
...
timersLock mutex
timers []*timer

numTimers uint32
adjustTimers uint32
deletedTimers uint32
...
}

原本用于管理计时器的 runtime.timerproc#76f4fd8 函数也已经被移除,目前计时器都交由处理器的网络轮询器和调度器触发,这种方式能够充分利用本地性、减少线上上下文的切换开销,也是目前性能最好的实现方式。

数据结构

runtime.timer 是 Go 语言计时器的内部表示,每一个计时器都存储在对应处理器的最小四叉堆中,下面是运行时计时器对应的结构体:

1
2
3
4
5
6
7
8
9
10
11
type timer struct {
pp puintptr

when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
  • when — 当前计时器被唤醒的时间;
  • period — 两次被唤醒的间隔;
  • f — 每当计时器被唤醒时都会调用的函数;
  • arg — 计时器被唤醒时调用 f 传入的参数;
  • nextWhen — 计时器处于 timerModifiedXX 状态时,用于设置 when 字段;
  • status — 计时器的状态;

然而这里的 runtime.timer 只是私有的计时器运行时表示,而对外暴露的计时器使用 time.Timer 结构体:

1
2
3
4
type Timer struct {
C <-chan Time
r runtimeTimer
}

time.Timer 计时器必须通过 time.NewTimertime.AfterFunc 或者 time.After 函数创建。 当计时器失效时,失效的时间就会被发送给计时器持有的 Channel,订阅 Channel 的 Goroutine 会收到计时器失效的时间。

状态机

运行时使用状态机的方式处理全部的计时器,其中包括 10 种状态和 7 种操作。由于 Go 语言的计时器需要同时支持增加、删除、修改和重置等操作,所以它的状态非常复杂,目前会包含以下 10 种可能:

状态 解释
timerNoStatus 还没有设置状态
timerWaiting 等待触发
timerRunning 运行计时器函数
timerDeleted 被删除
timerRemoving 正在被删除
timerRemoved 已经被停止并从堆中删除
timerModifying 正在被修改
timerModifiedEarlier 被修改到了更早的时间
timerModifiedLater 被修改到了更晚的时间
timerMoving 已经被修改正在被移动

上述表格已经展示了不同状态的含义,但是我们还需要展示一些重要的信息,例如状态的存在时间、计时器是否在堆上等:

  • timerRunningtimerRemovingtimerModifyingtimerMoving — 停留的时间都比较短;
  • timerWaitingtimerRunningtimerDeletedtimerRemovingtimerModifyingtimerModifiedEarliertimerModifiedLatertimerMoving — 计时器在处理器的堆上;
  • timerNoStatustimerRemoved — 计时器不在堆上;
  • timerModifiedEarliertimerModifiedLater — 计时器虽然在堆上,但是可能位于错误的位置上,需要重新排序;

当我们对计时器执行增删改查等不同操作时,运行时会根据状态的不同而做出不同的反应,所以我们在分析计时器时会从状态的维度去分析其实现原理。

计时器的状态机中包含如下所示的 7 种不同操作,这些操作分别由不同的提交引入运行时负责不同的工作:

  • runtime.addtimer — 向当前处理器增加新的计时器8
  • runtime.deltimer — 将计时器标记成 timerDeleted 删除处理器中的计时器9
  • runtime.modtimer — 网络轮询器会调用该函数修改计时器10
  • runtime.resettimer — 修改已经失效的计时器的到期时间,将其变成活跃的计时器11
  • runtime.cleantimers — 清除队列头中的计时器,能够提升程序创建和删除计时器的性能12
  • runtime.adjusttimers — 调整处理器持有的计时器堆,包括移动会稍后触发的计时器、删除标记为 timerDeleted 的计时器13
  • runtime.runtimer — 检查队列头中的计时器,在其准备就绪时运行该计时器14

我们在这里会依次分析计时器的上述 7 个不同操作。

增加计时器

当我们调用 time.NewTimer 增加新的计时器时,会执行程序中的 runtime.addtimer 函数根据以下的规则处理计时器:

  • timerNoStatus -> timerWaiting
  • 其他状态 -> 崩溃:不合法的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func addtimer(t *timer) {
if t.status != timerNoStatus {
badTimer()
}
t.status = timerWaiting
addInitializedTimer(t)
}

func addInitializedTimer(t *timer) {
when := t.when
pp := getg().m.p.ptr()
ok := cleantimers(pp) && doaddtimer(pp, t)
if !ok {
badTimer()
}
wakeNetPoller(when)
}
  1. 调用 runtime.addInitializedTimer 将当前计时器加入处理器的 timers 四叉堆中;
  2. 调用 runtime.netpollGenericInit 函数惰性初始化网络轮询器;
  3. 调用 runtime.wakeNetPoller 唤醒网络轮询器中休眠的线程;
  4. 调用 runtime.netpollBreak 函数中断正在阻塞的网络轮询15

每次增加新的计时器都会中断正在阻塞的轮询,触发调度器检查是否有计时器到期,我们会在本节的后面详细介绍计时器的触发过程。

删除计时器

runtime.deltimer 函数会标记需要删除的计时器,它会根据以下的规则处理计时器:

  • timerNoStatus -> 状态保持不变
  • timerModifiedEarlier -> timerModifying -> timerDeleted
  • timerModifiedLater -> timerDeleted
  • timerWaiting -> timerDeleted
  • timerRunningtimerMoving -> 等待状态改变
  • timerModifying -> 崩溃:并发删除或者修改计时器

修改计时器

runtime.modtimer 会修改已经存在的计时器,它会根据以下的规则处理计时器:

  • timerWaiting -> timerModifying -> timerModifiedXX
  • timerModifiedXX -> timerModifying -> timerModifiedYY
  • timerNoStatus -> timerWaiting
  • timerRemoved -> timerWaiting
  • timerRunningtimerMovingtimerRemoving -> 等待状态改变
  • timerDeletedtimerModifying -> 崩溃:并发删除或者修改计时器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
status := uint32(timerNoStatus)
wasRemoved := false
loop:
for {
switch status = atomic.Load(&t.status); status {
...
}
}

t.period = period
t.f = f
t.arg = arg
t.seq = seq

if wasRemoved {
t.when = when
addInitializedTimer(t)
} else {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
...
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
}

如果待修改的计时器已经被删除,那么该函数就会调用runtime.addInitializedTimer 创建新的计时器。在正常情况下会根据修改后的时间进行不同的处理:

  • 如果修改后的时间大于或者等于修改前时间,设置计时器的状态为 timerModifiedLater
  • 如果修改后的时间小于修改前时间,设置计时器的状态为 timerModifiedEarlier 并调用 runtime.netpollBreak 触发调度器的重新调度;

因为修改后的时间会影响计时器的处理,所以用于修改计时器的 runtime.modtimer 也是状态机中最复杂的函数了。

重置计时器

runtime.resettimer 会使用新的时间重置一个已经不活跃的计时器,该函数会遵循以下的规则修改计时器的触发时间:

  • timerNoStatus -> timerWaiting
  • timerRemoved -> timerWaiting
  • timerDeleted -> timerModifying -> timerModifiedXX
  • timerRemovingtimerRunning -> 等待状态改变
  • timerWaitingtimerMovingtimerModifiedXXtimerModifying -> 崩溃:在活跃的计时器上调用重置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func resettimer(t *timer, when int64) {
for {
switch s := atomic.Load(&t.status); s {
case timerNoStatus, timerRemoved:
...
case timerDeleted:
tpp := t.pp.ptr()
if atomic.Cas(&t.status, s, timerModifying) {
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
atomic.Cas(&t.status, timerModifying, newStatus)
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
return
}
case timerRemoving, timerRunning:
osyield()
default:
badTimer()
}
}
}
  • 如果当前计时器还没有加入四叉堆(

    1
    timerNoStatus

    )或者已经被移除(

    1
    timerRemoved

    );

  • 如果当前计时器刚刚被标记为删除(

    1
    timerDeleted

    );

    • 修改计时器下次触发的时间 nextWhen
    • 根据新的触发时间修改状态至 timerModifiedEarliertimerModifiedLater
    • 如果新的触发时间早于当前状态,调用 runtime.wakeNetPoller 函数触发调度器的调度;

重置计时器的过程与修改计时器的过程有些相似,因为它们修改了计时器的到期时间,所以都需要与 timerModifiedXX 状态和网络轮询器打交道。

清除计时器

runtime.cleantimers 函数会根据状态清理处理器队列头中的计时器,该函数会遵循以下的规则修改计时器的触发时间:

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func cleantimers(pp *p) bool {
for {
if len(pp.timers) == 0 {
return true
}
t := pp.timers[0]
switch s := atomic.Load(&t.status); s {
case timerDeleted:
atomic.Cas(&t.status, s, timerRemoving)
dodeltimer0(pp)
atomic.Cas(&t.status, timerRemoving, timerRemoved)
case timerModifiedEarlier, timerModifiedLater:
atomic.Cas(&t.status, s, timerMoving)

t.when = t.nextwhen

dodeltimer0(pp)
doaddtimer(pp, t)
atomic.Cas(&t.status, timerMoving, timerWaiting)
default:
return true
}
}
}

runtime.cleantimers 函数只会处理计时器状态为 timerDeletedtimerModifiedEarliertimerModifiedLater 的情况:

  • 如果计时器的状态为
1
timerDeleted

  • 将计时器的状态修改成 timerRemoving
  • 调用 runtime.dodeltimer0 删除四叉堆顶上的计时器;
  • 将计时器的状态修改成 timerRemoved
  • 如果计时器的状态为
1
timerModifiedEarlier

或者

1
timerModifiedLater

  • 将计时器的状态修改成 timerMoving
  • 使用计时器下次触发的时间 nextWhen 覆盖 when
  • 调用 runtime.dodeltimer0 删除四叉堆顶上的计时器;
  • 调用 runtime.doaddtimer 将计时器加入四叉堆中;
  • 将计时器的状态修改成 timerWaiting

runtime.cleantimers 函数会删除已经标记的计时器,修改状态为 timerModifiedXX 的计时器。

调整计时器

runtime.adjusttimersruntime.cleantimers 函数的作用比较相似,它们都会删除堆中的计时器并修改状态为 timerModifiedEarliertimerModifiedLater 的计时器的时间,它们也会遵循相同的规则处理计时器状态:

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func adjusttimers(pp *p) {
var moved []*timer
loop:
for i := 0; i < len(pp.timers); i++ {
t := pp.timers[i]
switch s := atomic.Load(&t.status); s {
case timerDeleted:
// 删除堆中的计时器
case timerModifiedEarlier, timerModifiedLater:
// 修改计时器的时间
case ...
}
}
if len(moved) > 0 {
addAdjustedTimers(pp, moved)
}
}

runtime.cleantimers 不同的是,上述函数可能会遍历处理器堆中的全部计时器(包含退出条件),而不是只修改四叉堆顶部。

运行计时器

runtime.runtimer 函数会检查处理器四叉堆上最顶上的计时器,该函数也会处理计时器的删除以及计时器时间的更新,它会遵循以下的规则处理计时器:

  • timerNoStatus -> 崩溃:未初始化的计时器

  • ```
    timerWaiting

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35

    - -> `timerWaiting`
    - -> `timerRunning` -> `timerNoStatus`
    - -> `timerRunning` -> `timerWaiting`

    - `timerModifying` -> 等待状态改变

    - `timerModifiedXX` -> `timerMoving` -> `timerWaiting`

    - `timerDeleted` -> `timerRemoving` -> timerRemoved

    - `timerRunning` -> 崩溃:并发调用该函数

    - `timerRemoved`、`timerRemoving`、`timerMoving` -> 崩溃:计时器堆不一致

    ```go
    func runtimer(pp *p, now int64) int64 {
    for {
    t := pp.timers[0]
    switch s := atomic.Load(&t.status); s {
    case timerWaiting:
    if t.when > now {
    return t.when
    }
    atomic.Cas(&t.status, s, timerRunning)
    runOneTimer(pp, t, now)
    return 0
    case timerDeleted:
    // 删除计时器
    case timerModifiedEarlier, timerModifiedLater:
    // 修改计时器的时间
    case ...
    }
    }
    }

如果处理器四叉堆顶部的计时器没有到触发时间会直接返回,否则调用 runtime.runOneTimer 运行堆顶的计时器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f
arg := t.arg
seq := t.seq

if t.period > 0 {
delta := t.when - now
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(pp.timers, 0)
atomic.Cas(&t.status, timerRunning, timerWaiting)
updateTimer0When(pp)
} else {
dodeltimer0(pp)
atomic.Cas(&t.status, timerRunning, timerNoStatus)
}

unlock(&pp.timersLock)
f(arg, seq)
lock(&pp.timersLock)
}

根据计时器的 period 字段,上述函数会做出不同的处理:

  • 如果
1
period

字段大于 0;

  • 修改计时器下一次触发的时间并更新其在堆中的位置;
  • 将计时器的状态更新至 timerWaiting
  • 调用 runtime.updateTimer0When 函数设置处理器的 timer0When 字段;
  • 如果
1
period

字段小于或者等于 0;

  • 调用 runtime.dodeltimer0 函数删除计时器;
  • 将计时器的状态更新至 timerNoStatus

更新计时器之后,上述函数会运行计时器中存储的函数并传入触发时间等参数。

触发计时器

我们在上一小节已经分析了计时器状态机中的 10 种状态以及 7 种操作。这里将分析器的触发过程,Go 语言会在两个模块触发计时器,运行计时器中保存的函数:

  • 调度器调度时会检查处理器中的计时器是否准备就绪;
  • 系统监控会检查是否有未执行的到期计时器;

我们将依次分析上述这两个触发过程。

调度器

runtime.checkTimers 是调度器用来运行处理器中计时器的函数,它会在发生以下情况时被调用:

这里就不展开介绍 runtime.scheduleruntime.findrunnable 的实现,重点分析用于执行计时器的runtime.checkTimers 函数,我们将该函数的实现分成调整计时器、运行计时器和删除计时器三个部分。首先是调整堆中计时器的过程:

  • 如果处理器中不存在需要调整的计时器;
    • 当没有需要执行的计时器时,直接返回;
    • 当下一个计时器没有到期并且需要删除的计时器较少时都会直接返回;
  • 如果处理器中存在需要调整的计时器,会调用 runtime.adjusttimers 函数;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
if atomic.Load(&pp.adjustTimers) == 0 {
next := int64(atomic.Load64(&pp.timer0When))
if next == 0 {
return now, 0, false
}
if now == 0 {
now = nanotime()
}
if now < next {
if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
return now, next, false
}
}
}

lock(&pp.timersLock)
adjusttimers(pp)

调整了堆中的计时器之后,会通过 runtime.runtimer 函数依次查找堆中是否存在需要执行的计时器:

  • 如果存在,直接运行计时器;
  • 如果不存在,获取最新计时器的触发时间;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}

runtime.checkTimers 函数的最后,如果当前 Goroutine 的处理器和传入的处理器相同,并且处理器中删除的计时器是堆中计时器的 1/4 以上,就会调用 runtime.clearDeletedTimers 删除处理器全部被标记为 timerDeleted 的计时器,保证堆中靠后的计时器被删除。

1
2
3
4
5
6
7
	if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
clearDeletedTimers(pp)
}

unlock(&pp.timersLock)
return rnow, pollUntil, ran
}

runtime.clearDeletedTimers 能够避免堆中出现大量长时间运行的计时器,该函数和 runtime.moveTimers 也是唯二会遍历计时器堆的函数。

系统监控

系统监控函数 runtime.sysmon 也可能会触发函数的计时器,下面的代码片段中省略了大量与计时器无关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func sysmon() {
...
for {
...
now := nanotime()
next, _ := timeSleepUntil()
...
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0)
if !list.empty() {
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
if next < now {
startm(nil, false)
}
...
}
  1. 调用 runtime.timeSleepUntil 函数获取计时器的到期时间以及持有该计时器的堆;
  2. 如果超过 10ms 的时间没有轮询,调用 runtime.netpoll 轮询网络;
  3. 如果当前有应该运行的计时器没有执行,可能因为存在无法被抢占的处理器,这时我们应该系统新的线程计时器;

在上述过程中 runtime.timeSleepUntil 函数会遍历 allp 中的全部处理器并查找下一个需要执行的计时器。

小结

Go 语言的计时器在并发编程起到了非常重要的作用,它能够为我们提供比较准确的相对时间,基于它的功能,标准库中还提供了定时器、休眠等接口能够我们在 Go 语言程序中更好地处理过期和超时等问题。

标准库中的计时器在大多数情况下是能够正常工作并且高效完成任务的,但是在遇到极端情况或者性能敏感场景时,它可能没有办法胜任,而在 10ms 的这个粒度下,作者在社区中也没有找到能够使用的计时器实现,一些使用时间轮算法的开源库也不能很好地完成这个任务。

延伸阅读