Stopper提供了基于channel来同步不同routine的组件。
有两种可执行任务:Task和Worker。
Stopper结构体
|
|
RunWorker流程
|
|
- 对信号量stop加1
- 开启goroutine执行函数f
- 执行之后对信号量stop减1
RunTask流程
|
|
- runPrelude
- 如果是quiesce状态,则无法执行
- numTask++, tasks[taskName]++
- 顺序执行函数
- runPostlude
- numTask–, tasks[taskName]–
- 广播quiesce,以便被等待的Quiesce获取
RunAsyncTask执行流程
|
|
与runTask逻辑基本一直,只不过函数的执行使用goroutine执行。
Stopper.Stop流程
-
Quiesce
- 执行所有的cancel函数
- 关闭channle quiescer,
close(s.quiescer)
, 不允许新的Task进入 - 等待所有正在执行的Task完成
-
关闭 channel stopper
- close(s.stopper)
-
等待所有的worker结束
- s.stop.Wait()
-
执行所有的close函数
123for _, c := range s.mu.closers {c.Close()} -
关闭channel stopped
举个例子
crdb的Server结构体使用stopper来对服务进行管理。
- 添加close函数到stopper,服务器停止时会调用close函数
关闭用于分布式计算的临时引擎, 最后会调用RocksDB Close方法
|
|
- 关闭服务器
|
|
- runWorker用来等待quiesce,进行关闭操作
|
|
- runWorker用来启动服务
|
|
- runWorker用来启动连接管理
|
|