crdb insert流程

语句

1
2
3
4
5
6
7
8
9
create database test;
use test;
create table t1(c1 int primary key, c2 varchar(20));
begin;
insert into t1 values(2, 'aaaaaaa');

如何查看rocksdb存储内容

扫描node1数据目录下的rocksdb数据库

1
cockroach debug rocksdb scan --value_hex --db=./node1/ > /tmp/b

InsertRow将value转换成Client.Batch

如果一条语句中包含多行insert,这些行会被放入同一个Batch,有利于批量传递修改

涉及到的文件和结构体

pkg/sql/insert.go insertNode
pkg/sql/tablewriter.go tableInserter
pkg/sql/sqlbase/rowwriter.go RowInserter
pkg/internal/client/batch.go Batch
pkg/roachpb/api.go Batch

堆栈代码

1
2
3
4
5
6
7
8
> func (n *insertNode) internalNext
| > func (ti *tableInserter) row
| | > func (ri *RowInserter) InsertRow
| | | > ri.Helper.encodeIndexes //生成primary index key & secondary index kv
| | | > func EncodeTableValue// encode value
| | | > func insertCPutFn // 生成kv请求
| | | | > func (b *Batch) CPut
| | | | | > func NewConditionalPut //生成ConditionalPut request

key & value结构

Primary Index

key: tableid, indexid, pk1, pk2 …
value: encoded values. EncodeTableValue

encoded value: colid|Type, len, value

Secondary Index

Unique

key: tableid, indexid, key1, key2…
value: pk1, pk2…

Non-Unique

key: tableid, indexid, key1, key2…, pk1, pk2…
value: empty

txn.Run将client.batch写入到rocksdb

堆栈代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
backtrace:
0 pkg/storage/engine.mvccPutInternal at ./pkg/storage/engine/mvcc.go:1168
1 pkg/storage/engine.mvccPutUsingIter at ./pkg/storage/engine/mvcc.go:963
2 pkg/storage/engine.mvccConditionalPutUsingIter at ./pkg/storage/engine/mvcc.go:1281
3 pkg/storage/engine.MVCCConditionalPut at ./pkg/storage/engine/mvcc.go:1249
4 pkg/storage.evalConditionalPut at ./pkg/storage/replica_command.go:334
5 pkg/storage.evaluateCommand at ./pkg/storage/replica_command.go:207
6 pkg/storage.evaluateBatch at ./pkg/storage/replica.go:5233
7 pkg/storage.(*Replica).evaluateTxnWriteBatch at ./pkg/storage/replica.go:5035
8 pkg/storage.(*Replica).evaluateProposalInner at ./pkg/storage/replica.go:4866
9 pkg/storage.(*Replica).evaluateProposal at ./pkg/storage/replica.go:2844
10 pkg/storage.(*Replica).requestToProposal at ./pkg/storage/replica.go:2801
11 pkg/storage.(*Replica).propose at ./pkg/storage/replica.go:2968
12 pkg/storage.(*Replica).tryExecuteWriteBatch at ./pkg/storage/replica.go:2704
13 pkg/storage.(*Replica).executeWriteBatch at ./pkg/storage/replica.go:2558
14 pkg/storage.(*Replica).Send at ./pkg/storage/replica.go:1800
15 pkg/storage.(*Store).Send at ./pkg/storage/store.go:2580
16 pkg/storage.(*Stores).Send at ./pkg/storage/stores.go:215
17 pkg/server.(*Node).batchInternal.func1 at ./pkg/server/node.go:850
18 pkg/util/stop.(*Stopper).RunTaskWithErr at ./pkg/util/stop/stopper.go:246
19 pkg/server.(*Node).batchInternal at ./pkg/server/node.go:837
20 pkg/server.(*Node).Batch at ./pkg/server/node.go:878
21 pkg/kv.(*grpcTransport).send.func1 at ./pkg/kv/transport.go:229
22 pkg/kv.(*grpcTransport).send at ./pkg/kv/transport.go:242
23 pkg/kv.(*grpcTransport).SendNext at ./pkg/kv/transport.go:194
24 pkg/kv.(*DistSender).sendToReplicas at ./pkg/kv/dist_sender.go:1229
25 pkg/kv.(*DistSender).sendRPC at ./pkg/kv/dist_sender.go:383
26 pkg/kv.(*DistSender).sendSingleRange at ./pkg/kv/dist_sender.go:447
27 pkg/kv.(*DistSender).sendPartialBatch at ./pkg/kv/dist_sender.go:1013
28 pkg/kv.(*DistSender).divideAndSendBatchToRanges at ./pkg/kv/dist_sender.go:689
29 pkg/kv.(*DistSender).Send at ./pkg/kv/dist_sender.go:619
30 pkg/kv.(*TxnCoordSender).Send at ./pkg/kv/txn_coord_sender.go:444
31 pkg/internal/client.(*DB).send at ./pkg/internal/client/db.go:551
32 pkg/internal/client.(*Txn).Send at ./pkg/internal/client/txn.go:959
33 pkg/internal/client.(*Txn).Send-fm at ./pkg/internal/client/txn.go:510
34 pkg/internal/client.sendAndFill at ./pkg/internal/client/db.go:457
35 pkg/internal/client.(*Txn).Run at ./pkg/internal/client/txn.go:510
36 pkg/sql.(*tableInserter).finalize at ./pkg/sql/tablewriter.go:125

