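An Introduction to the crdb Initialization Flow

Overview

The initialization flow consists of the following stages:

  1. Command parsing: crdb is a single executable that runs different commands depending on the arguments it is given
  2. Temporary directory creation: the temporary directory is used during SQL processing, similar to temporary tables and temporary files in MySQL
  3. Signal handling: set up the channel that receives signals
  4. Log initialization: initialize log handling
  5. InitNode: parse the node attributes and initialize the gossip bootstrap resolvers
  6. NewServer: initialize each server component
  7. Start: start all server components
  8. Block the main process: wait for an error before exiting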

Command parsing

> main.main
| > cli.Main
| | > cli.Run
| | | > cli.runStart

The rest of this article uses cockroach start --insecure --store=node1 --host=localhost as the running example.

Registering the subcommands

pkg/cli/cli.go

cockroachCmd.AddCommand(
    startCmd,
    initCmd,
    certCmd,
    quitCmd,
    sqlShellCmd,
    userCmd,
    zoneCmd,
    nodeCmd,
    dumpCmd,
    // Miscellaneous commands.
    // TODO(pmattis): stats
    genCmd,
    versionCmd,
    debugCmd,
)

Defining the start command

pkg/cli/start.go

var startCmd = &cobra.Command{
    Use:   "start",
    Short: "start a node",
    Long: `
Start a CockroachDB node, which will export data from one or more
storage devices, specified via --store flags.
If no cluster exists yet and this is the first node, no additional
flags are required. If the cluster already exists, and this node is
uninitialized, specify the --join flag to point to any healthy node
(or list of nodes) already part of the cluster.
`,
    Example: ` cockroach start --insecure --store=attrs=ssd,path=/mnt/ssd1 [--join=host:port,[host:port]]`,
    RunE:    MaybeShoutError(MaybeDecorateGRPCError(runStart)),
}

The function that the command executes is runStart.

Defining and storing the flags of the start command

  1. Flag definition

pkg/cli/cliflags/flags.go

var (
    ....
    Store = FlagInfo{
        Name:      "store",
        Shorthand: "s",
        Description: `
The file path to a storage device. This flag must be specified separately for
each storage device, for example:
<PRE>
--store=/mnt/ssd01 --store=/mnt/ssd02 --store=/mnt/hda1

  2. Flag storage

pkg/cli/flags.go

cli.init

f := startCmd.Flags()
// Server flags.
stringFlag(f, &serverConnHost, cliflags.ServerHost, "")
stringFlag(f, &serverConnPort, cliflags.ServerPort, base.DefaultPort)
stringFlag(f, &serverAdvertiseHost, cliflags.AdvertiseHost, "")
stringFlag(f, &serverAdvertisePort, cliflags.AdvertisePort, "")
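
The pattern above, where a FlagInfo value describes a flag and a helper binds it to a package-level variable, can be sketched with the standard library's flag package. This is only an illustration under assumptions: the real code uses cobra, and the FlagInfo fields, the stringFlag helper, the variable names, and the flag descriptions below are simplified stand-ins rather than the actual CockroachDB definitions.

```go
package main

import (
	"flag"
	"fmt"
)

// FlagInfo is a simplified, hypothetical stand-in for cliflags.FlagInfo.
type FlagInfo struct {
	Name        string
	Description string
}

// Hypothetical destination variables, playing the role of serverConnHost
// and serverConnPort in the excerpt.
var (
	connHost string
	connPort string
)

// stringFlag binds the string flag described by info to the variable dst.
func stringFlag(f *flag.FlagSet, dst *string, info FlagInfo, defaultVal string) {
	f.StringVar(dst, info.Name, defaultVal, info.Description)
}

// parseStartFlags registers the flags and parses args into the variables.
func parseStartFlags(args []string) error {
	f := flag.NewFlagSet("start", flag.ContinueOnError)
	stringFlag(f, &connHost, FlagInfo{Name: "host", Description: "address to listen on"}, "")
	stringFlag(f, &connPort, FlagInfo{Name: "port", Description: "port to listen on"}, "26257")
	return f.Parse(args)
}

func main() {
	if err := parseStartFlags([]string{"--host=localhost"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(connHost, connPort)
}
```

Because every flag writes straight into a variable, code such as runStart can read the parsed values without touching the flag set again.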

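Creating the temporary directory

pkg/cli/start.go

cli.runStart

var err error
if serverCfg.TempStorageConfig, err = initTempStorageConfig(ctx, serverCfg.Stores.Specs[0]); err != nil {
    return err
}

This creates a temporary storage directory such as cockroach-temp012521878, and creates the file temp-dirs-record.txt, which stores the full path of the temporary storage directory.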

Signal handling

pkg/cli/start.go

cli.runStart

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

Log initialization

pkg/util/log/clog.go

> (l *loggingT) createFile()
| > (sb *syncBuffer) rotateFile

  1. log.Shout: writes to both stderr and the log file
  2. log.Infof: writes only to the log file

InitNode

pkg/server/config.go

func (cfg *Config) InitNode() error {
    cfg.readEnvironmentVariables()
    // Initialize attributes.
    cfg.NodeAttributes = parseAttributes(cfg.Attrs)
    // Expose HistogramWindowInterval to parts of the code that can't import the
    // server package. This code should be cleaned up within a month or two.
    cfg.Config.HistogramWindowInterval = cfg.HistogramWindowInterval()
    // Get the gossip bootstrap resolvers.
    resolvers, err := cfg.parseGossipBootstrapResolvers()
    if err != nil {
        return err
    }
    if len(resolvers) > 0 {
        cfg.GossipBootstrapResolvers = resolvers
    }
    return nil
}

  1. Read the environment variables

  2. Parse the attrs flag

    Attrs = FlagInfo{
        Name: "attrs",
        Description: `
    An ordered, colon-separated list of node attributes. Attributes are arbitrary
    strings specifying machine capabilities. Machine capabilities might include
    specialized hardware or number of cores (e.g. "gpu", "x16c"). For example:
    <PRE>
    --attrs=x16c:gpu`,
    }

  3. Initialize the gossip resolvers: parse the join list and create a socket resolver for each valid host on the list.
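
Step 2 can be illustrated with a short sketch that splits the colon-separated value of --attrs into an ordered list. The parseAttrs function is a hypothetical simplification written for this article; the real parseAttributes may treat whitespace and empty entries differently.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAttrs splits a colon-separated attribute string such as "x16c:gpu"
// into an ordered list, trimming whitespace and dropping empty entries.
func parseAttrs(attrs string) []string {
	var out []string
	for _, a := range strings.Split(attrs, ":") {
		if a = strings.TrimSpace(a); a != "" {
			out = append(out, a)
		}
	}
	return out
}

func main() {
	fmt.Println(parseAttrs("x16c:gpu"))
}
```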

NewServer

Registering the RPC services

pkg/server/server.go

{
    var earlyRPCWhiteList = map[string]struct{}{
        "/cockroach.rpc.Heartbeat/Ping":   {},
        "/cockroach.gossip.Gossip/Gossip": {},
    }
    s.grpc = rpc.NewServerWithInterceptor(s.rpcContext, func(fullName string) error {
        if atomic.LoadInt32(&s.serveNonGossip) != 0 {
            return nil
        }
        if _, allowed := earlyRPCWhiteList[fullName]; !allowed {
            return grpcstatus.Errorf(
                codes.Unavailable, "node waiting for gossip network; %s not available", fullName,
            )
        }
        return nil
    })
}

pkg/rpc/context.go

func NewServerWithInterceptor(
    ctx *Context, interceptor func(fullMethod string) error,
) *grpc.Server {
    ...
    s := grpc.NewServer(opts...)
    RegisterHeartbeatServer(s, &HeartbeatService{
        clock:              ctx.LocalClock,
        remoteClockMonitor: ctx.RemoteClocks,
    })

pkg/rpc/heartbeat.proto

service Heartbeat {
    rpc Ping (PingRequest) returns (PingResponse) {}
}

  1. Set the gRPC options
  2. Inject the interceptor into both unaryInterceptor and streamInterceptor
  3. Register the Heartbeat service

Registering the gossip service

pkg/server/server.go

s.gossip = gossip.New(
    s.cfg.AmbientCtx,
    &s.nodeIDContainer,
    s.rpcContext,
    s.grpc,
    s.stopper,
    s.registry,
)

pkg/gossip/gossip.go

func New(
    ambient log.AmbientContext,
    nodeID *base.NodeIDContainer,
    rpcContext *rpc.Context,
    grpcServer *grpc.Server,
    stopper *stop.Stopper,
    registry *metric.Registry,
) *Gossip {
    ....
    if grpcServer != nil {
        RegisterGossipServer(grpcServer, g.server)
    }

pkg/gossip/gossip.proto

service Gossip {
    rpc Gossip (stream Request) returns (stream Response) {}
}

Initializing DistSender

pkg/server/server.go
pkg/kv/dist_sender.go

func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
    ......
    distSenderCfg := kv.DistSenderConfig{
        AmbientCtx:      s.cfg.AmbientCtx,
        Settings:        st,
        Clock:           s.clock,
        RPCContext:      s.rpcContext,
        RPCRetryOptions: &retryOpts,
    }
    if distSenderTestingKnobs := s.cfg.TestingKnobs.DistSender; distSenderTestingKnobs != nil {
        distSenderCfg.TestingKnobs = *distSenderTestingKnobs.(*kv.DistSenderTestingKnobs)
    }
    s.distSender = kv.NewDistSender(distSenderCfg, s.gossip)
}

Initializing TxnCoordSender

pkg/server/server.go
pkg/kv/txn_coord_sender.go

s.txnCoordSender = kv.NewTxnCoordSender(
    s.cfg.AmbientCtx,
    st,
    s.distSender,
    s.clock,
    s.cfg.Linearizable,
    s.stopper,
    txnMetrics,
)

Initializing StorePool

pkg/server/server.go

s.storePool = storage.NewStorePool(
    s.cfg.AmbientCtx,
    s.st,
    s.gossip,
    s.clock,
    storage.MakeStorePoolNodeLivenessFunc(s.nodeLiveness),
    /* deterministic */ false,
)

pkg/storage/store_pool.go

func NewStorePool(
    ambient log.AmbientContext,
    st *cluster.Settings,
    g *gossip.Gossip,
    clock *hlc.Clock,
    nodeLivenessFn NodeLivenessFunc,
    deterministic bool,
) *StorePool {
    ........
    g.RegisterCallback(storeRegex, sp.storeGossipUpdate)
    deadReplicasRegex := gossip.MakePrefixPattern(gossip.KeyDeadReplicasPrefix)
    g.RegisterCallback(deadReplicasRegex, sp.deadReplicasGossipUpdate)

  1. StorePool maintains the information and health status of every store in the cluster.
  2. It registers gossip callbacks that keep the store information up to date.
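
The callback mechanism in point 2 can be sketched with a toy gossip type. Everything here is hypothetical: the real gossip package matches keys against patterns and runs concurrently, whereas this sketch matches simple string prefixes in a single goroutine, and the "store:" key prefix is made up for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// callback is invoked with the key and value of a matching gossip update.
type callback func(key, value string)

// gossip is a toy, single-goroutine stand-in for the gossip network.
type gossip struct {
	prefixes  []string
	callbacks []callback
}

// registerCallback subscribes cb to every key that starts with prefix.
func (g *gossip) registerCallback(prefix string, cb callback) {
	g.prefixes = append(g.prefixes, prefix)
	g.callbacks = append(g.callbacks, cb)
}

// addInfo publishes an update and notifies every matching callback.
func (g *gossip) addInfo(key, value string) {
	for i, p := range g.prefixes {
		if strings.HasPrefix(key, p) {
			g.callbacks[i](key, value)
		}
	}
}

// storePool keeps the latest description seen for each store.
type storePool struct {
	stores map[string]string
}

// newStorePool creates a pool and subscribes it to store updates.
func newStorePool(g *gossip) *storePool {
	sp := &storePool{stores: map[string]string{}}
	g.registerCallback("store:", sp.storeGossipUpdate)
	return sp
}

// storeGossipUpdate records the newest value for the store named in key.
func (sp *storePool) storeGossipUpdate(key, value string) {
	sp.stores[strings.TrimPrefix(key, "store:")] = value
}

func main() {
	g := &gossip{}
	sp := newStorePool(g)
	g.addInfo("store:1", "capacity=100GiB")
	g.addInfo("node:1", "ignored by the store pool")
	fmt.Println(sp.stores)
}
```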

Initializing RaftTransport

pkg/storage/raft_transport

func NewRaftTransport(
    ambient log.AmbientContext,
    st *cluster.Settings,
    resolver NodeAddressResolver,
    grpcServer *grpc.Server,
    rpcContext *rpc.Context,
) *RaftTransport {
    ....
    if grpcServer != nil {
        RegisterMultiRaftServer(grpcServer, t)
    }

pkg/storage/raft.proto

service MultiRaft {
    rpc RaftMessageBatch (stream RaftMessageRequestBatch) returns (stream RaftMessageResponse) {}
    rpc RaftSnapshot (stream SnapshotRequest) returns (stream SnapshotResponse) {}
}

  1. Register the Raft service
  2. Periodically update the state of the message queues (every 10s)

Initializing DBServer

pkg/server/server.go

s.kvDB = kv.NewDBServer(s.cfg.Config, s.txnCoordSender, s.stopper)
roachpb.RegisterExternalServer(s.grpc, s.kvDB)

pkg/roachpb/api.proto

service Internal {
    rpc Batch (BatchRequest) returns (BatchResponse) {}
}
service External {
    rpc Batch (BatchRequest) returns (BatchResponse) {}
}

Initializing LeaseManager

pkg/server/server.go

// descriptor.ModificationTime <= txn.Timestamp < expirationTime
s.leaseMgr = sql.NewLeaseManager(
    s.cfg.AmbientCtx,
    &s.nodeIDContainer,
    *s.db,
    s.clock,
    lmKnobs,
    s.stopper,
    &s.internalMemMetrics,
    s.cfg.LeaseManagerConfig,
)

LeaseManager acquires and releases the lease on each table.

Initializing the temp engine used by DistSQL

pkg/server/server.go

tempEngine, err := engine.NewTempEngine(s.cfg.TempStorageConfig)

Initializing the ts server

pkg/server/server.go

s.tsDB = ts.NewDB(s.db, s.cfg.Settings)
s.tsServer = ts.MakeServer(s.cfg.AmbientCtx, s.tsDB, s.cfg.TimeSeriesServerConfig, s.stopper)

tsDB is an internal time series database, and tsServer reads from and writes to tsDB.

Creating the Node instance

pkg/server/server.go
pkg/server/node.go

s.node = NewNode(storeCfg, s.recorder, s.registry, s.stopper, txnMetrics, sql.MakeEventLogger(s.leaseMgr))
roachpb.RegisterInternalServer(s.grpc, s.node)
storage.RegisterConsistencyServer(s.grpc, s.node.storesServer)
serverpb.RegisterInitServer(s.grpc, &noopInitServer{clusterID: s.ClusterID})

pkg/server/node.go

service Consistency {
    rpc CollectChecksum(CollectChecksumRequest) returns (CollectChecksumResponse) {}
}

pkg/server/serverpb/init.proto

service Init {
    // Bootstrap an uninitialized cluster.
    rpc Bootstrap(BootstrapRequest) returns (BootstrapResponse) {
    }
}

  1. Create the Node object
  2. Register the Internal service
  3. Register the Consistency service
  4. Register the Init service

Registering DistSQLServer

pkg/server/server.go

s.distSQLServer = distsqlrun.NewServer(ctx, distSQLCfg)
distsqlrun.RegisterDistSQLServer(s.grpc, s.distSQLServer)

pkg/sql/distsqlrun/api.proto

service DistSQL {
    // RunSyncFlow instantiates a flow and streams back results of that flow.
    // The request must contain one flow, and that flow must have a single mailbox
    // of the special sync response type.
    rpc RunSyncFlow(stream ConsumerSignal) returns (stream ProducerMessage) {}
    // SetupFlow instantiates a flow (subgraphs of a distributed SQL
    // computation) on the receiving node.
    rpc SetupFlow(SetupFlowRequest) returns (SimpleResponse) {}
    // FlowStream is used to push a stream of messages that is part of a flow. The
    // first message will have a StreamHeader which identifies the flow and the
    // stream (mailbox).
    //
    // The response is a stream that the consumer uses to perform a handshake and
    // to signal the producer when it wants it to start draining. The client (i.e.
    // the producer) will read from this consumer->producer stream until it has
    // sent everything it needs to send and it performs CloseSend() on the
    // producer->consumer stream; after that point the producer isn't listening
    // for consumer signals any more.
    rpc FlowStream(stream ProducerMessage) returns (stream ConsumerSignal) {}
}

Setting up the Executor

s.sqlExecutor = sql.NewExecutor(execCfg, s.stopper)

Setting up the pg server

pkg/server/server.go

s.pgServer = pgwire.MakeServer(
    s.cfg.AmbientCtx,
    s.cfg.Config,
    s.sqlExecutor,
    &s.internalMemMetrics,
    &rootSQLMemoryMonitor,
    s.cfg.HistogramWindowInterval(),
)

The pg server handles requests from client connections.

Start

pkg/cli/start.go

if err := s.Start(ctx); err != nil {

Starting the HTTP server on port 8080

httpServer := netutil.MakeServer(s.stopper, tlsConfig, s)
httpLn, err := net.Listen("tcp", s.cfg.HTTPAddr)
s.mux.Handle(adminPrefix, authHandler)
s.mux.Handle(ts.URLPrefix, authHandler)
s.mux.Handle(statusPrefix, authHandler)
s.mux.Handle(authPrefix, gwMux)
s.mux.Handle("/health", gwMux)
s.mux.Handle(statusVars, http.HandlerFunc(s.status.handleVars))
...
s.stopper.RunWorker(workersCtx, func(context.Context) {
    netutil.FatalIfUnexpected(httpServer.Serve(httpLn))
})

Several handlers are registered on the HTTP server that listens on port 8080.
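
The same shape (create a listener, register handlers on a mux, serve in a background worker) can be sketched with net/http alone. The helper names newMux, serve, and get, the "/example" path, and the response bodies are invented for illustration; only the "/health" path appears in the excerpt.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// newMux registers two illustrative handlers on a fresh mux.
func newMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok")
	})
	mux.HandleFunc("/example", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "example handler")
	})
	return mux
}

// serve listens on addr and serves handler in a background goroutine.
func serve(addr string, handler http.Handler) (net.Listener, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	go http.Serve(ln, handler) // returns with an error once ln is closed
	return ln, nil
}

// get fetches url and returns the response body as a string.
func get(url string) (string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	// Port 0 asks the OS for a free port so the example cannot collide with 8080.
	ln, err := serve("127.0.0.1:0", newMux())
	if err != nil {
		fmt.Println(err)
		return
	}
	defer ln.Close()
	body, err := get("http://" + ln.Addr().String() + "/health")
	fmt.Println(body, err)
}
```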

Starting the TCP server on port 26257

pkg/server/server.go

ln, err := net.Listen("tcp", s.cfg.Addr)
m := cmux.New(ln)
initL := m.Match(func(_ io.Reader) bool {
    return atomic.LoadInt32(&initLActive) != 0
})
pgL := m.Match(pgwire.Match)
anyL := m.Match(cmux.Any())
...
serveOnMux.Do(func() {
    // A cmux can't gracefully shut down without Serve being called on it.
    netutil.FatalIfUnexpected(m.Serve())
})
// Serve the init service.
initServer := newInitServer(s)
initServer.serve(ctx, initL)
// Serve pg connections.
netutil.FatalIfUnexpected(httpServer.ServeWith(pgCtx, s.stopper, pgL, func(conn net.Conn) {
    connCtx := log.WithLogTagStr(pgCtx, "client", conn.RemoteAddr().String())
// Serve the RPC services.
s.stopper.RunWorker(workersCtx, func(context.Context) {
    netutil.FatalIfUnexpected(s.grpc.Serve(anyL))
})

The server listens on port 26257 and multiplexes it: the single listener serves the init service, pg connections, and RPC traffic.

Starting gossip

filtered := s.cfg.FilterGossipBootstrapResolvers(ctx, unresolvedListenAddr, unresolvedAdvertAddr)
s.gossip.Start(unresolvedAdvertAddr, filtered)

Initializing the RocksDB engines

s.engines, err = s.cfg.CreateEngines(ctx)

Writing the three addr files

➜ node1 cat cockroach.http-addr
localhost:8080
➜ node1 cat cockroach.listen-addr
localhost:26257

inspectEngines

if bootstrappedEngines, _, _, err := inspectEngines(ctx, s.engines, s.cfg.Settings.Version.MinSupportedVersion, s.cfg.Settings.Version.ServerVersion); err != nil {

This iterates over the engines and checks whether each one is empty or already bootstrapped.
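
As a rough sketch of that check, the engines can be partitioned by whether a store ident has already been written to them. The engine and storeIdent types and the inspect function are hypothetical and ignore the version checks that the real inspectEngines call also takes as arguments.

```go
package main

import "fmt"

// storeIdent holds the identifiers that a bootstrapped store carries.
type storeIdent struct {
	ClusterID string
	NodeID    int
	StoreID   int
}

// engine is a toy engine; ident is nil until the engine has been bootstrapped.
type engine struct {
	path  string
	ident *storeIdent
}

// inspect partitions engines into bootstrapped and empty ones.
func inspect(engines []engine) (bootstrapped, empty []engine) {
	for _, e := range engines {
		if e.ident != nil {
			bootstrapped = append(bootstrapped, e)
		} else {
			empty = append(empty, e)
		}
	}
	return bootstrapped, empty
}

func main() {
	engines := []engine{
		{path: "node1", ident: &storeIdent{ClusterID: "c1", NodeID: 1, StoreID: 1}},
		{path: "node1-extra"},
	}
	b, e := inspect(engines)
	fmt.Println(len(b), len(e))
}
```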

rocksdb node1 bootstrap

if err := s.node.bootstrap(ctx, s.engines, bootstrapVersion); err != nil {
func bootstrapCluster(
func (s *Store) Bootstrap(
if err := engine.MVCCPutProto(
    ctx,
    batch,
    nil,
    keys.StoreIdentKey(),
    hlc.Timestamp{},
    nil,
    &s.Ident,
); err != nil {
    batch.Close()
    return err
}
if err := WriteClusterVersion(ctx, batch, cv); err != nil {
if err := s.BootstrapRange(initialValues, bootstrapVersion.MinimumVersion); err != nil {

This initializes the cluster data: it writes the store-local data, which includes the cluster ID, node ID, store ID, and cluster version.
It also initializes a single range that covers all keys, which is the first range of the new cluster.

node start

if err := s.node.start(
    ctx,
    unresolvedAdvertAddr,
    bootstrappedEngines, emptyEngines,
    s.cfg.NodeAttributes,
    s.cfg.Locality,
    cv,
); err != nil {
    return err
}

  1. Initialize the Node structure
  2. Read the store ident (cluster ID, node ID, store ID) stored in each engine
  3. Iterate over all ranges in the store and read each range descriptor to obtain the range's metadata
  4. Create a replica structure for each range
  5. Start the background Raft goroutines

Starting the executor

s.sqlExecutor.Start(ctx, &s.adminMemMetrics, distSQLPlanner)
s.distSQLServer.Start()

  1. The Executor registers for the system config channel and waits for system configuration updates
  2. distSQLServer waits for flow commands

Waiting for client connections

if err := s.pgServer.ServeConn(connCtx, conn); err != nil && !netutil.IsClosedConnection(err) {
    // Report the error on this connection's context, so that we

Creating errChan and blocking the main process

The main process waits on the following channels, any of which causes it to exit:

errChan := make(chan error, 1)
select {
case err := <-errChan:
case <-stopper.ShouldStop():
case sig := <-signalCh:

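Article title: An Introduction to the crdb Initialization Flow

Author: Louis

Published: November 13, 2017 - 10:11

Last updated: November 13, 2017 - 21:11

Original link: /2017/11/13/crdb-init-intro/

License: Louis-NonCommercial-NoDerivatives 4.0 International. Please retain the original link and the author when reposting.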