golang源码阅读之定时器以及避坑指南

栏目: Go · 发布时间: 4年前

内容简介:本文分为三部分:第一部分为阅读源码后的总结。第二部分为高性能场景使用定时器需要注意的地方。

本文分为三部分:

第一部分为阅读源码后的总结。

第二部分为高性能场景使用定时器需要注意的地方。

第三部分为系统库源码以及我写的注释。

本文基于go version 1.11.4

先放总结

所有业务层的timer对象都被底层的全局容器变量所持有及管理。这里说的全局容器是一个桶(bucket)数组,数组大小固定为64,数组的每个元素为一个桶对象,每个桶内包含一个最小堆和一个loop循环协程(以下简称 桶协程 )。

timer对象归哪个桶管理取决于申请该timer对象时G所在的P(通过P的id取余64作为桶数组下标)。

(关于golang线程调度模型中G P M的概念超出了本文的讨论范围。这里只简单理解G为当前goroutine,P为当前goroutine所属的任务队列。)

由于hash算法和P的id相关,所以一个程序最多有min(64, GOMAXPROCS)个桶在使用。

另外,和桶一对一关联的桶协程是懒开启的,只在桶被初次使用时(即有timer对象hash到了这个桶)才开启,开启后桶协程内部的循环永远不会退出。

不将桶数量直接设置为GOMAXPROCS是因为那样的话数组需要动态申请。

桶数量设置为64是权衡在不同环境下(GOMAXPROCS不同)内存使用以及性能间的一种经验值。

每个桶都有一个最小堆,根据桶内所有timer的超时触发绝对时间点做调整。

关于数据结构最小堆的详细介绍读者可以自行查找资料,这里你只需要知道堆的底层使用数组实现,插入和删除的时间复杂度都是O(logn),并且插入和删除后,最小堆始终保持最小的元素在堆顶位置,所以获取最小元素是O(1)的。

事实上,golang定时器中的最小堆使用的是四叉树实现,相较于常见的二叉树实现,在节点数量比较多时,四叉树对底层数组的访问路径的局部性更好,CPU cache更友好些。

当桶内没timer时,桶协程被挂起。即rescheduling状态。

当桶内还有timer时,桶内协程睡眠直到最小超时触发时间点后再唤醒。即sleeping状态。

当往桶内加入新timer而该timer的超时触发时间点正好是当前桶内最小的,则唤醒桶协程。让桶协程重新判断,设置新的最小超时触发时间点后进入sleeping状态。

由于桶数量是固定的,所以hash桶的操作是无锁的。

但是桶内有互斥锁,因为 桶协程业务层调用Timer的接口 可能并行操作桶内的最小堆和各种标志等变量。

使用timer时,以下几点开销要做到心里有数, 桶内互斥锁的开销,最小堆容器管理的开销,协程调度的开销,创建timer对象时、超时触发返回当前时间时、桶协程内部都会有获取当前时间调用的开销。

高性能场景如何使用

阅读源码的目的,是学习别人写的好的地方,以及保证正确的使用姿势。

你能看出下面这段伪代码存在的问题吗?

func consume() {
  t := new time.NewTimer(5 * time.Second)
  select {
    case <- ch:
      // 做相关的业务
    case <- t:
      // 超时了,做超时处理
  }
}

这是timer常见的一种用法,为某个 消费者 设置消费超时时间。

如果在超时时间内消费 ch 成功了,则timer对象在业务层没有被触发。

那么问题来了,底层从最小堆中删除timer只有两种情况,要么在业务层显式调用Stop方法停止定时器,要么底层判断timer已经到达超时触发时间。刚才这种情况,底层只能等到超时触发时间(伪代码中为5秒后)才能从容器中移除该timer。 即资源被延时释放了。

作为写业务层代码的人,很可能会误认为业务层已经不再使用且不再持有该timer了,资源就被释放了。

如果我们的生产消费非常的频繁,底层容器将堆积大量的timer,从而浪费大量内存和CPU资源。