添加开始事务request

由于insert是写入操作,且是事务中的第一条写入操作,故需要在所有的request之前插入BeginTransactionRequest

pkg/internal/client/txn.go Send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (txn *Txn) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
...
needBeginTxn = !(txn.mu.Proto.Writing || txn.mu.writingTxnRecord) && haveTxnWrite
if len(txn.mu.Proto.Key) == 0 {
txnAnchorKey := txn.mu.txnAnchorKey
if len(txnAnchorKey) == 0 {
txnAnchorKey = ba.Requests[0].GetInner().Header().Key
}
txn.mu.Proto.Key = txnAnchorKey
}
// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
Span: roachpb.Span{
Key: txn.mu.Proto.Key,
},
}
}

生成BeginTransactionRequest, 其key为事务的anchor key: 如果txnAnchorKey尚未设置, 则取batch中第一条request的key. Transaction record就存放在这个key所在的range上.

TxnCoordSender负责生成事务相关的元数据,并后台执行heartbeat request

pkg/kv/txn_coord_sender.go Send

修改txnMeta.keys

txnMeta.keys记载修改了哪些key,方便之后commit或者rollback进行处理。

1
2
3
if txnMeta != nil {
txnMeta.keys = et.IntentSpans
}

添加heartbeat request

1
2
3
4
5
6
7
hb := &roachpb.HeartbeatTxnRequest{
Now: tc.clock.Now(),
}
hb.Key = txn.Key
ba.Add(hb)
if pErr = tc.updateState(ctx, startNS, ba, br, pErr); pErr != nil {

tc.updateState会调用tc.heartbeatLoop循环给transaction record发送heartbeat request。
默认心跳间隔为1秒。

DistSender根据不同的key,选择不同的range

pkg/kv/dist_sender.go DistSender

获取不同range描述的堆栈

1
2
3
4
5
6
| > func (ds *DistSender) Send
| | > keys.Range(ba) //根据batch request,生成range span{begin key, end key}
| | > ds.divideAndSendBatchToRanges // 将请求分配到不同的range
| | | > func (ri *RangeIterator) Seek //获取begin key对应的range descriptor
| | | | > func (ds *DistSender) getDescriptor
| | | | | > func (rdc *RangeDescriptorCache) LookupRangeDescriptor

发送请求堆栈

1
2
3
4
5
6
| > func (ds *DistSender) divideAndSendBatchToRanges
| | > ds.sendPartialBatch
| | | > func (ds *DistSender) sendSingleRange
| | | | > func (ds *DistSender) sendRPC
| | | | | > func (ds *DistSender) sendToReplicas
| | | | | | > func (gt *grpcTransport) SendNext

GRPC transport

初始化

pkg/kv/transport.go

1
2
3
| > func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender
| | > ds.transportFactory = GRPCTransportFactory
| | | > func grpcTransportFactoryImpl

GRPC服务定义

pkg/roachpb/api.proto

1
2
3
service Internal {
rpc Batch (BatchRequest) returns (BatchResponse) {}
}

服务实现

pkg/server/node.go Batch

1
2
3
4
5
6
7
8
9
10
11
func (n *Node) Batch(
ctx context.Context, args *roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
growStack()
// NB: Node.Batch is called directly for "local" calls. We don't want to
// carry the associated log tags forward as doing so makes adding additional
// log tags more expensive and makes local calls differ from remote calls.
ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx)
br, err := n.batchInternal(ctx, args)

gRPC调用

pkg/kv/transport.go func (gt *grpcTransport) send

  1. 如果range在当前node,则直接使用localserver
1
return localServer.Batch(localCtx, &client.args)
  2. 如果range不在当前node,则调用远程方法
1
reply, err := client.client.Batch(ctx, &client.args)

node, store, range传递Batch Request

pkg/server/node.go
pkg/storage/stores.go
pkg/storage/store.go

调用堆栈

1
2
3
4
5
6
7
| > func (n *Node) Batch
| | > func (n *Node) batchInternal
| | | > func (ls *Stores) Send
| | | | > store, err := ls.GetStore(ba.Replica.StoreID) //获取具体的store
| | | | | > func (s *Store) Send
| | | | | | > repl, err := s.GetReplica(ba.RangeID) //获取具体的range对应的replica
| | | | | | | > func (r *Replica) Send

replica转换batch request为replica command

pkg/storage/replica.go

调用堆栈

1
2
3
4
5
6
7
8
9
10
11
12
| > func (r *Replica) Send
| | > func (r *Replica) executeWriteBatch
| | | > func (r *Replica) tryExecuteWriteBatch
| | | | > func (r *Replica) propose
| | | | | > func (r *Replica) requestToProposal
| | | | | | > func (r *Replica) evaluateProposal
| | | | | | | > func (r *Replica) evaluateProposalInner
| | | | | | | | > func (r *Replica) evaluateTxnWriteBatch
| | | | | | | | | > evaluateBatch(ctx, idKey, batch, rec, &ms, ba)
| | | | | | | | | | > func evaluateCommand
| | | | | | | | | | | > func evalConditionalPut //转换成对应的replica command
| | | | | | | | | | | | > func MVCCConditionalPut //调用底层engine的mvcc方法

batch request到replica command对应表

pkg/storage/replica_command.go

1
2
3
4
5
6
7
var commands = map[roachpb.Method]Command{
roachpb.Get: {DeclareKeys: DefaultDeclareKeys, Eval: evalGet},
roachpb.Put: {DeclareKeys: DefaultDeclareKeys, Eval: evalPut},
roachpb.ConditionalPut: {DeclareKeys: DefaultDeclareKeys, Eval: evalConditionalPut},
roachpb.InitPut: {DeclareKeys: DefaultDeclareKeys, Eval: evalInitPut},
...
}

BeginTransactionRequest对应的engine KV操作

pkg/storage/replica_command.go

1
2
3
4
5
6
7
8
9
10
11
func evalBeginTransaction(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (EvalResult, error) {
...
key := keys.TransactionKey(h.Txn.Key, h.Txn.ID)
clonedTxn := h.Txn.Clone()
reply.Txn = &clonedTxn
...
return EvalResult{}, engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, reply.Txn)
}

key: 包含了TRX ID
value: reply.Txn包含了TxnMeta

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
type TxnMeta struct {
// id is a unique UUID value which identifies the transaction.
// This field is always filled in.
ID github_com_cockroachdb_cockroach_pkg_util_uuid.UUID `protobuf:"bytes,1,opt,name=id,proto3,customtype=github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" json:"id"`
Isolation IsolationType `protobuf:"varint,2,opt,name=isolation,proto3,enum=cockroach.storage.engine.enginepb.IsolationType" json:"isolation,omitempty"`
// key is the key which anchors the transaction. This is typically
// the first key read or written during the transaction and
// determines which range in the cluster will hold the transaction
// record.
Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
// Incremented on txn retry.
Epoch uint32 `protobuf:"varint,4,opt,name=epoch,proto3" json:"epoch,omitempty"`
// The proposed timestamp for the transaction. This starts as the
// current wall time on the txn coordinator. This is the timestamp
// at which all of the transaction's writes are performed: even if
// intents have been laid down at different timestamps, the process
// of resolving them (e.g. when the txn commits) will bump them to
// this timestamp. SERIALIZABLE transactions only commit when
// timestamp == orig_timestamp. SNAPSHOT transactions can commit
// even when they've performed their reads (at orig_timestamp) at a
// different timestamp than their writes (at timestamp).
Timestamp cockroach_util_hlc.Timestamp `protobuf:"bytes,5,opt,name=timestamp" json:"timestamp"`
Priority int32 `protobuf:"varint,6,opt,name=priority,proto3" json:"priority,omitempty"`
// A one-indexed sequence number which is increased on each batch
// sent as part of the transaction. Used to prevent replay and
// out-of-order application protection (by means of a transaction
// retry).
Sequence int32 `protobuf:"varint,7,opt,name=sequence,proto3" json:"sequence,omitempty"`
// A zero-indexed sequence number indicating the index of a command
// within a batch. This disambiguate Raft replays of a batch from
// multiple commands in a batch which modify the same key.
BatchIndex int32 `protobuf:"varint,8,opt,name=batch_index,json=batchIndex,proto3" json:"batch_index,omitempty"`
}

Transaction Record

1
/Local/Range/Table/51/1/2/0/Transaction/"13bc3569-98e7-464e-b9ce-601ce49abe21" : 0x1204080010001800200028003270F868B036030A2A0A1013BC356998E7464EB9CE601CE49ABE211A04BB898A882A0A08F6BED5988EDF91FD143084EE4C3802120773716C2074786E2A0A08BDDF8D97A4E191FD14320A08F6BED5988EDF91FD143A0A08F6BED5988EDF91FD14420E0801120A08F6BED5988EDF91FD144801

key: record key + transaction uuid
value: trx meta

ConditionalPutRequest对应的engine KV操作

pkg/storage/replica_command.go
pkg/storage/engine/engine.go
pkg/storage/engine/rocksdb.go

1
2
3
4
5
6
7
8
| > func evalConditionalPut
| | > func MVCCConditionalPut
| | | > func mvccConditionalPutUsingIter
| | | | > func mvccPutUsingIter
| | | | | > func mvccPutInternal
| | | | | | > engine.Put(versionKey, value) //写入数据kv
| | | | | | | > buf.putMeta(engine, metaKey, newMeta) // 写入intent

metaKey:

1
metaKey := MakeMVCCMetadataKey(key)

versionKey:

1
2
versionKey := metaKey
versionKey.Timestamp = timestamp

newMeta:

1
2
3
4
newMeta := &buf.newMeta
if newMeta.Txn != nil {
metaKeySize, metaValSize, err = buf.putMeta(engine, metaKey, newMeta)
if err != nil {
1
2
/Table/51/1/2/0 : 0x0A2C0A1013BC356998E7464EB9CE601CE49ABE211A04BB898A882A0A08F6BED5988EDF91FD143084EE4C38024001120C08F6BED5988EDF91FD1410001800200C2810
/Table/51/1/2/0/1511598659.711885174,0 : 0x84E19AD40A2609626262626262626262

本文标题:crdb insert流程

文章作者:Louis

发布时间:2017年11月27日 - 11:11

最后更新:2017年11月28日 - 15:11

原始链接:/2017/11/27/crdb-insert/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。