crdb代码阅读05——Stopper

Stopper提供了基于channel来同步不同routine的组件。
有两种可执行任务:Task和Worker。

Stopper结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type Stopper struct {
quiescer chan struct{} // Closed when quiescing
stopper chan struct{} // Closed when stopping
stopped chan struct{} // Closed when stopped completely
onPanic func(interface{}) // called with recover() on panic on any goroutine
stop sync.WaitGroup // Incremented for outstanding workers
mu struct {
syncutil.Mutex
quiesce *sync.Cond // Conditional variable to wait for outstanding tasks
quiescing bool // true when Stop() has been called
numTasks int // number of outstanding tasks
tasks TaskMap
closers []Closer
cancels []func()
}
}

RunWorker流程

1
2
3
4
5
6
7
8
9
10
11
12
func (s *Stopper) RunWorker(ctx context.Context, f func(context.Context)) {
s.stop.Add(1)
go func() {
// Remove any associated span; we need to ensure this because the
// worker may run longer than the caller which presumably closes
// any spans it has created.
ctx = opentracing.ContextWithSpan(ctx, nil)
defer s.Recover(ctx)
defer s.stop.Done()
f(ctx)
}()
}
  1. 对信号量stop加1
  2. 开启goroutine执行函数f
  3. 执行之后对信号量stop减1

RunTask流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (s *Stopper) runPrelude(taskName string) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.quiescing {
return false
}
s.mu.numTasks++
s.mu.tasks[taskName]++
return true
}
func (s *Stopper) runPostlude(taskName string) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.numTasks--
s.mu.tasks[taskName]--
s.mu.quiesce.Broadcast()
}
func (s *Stopper) RunTask(ctx context.Context, taskName string, f func(context.Context)) error {
if !s.runPrelude(taskName) {
return errUnavailable
}
// Call f.
defer s.Recover(ctx)
defer s.runPostlude(taskName)
f(ctx)
return nil
}
  1. runPrelude
    • 如果是quiesce状态,则无法执行
    • numTask++, tasks[taskName]++
  2. 顺序执行函数
  3. runPostlude
    • numTask–, tasks[taskName]–
    • 广播quiesce,以便被等待的Quiesce获取

RunAsyncTask执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (s *Stopper) RunAsyncTask(
ctx context.Context, taskName string, f func(context.Context),
) error {
taskName = asyncTaskNamePrefix + taskName
if !s.runPrelude(taskName) {
return errUnavailable
}
ctx, span := tracing.ForkCtxSpan(ctx, taskName)
// Call f.
go func() {
defer s.Recover(ctx)
defer s.runPostlude(taskName)
defer tracing.FinishSpan(span)
f(ctx)
}()
return nil
}

与runTask逻辑基本一直,只不过函数的执行使用goroutine执行。

Stopper.Stop流程

  1. Quiesce

    • 执行所有的cancel函数
    • 关闭channle quiescer, close(s.quiescer), 不允许新的Task进入
    • 等待所有正在执行的Task完成
  2. 关闭 channel stopper

    • close(s.stopper)
  3. 等待所有的worker结束

    • s.stop.Wait()
  4. 执行所有的close函数

    1
    2
    3
    for _, c := range s.mu.closers {
    c.Close()
    }
  5. 关闭channel stopped

举个例子

crdb的Server结构体使用stopper来对服务进行管理。

  • 添加close函数到stopper,服务器停止时会调用close函数

关闭用于分布式计算的临时引擎, 最后会调用RocksDB Close方法

1
2
3
4
5
6
7
8
9
10
11
12
tempEngine, err := engine.NewTempEngine(s.cfg.TempStorageConfig)
if err != nil {
return nil, errors.Wrap(err, "could not create temp storage")
}
s.stopper.AddCloser(tempEngine)
// Close closes the database by deallocating the underlying handle.
func (r *RocksDB) Close() {
if r.rdb == nil {
log.Errorf(context.TODO(), "closing unopened rocksdb instance")
return
}
  • 关闭服务器
1
2
3
func (s *Server) Stop() {
s.stopper.Stop(context.TODO())
}
  • runWorker用来等待quiesce,进行关闭操作
1
2
3
4
5
6
s.stopper.RunWorker(workersCtx, func(workersCtx context.Context) {
<-s.stopper.ShouldQuiesce()
if err := httpLn.Close(); err != nil {
log.Fatal(workersCtx, err)
}
})
  • runWorker用来启动服务
1
2
3
s.stopper.RunWorker(workersCtx, func(context.Context) {
netutil.FatalIfUnexpected(httpServer.Serve(httpLn))
})
  • runWorker用来启动连接管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
s.stopper.RunWorker(pgCtx, func(pgCtx context.Context) {
select {
case <-serveSQL:
case <-s.stopper.ShouldQuiesce():
return
}
netutil.FatalIfUnexpected(httpServer.ServeWith(pgCtx, s.stopper, pgL, func(conn net.Conn) {
connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
setTCPKeepAlive(connCtx, conn)
var serveFn func(ctx context.Context, conn net.Conn) error
if !s.cfg.UseLegacyConnHandling {
serveFn = s.pgServer.ServeConn2
} else {
serveFn = s.pgServer.ServeConn
}
if err := serveFn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
// Report the error on this connection's context, so that we
// know which remote client caused the error when looking at
// the logs.
log.Error(connCtx, err)
}
}))
})

本文标题:crdb代码阅读05——Stopper

文章作者:Louis

发布时间:2018年04月18日 - 10:04

最后更新:2018年04月25日 - 16:04

原始链接:/2018/04/18/crdb05-stop/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。