另外,假设你在其它场景使用了time.Ticker(不同于Timer只在超时后触发一次,Ticker将周期性触发超时)而没有调用Stop(即使业务层已不再持有Ticker对象了),情况将更糟糕,底层容器将一直持有Ticker对象,并周期性触发超时,然后修改下次超时时间点。 资源将永远得不到释放,内存和CPU将永久性的泄漏。

正确的做法应该是:

Ticker对象不再使用后,显式调用Stop方法。

Timer对象不再使用后,在高性能场景下,也应该显式调用Stop方法,及时释放资源。

那么这又分为两种情况,Timer是否已经在业务层触发超时了。

通过阅读系统库源码我们可以得知,对已超时的Timer调用Stop方法内部有变量保护,是安全的。但是这种保护需要拿一次桶内的互斥锁,高性能场景下也需要考虑这个消耗。

所以正确释放Timer对象的做法是,简单点就在上面伪代码的 select 结束后统一调用Stop,精细点就在 ch 得到消费时调用Stop。

我之后会再写一篇文章,关于在某些特定场景下如何自己实现一个简易timer,牺牲部分我们不需要的精确度来大幅提高超时业务逻辑的性能。

这里跑个题,作为一名写 c++ 的golang初学者,我觉得有个根本原因是golang没有析构函数,导致在某些场景下,某些类型对象的清理工作需要显式调用,且有时候还需要保证这些清理工作是once的。而 c++ 在这些场景下可以通过智能指针加析构函数的RAII手法完成。这方面我也没想得太明白。也许是因为golang对象的真正释放可能会依赖gc,而gc的时间点是不固定的,所以即使提供析构函数,也无法做到及时释放对象内部的资源。

部分源码的说明

涉及到文件为:

  • src/time/sleep.go
  • src/time/tick.go
  • src/runtime/time.go
  • 其它一些runtime中的代码

首先看 time/sleep.go ,里面有time.Timer的实现,time.Timer比较简单,只是对runtime包中timer的一层wrap。这层自身实现的最核心功能是将底层的超时回调转换为发送channel消息。

// 这里可以看到是对runtimeTimer的wrap
type Timer struct {
	C <-chan Time
	r runtimeTimer
}

func NewTimer(d Duration) *Timer {
	// 注意,这里的channel是带缓冲的,保证了业务层如果不接收这个channel,底层的
	// 桶协程不会因为发送channel而被阻塞
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			// 向底层timer传入sendTime回调函数
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

// 将底层的超时回调转化为channel发送,并写入了当前时间
func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

// After就是匿名Timer
func After(d Duration) <-chan Time {
	return NewTimer(d).C
}

接下来我们看 runtime/time.go

// timer结构体
type timer struct {
	tb *timersBucket // timer所属的桶
	i  int           // 最小堆中的下标,为-1时则不可用了

	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
	// a well-behaved function and not block.
	when   int64 // 超时时间点
	period int64 // 如果是Ticker,会有这个值,周期性触发
	f      func(interface{}, uintptr) // 回调
	arg    interface{} // time.Timer会传入channel变量,一会回调时把channel带回去
	seq    uintptr // 这个变量目前没有用
}

// 桶数量固定为64
const timersLen = 64

// 全局桶数组,还对cache伪共享做了优化
var timers [timersLen]struct {
	timersBucket

	// The padding should eliminate false sharing
	// between timersBucket values.
	pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
}

// addTimer时,首先P id取余64获取timer所属的bucket
func (t *timer) assignBucket() *timersBucket {
	id := uint8(getg().m.p.ptr().id) % timersLen
	t.tb = &timers[id].timersBucket
	return t.tb
}

func (tb *timersBucket) addtimerLocked(t *timer) bool {
	// 负数参数保护性代码
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	// 最小堆插入操作
	t.i = len(tb.t)
	tb.t = append(tb.t, t)
	if !siftupTimer(tb.t, t.i) {
		return false
	}
	// 下标为0,说明该timer的触发时间为当前桶中最早的
	if t.i == 0 {
		// 桶协程在sleep,唤醒它
		if tb.sleeping {
			tb.sleeping = false
			notewakeup(&tb.waitnote)
		}
		// 桶协程被挂起了,重新调度
		if tb.rescheduling {
			tb.rescheduling = false
			goready(tb.gp, 0)
		}
	}
	// 如果timer所属的桶还没有创建,创建并开启桶协程
	if !tb.created {
		tb.created = true
		go timerproc(tb)
	}
	return true
}

// 桶协程,注意,这里有两层for循环,最外面的for是永远不会退出的
func timerproc(tb *timersBucket) {
	tb.gp = getg()
	for {
		// 进互斥锁
		lock(&tb.lock)
		// 睡眠标志修改
		tb.sleeping = false
		// 获取当前时间
		now := nanotime()
		delta := int64(-1)
		for {
			// 如果桶内没有timer,直接退出内层for
			if len(tb.t) == 0 {
				delta = -1
				break
			}
			// 获取最早触发timer,并检查是否到达触发时间
			t := tb.t[0]
			delta = t.when - now
			// 还没到时间,直接退出内层for
			if delta > 0 {
				break
			}
			ok := true
			// 如果是period有值,说明需要周期性触发,我们将该timer修改触发时间后,重新
			// 插入最小堆中
			if t.period > 0 {
				// leave in heap but adjust next time to fire
				t.when += t.period * (1 + -delta/t.period)
				if !siftdownTimer(tb.t, 0) {
					ok = false
				}
			} else {
				// 从最小堆中删除
				last := len(tb.t) - 1
				if last > 0 {
					tb.t[0] = tb.t[last]
					tb.t[0].i = 0
				}
				tb.t[last] = nil
				tb.t = tb.t[:last]
				if last > 0 {
					if !siftdownTimer(tb.t, 0) {
						ok = false
					}
				}
				// 下标设置为-1,deltimer时发现下标为-1则不用删除了
				t.i = -1 // mark as removed
			}
			// 把t中变量拷贝出来,就可以出锁了
			f := t.f
			arg := t.arg
			seq := t.seq
			unlock(&tb.lock)
			// 堆调整时如果下标设置越界了,则丢到这里来处理,badTimer会直接panic
			if !ok {
				badTimer()
			}
			// 如果开了race检查的话
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)
			lock(&tb.lock)
		}
		// 如果桶内没有timer了,把协程挂起
		if delta < 0 || faketime > 0 {
			// No timers left - put goroutine to sleep.
			tb.rescheduling = true
			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
		// 如果还有协程,睡眠直到桶内最早触发时间点到达后唤醒
		tb.sleeping = true
		tb.sleepUntil = now + delta
		noteclear(&tb.waitnote)
		unlock(&tb.lock)
		notetsleepg(&tb.waitnote, delta)
	}
}

// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
	if t.tb == nil {
		// t.tb can be nil if the user created a timer
		// directly, without invoking startTimer e.g
		//    time.Ticker{C: c}
		// In this case, return early without any deletion.
		// See Issue 21874.
		return false
	}

	tb := t.tb

	lock(&tb.lock)
	// t may not be registered anymore and may have
	// a bogus i (typically 0, if generated by Go).
	// Verify it before proceeding.
	i := t.i
	last := len(tb.t) - 1
	// 如果已经触发过或已经被删除了,则返回false告知调用方
	if i < 0 || i > last || tb.t[i] != t {
		unlock(&tb.lock)
		return false
	}
	if i != last {
		tb.t[i] = tb.t[last]
		tb.t[i].i = i
	}
	tb.t[last] = nil
	tb.t = tb.t[:last]
	ok := true
	if i != last {
		if !siftupTimer(tb.t, i) {
			ok = false
		}
		if !siftdownTimer(tb.t, i) {
			ok = false
		}
	}
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
	return true
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

深入React技术栈

深入React技术栈

陈屹 / 人民邮电出版社 / 2016-11-1 / CNY 79.00

全面讲述React技术栈的第一本原创图书,pure render专栏主创倾力打造 覆盖React、Flux、Redux及可视化,帮助开发者在实践中深入理解技术和源码 前端组件化主流解决方案,一本书玩转React“全家桶” 本书讲解了非常多的内容,不仅介绍了面向普通用户的API、应用架构和周边工具,还深入介绍了底层实现。此外,本书非常重视实战,每一节都有实际的例子,细节丰富。我从这......一起来看看 《深入React技术栈》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具