crdb事务处理流程

session.txnState初始化

如果当前session不在事务中,则reset session的txnState

pkg/sql/executor.go

1
2
3
4
5
6
7
txnState.resetForNewSQLTxn(
e, session,
autoCommit, /* implicitTxn */
false, /* retryIntent */
e.cfg.Clock.PhysicalTime(), /* sqlTimestamp */
session.DefaultIsolationLevel,
roachpb.NormalUserPriority,

begin设置session事务状态

pkg/sql/txn.go

1
2
3
4
5
6
7
8
9
10
11
// BeginTransaction plans a BEGIN statement for the planner's session.
//
// It returns an error when the session's transaction state is not AutoRetry
// or when the planner has no transaction attached. Otherwise it applies the
// transaction modes carried by n and returns an empty zeroNode plan.
func (p *planner) BeginTransaction(n *parser.BeginTransaction) (planNode, error) {
// This function never creates a transaction itself; as the error message
// below states, one is expected to exist already when BEGIN is planned.
if p.session.TxnState.State() != AutoRetry || p.txn == nil {
return nil, errors.Errorf("the server should have already created a transaction. "+
"state: %s", p.session.TxnState.State())
}
// Apply the modes specified on the BEGIN statement (n.Modes).
if err := p.setTransactionModes(n.Modes); err != nil {
return nil, err
}
return &zeroNode{}, nil
}

生成internal txn

pkg/sql/session.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (ts *txnState) resetForNewSQLTxn(
ts.retryIntent = retryIntent
// Reset state vars to defaults.
ts.commitSeen = false
ts.sqlTimestamp = sqlTimestamp
ts.implicitTxn = implicitTxn
ts.txnResults = s.ResultsWriter.NewResultsGroup()
//设置状态为AutoRetry
ts.SetState(AutoRetry)
// 生成internal txn
ts.mu.txn = client.NewTxn(e.cfg.DB, e.cfg.NodeID.Get())

internal txn时间戳——txn begin timestamp

pkg/internal/client/txn.go

1
2
3
4
5
6
7
8
9
10
// NewTxn returns a new Txn for db by delegating to NewTxnWithProto, passing
// gatewayNodeID through unchanged.
//
// The transaction proto is built by roachpb.MakeTransaction from fixed
// defaults: the name "unnamed", a nil base key, roachpb.NormalUserPriority and
// enginepb.SERIALIZABLE, plus two values read from the DB's clock at call
// time: db.clock.Now() and db.clock.MaxOffset() in nanoseconds.
func NewTxn(db *DB, gatewayNodeID roachpb.NodeID) *Txn {
return NewTxnWithProto(db, gatewayNodeID, roachpb.MakeTransaction(
"unnamed",
nil, // baseKey
roachpb.NormalUserPriority,
enginepb.SERIALIZABLE,
// Current clock reading; presumably becomes the transaction's begin
// timestamp (TODO confirm against MakeTransaction's signature).
db.clock.Now(),
db.clock.MaxOffset().Nanoseconds(),
))
}

使用HLC clock.Now获取当前时间戳。

Insert KV写到client.Batch

初始化client.Batch

pkg/sql/tablewriter.go

1
2
3
4
5
6
7
8
9
10
11
// init binds the inserter to txn and creates a new batch from it via
// txn.NewBatch. It always returns nil.
func (ti *tableInserter) init(txn *client.Txn) error {
ti.txn = txn
// Rows handled by row() are passed this batch.
ti.b = txn.NewBatch()
return nil
}
// row inserts a single row by delegating to ti.ri.InsertRow, handing it the
// inserter's batch (ti.b), the row values and the traceKV flag.
//
// The returned Datums are always nil; the error is whatever InsertRow returns.
func (ti *tableInserter) row(
ctx context.Context, values tree.Datums, traceKV bool,
) (tree.Datums, error) {
// NOTE(review): the meaning of the literal false argument is not visible in
// this excerpt; confirm against InsertRow's signature.
return nil, ti.ri.InsertRow(ctx, ti.b, values, false, traceKV)
}

tableInserter在init中初始化client.Batch,并在row中调用InsertRow插入KV数据。

调用client.Batch的CPut方法

pkg/sql/sqlbase/rowwriter.go

1
2
3
4
5
6
7
8
9
10
// insertCPutFn issues a conditional put of value at key on b, passing nil as
// the expected value. When traceKV is true it first emits a trace event
// containing the key and the pretty-printed value.
//
// Both key and value are dereferenced or called unconditionally when tracing,
// so they must be non-nil in that case.
func insertCPutFn(
ctx context.Context, b putter, key *roachpb.Key, value *roachpb.Value, traceKV bool,
) {
// TODO(dan): We want do this V(2) log everywhere in sql. Consider making a
// client.Batch wrapper instead of inlining it everywhere.
if traceKV {
log.VEventfDepth(ctx, 1, 2, "CPut %s -> %s", *key, value.PrettyPrint())
}
// A nil expected value presumably means the put only succeeds when the key
// does not exist yet (TODO confirm against CPut's contract).
b.CPut(key, value, nil /* expValue */)
}

pkg/internal/client/batch.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// CPut adds a conditional-put request to the batch.
//
// key is converted with marshalKey, and value and expValue with marshalValue.
// If any conversion fails, the error is recorded through
// b.initResult(0, 1, notRaw, err) and the method returns without appending a
// request. On success, a request built by roachpb.NewConditionalPut is
// appended and b.initResult(1, 1, notRaw, nil) is called.
func (b *Batch) CPut(key, value, expValue interface{}) {
k, err := marshalKey(key)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
v, err := marshalValue(value)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
ev, err := marshalValue(expValue)
if err != nil {
b.initResult(0, 1, notRaw, err)
return
}
// All three arguments marshaled cleanly: queue the request and record its
// result.
b.appendReqs(roachpb.NewConditionalPut(k, v, ev))
b.initResult(1, 1, notRaw, nil)
}

client.Batch传递给roachpb.BatchRequest

roachpb.BatchRequest初始化

pkg/internal/client/db.go

1
2
3
4
5
func sendAndFill(ctx context.Context, send SenderFunc, b *Batch) error {
var ba roachpb.BatchRequest
ba.Requests = b.reqs
ba.Header = b.Header
b.response, b.pErr = send(ctx, ba)

这里将client.Batch中的请求直接赋给了ba.Requests.

1
ba.Requests = b.reqs

roachpb.BatchRequest Header初始化

pkg/internal/client/txn.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (txn *Txn) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
// It doesn't make sense to use inconsistent reads in a transaction. However,
// we still need to accept it as a parameter for this to compile.
if ba.ReadConsistency != roachpb.CONSISTENT {
return nil, roachpb.NewErrorf("cannot use %s ReadConsistency in txn",
ba.ReadConsistency)
}
// Fill in the GatewayNodeID on the batch if the txn knows it.
// NOTE(andrei): It seems a bit ugly that we're filling in the batches here as
// opposed to the point where the requests are being created, but
// unfortunately requests are being created in many ways and this was the best
// place I found to set this field.
if txn.gatewayNodeID != 0 {
ba.Header.GatewayNodeID = txn.gatewayNodeID
}

roachpb.BatchRequest添加roachpb.BeginTransactionRequest

pkg/internal/client/txn.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
needBeginTxn = !(txn.mu.Proto.Writing || txn.mu.writingTxnRecord) && haveTxnWrite
needEndTxn := txn.mu.Proto.Writing || txn.mu.writingTxnRecord || haveTxnWrite
elideEndTxn = haveEndTxn && !needEndTxn
// If we're not yet writing in this txn, but intend to, insert a
// begin transaction request before the first write command and update
// transaction state accordingly.
if needBeginTxn {
// Set txn key based on the key of the first transactional write if
// not already set. If the transaction already has a key (we're in a
// restart), make sure we keep the anchor key the same.
if len(txn.mu.Proto.Key) == 0 {
txnAnchorKey := txn.mu.txnAnchorKey
if len(txnAnchorKey) == 0 {
txnAnchorKey = ba.Requests[0].GetInner().Header().Key
}
txn.mu.Proto.Key = txnAnchorKey
}
// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
Span: roachpb.Span{
Key: txn.mu.Proto.Key,
},
}
// Inject the new request before position firstWriteIdx, taking
// care to avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
// We're going to be writing the transaction record by sending the
// begin transaction request.
txn.mu.writingTxnRecord = true
}

如果txn.mu.Proto.Key尚未设置,则优先使用txn.mu.txnAnchorKey;若它也为空,则使用batch里面第一个请求的key作为txn的key。
随后新生成一个roachpb.BeginTransactionRequest,插入到第一个写请求之前,用于写入transaction record。

找到对应replica

请参考DistSender的相关介绍:DistSender根据不同的key,选择不同的range。

roachpb.BatchRequest转化为Raft Command

单个replica接收的请求不能直接执行,直接执行可能出现多个副本数据不一致的情况。
必须走Raft Group去执行。这里需要把roachpb.BatchRequest请求转化为Raft Command.

初始化rocksdbBatch

rocksdbBatch是rocksdb提供的batch操作方式,用于进行批量操作,可以看成一个事务集合内的操作。
rocksdbBatch主要用于将request转化为command,转化之后,会将rocksdbBatch close, 而不是commit。

pkg/storage/replica.go

1
2
3
func (r *Replica) evaluateTxnWriteBatch(
...
batch := r.store.Engine().NewBatch()

pkg/storage/engine/rocksdb.go

1
2
3
4
5
6
7
8
9
10
11
12
// NewBatch returns a new batch whose parent is this RocksDB instance, created
// with writeOnly set to false.
func (r *RocksDB) NewBatch() Batch {
return newRocksDBBatch(r, false /* writeOnly */)
}
// newRocksDBBatch allocates a rocksDBBatch that records its parent engine and
// the writeOnly flag, and returns a pointer to it.
func newRocksDBBatch(parent *RocksDB, writeOnly bool) *rocksDBBatch {
r := &rocksDBBatch{
parent: parent,
writeOnly: writeOnly,
}
// Point the embedded distinct field back at the batch that owns it.
r.distinct.rocksDBBatch = r
return r
}

每个request对应的eval函数

pkg/storage/replica.go

1
2
3
4
5
6
7
8
9
10
11
12
func evaluateBatch(
...
for index, union := range ba.Requests {
// Execute the command.
args := union.GetInner()
if ba.Txn != nil {
ba.Txn.BatchIndex = int32(index)
}
// Note that responses are populated even when an error is returned.
// TODO(tschottdorf): Change that. IIRC there is nontrivial use of it currently.
reply := br.Responses[index].GetInner()
curResult, pErr := evaluateCommand(ctx, idKey, index, batch, rec, ms, ba.Header, maxKeys, args, reply)

pkg/storage/replica_command.go

1
2
3
4
5
6
7
8
9
if cmd, ok := batcheval.LookupCommand(args.Method()); ok {
cArgs := batcheval.CommandArgs{
EvalCtx: rec,
Header: h,
Args: args.ShallowCopy(),
MaxKeys: maxKeys,
Stats: ms,
}
pd, err = cmd.Eval(ctx, batch, cArgs, reply)

根据request的方法类型,找到对应的cmd,并进行Eval方法调用。

cmds初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// cmds maps each request method to its registered Command.
var cmds = make(map[roachpb.Method]Command)
// RegisterCommand registers the Command for method in cmds.
//
// declare is stored as the command's DeclareKeys function and impl as its Eval
// function. Registering a method that already has an entry is treated as
// fatal: log.Fatalf is called before the map is modified.
func RegisterCommand(
method roachpb.Method,
declare func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet),
impl func(context.Context, engine.ReadWriter, CommandArgs, roachpb.Response) (result.Result, error),
) {
// Refuse to overwrite an existing registration.
if _, ok := cmds[method]; ok {
log.Fatalf(context.TODO(), "cannot overwrite previously registered method %v", method)
}
cmds[method] = Command{
DeclareKeys: declare,
Eval: impl,
}
}

BeginTransaction注册到cmds

pkg/storage/batcheval/cmd_begin_transaction.go

1
2
3
// init registers the BeginTransaction command: declareKeysBeginTransaction
// is its key-declaration function and BeginTransaction its evaluation function.
func init() {
RegisterCommand(roachpb.BeginTransaction, declareKeysBeginTransaction, BeginTransaction)
}

ConditionalPut注册到cmds

pkg/storage/batcheval/cmd_conditional_put.go

1
2
3
// init registers the ConditionalPut command: DefaultDeclareKeys is its
// key-declaration function and ConditionalPut its evaluation function.
func init() {
RegisterCommand(roachpb.ConditionalPut, DefaultDeclareKeys, ConditionalPut)
}

BeginTransaction Eval方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// BeginTransaction is the evaluation function for the BeginTransaction
// command. It reads any existing proto stored at key into a scratch
// Transaction, then sets reply.Txn.Writing to true and writes reply.Txn at key
// through engine.MVCCPutProto, returning an empty result.Result together with
// that call's error.
//
// NOTE(review): this is an abridged excerpt. key and reply are not defined
// here, and the ok result of the lookup is never consulted, so how an
// already-existing record is handled is not shown.
func BeginTransaction(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
// Verify transaction does not already exist.
tmpTxn := roachpb.Transaction{}
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, true, nil, &tmpTxn)
if err != nil {
return result.Result{}, err
}
// Write the txn record.
reply.Txn.Writing = true
// The record is written with a zero hlc.Timestamp, the same timestamp used
// for the lookup above.
return result.Result{}, engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, reply.Txn)
}

转化为raft command

pkg/storage/replica.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *Replica) requestToProposal(
proposal := &ProposalData{
ctx: ctx,
idKey: idKey,
endCmds: endCmds,
doneCh: make(chan proposalResult, 1),
Request: &ba,
}
result, pErr = r.evaluateProposal(ctx, idKey, ba, spans)
// Fill out the results even if pErr != nil; we'll return the error below.
proposal.Local = &result.Local
proposal.command = storagebase.RaftCommand{
ReplicatedEvalResult: result.Replicated,
WriteBatch: result.WriteBatch,
}

proposal会有一个doneCh,用于等待raft group应用完成;其中result是evaluateProposal求值之后的结果,它的Replicated和WriteBatch两部分组成了RaftCommand。

proposal与raft group

proposal提交给raft group

pkg/storage/replica.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (r *Replica) propose(
...
r.insertProposalLocked(proposal, repDesc, lease)
if err := r.submitProposalLocked(proposal); err != nil {
// defaultSubmitProposalLocked marshals the proposal's RaftCommand and proposes
// it to the replica's raft group via r.withRaftGroupLocked.
//
// The payload is encoded with encodeRaftCommandV1 by default. If the command's
// ReplicatedEvalResult carries an AddSSTable, encodeRaftCommandV2 is used
// instead and the AddSSTableProposals metric is incremented; an AddSSTable
// with nil Data is rejected with an error.
//
// NOTE(review): this is an abridged excerpt; the err returned by
// protoutil.Marshal is not checked here.
func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
data, err := protoutil.Marshal(&p.command)
return r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
encode := encodeRaftCommandV1
// Proposals carrying an SSTable switch to the V2 encoding.
if p.command.ReplicatedEvalResult.AddSSTable != nil {
if p.command.ReplicatedEvalResult.AddSSTable.Data == nil {
return false, errors.New("cannot sideload empty SSTable")
}
encode = encodeRaftCommandV2
r.store.metrics.AddSSTableProposals.Inc(1)
log.Event(p.ctx, "sideloadable proposal detected")
}
if log.V(4) {
log.Infof(p.ctx, "proposing command %x: %s", p.idKey, p.Request.Summary())
}
// We're proposing a command so there is no need to wake the leader if we
// were quiesced.
r.unquiesceLocked()
// The encoded payload is built from the proposal's idKey and the marshaled
// command bytes.
return false /* unquiesceAndWakeLeader */, raftGroup.Propose(encode(p.idKey, data))
})
}

insertProposalLocked将proposal设置到replica的proposal map中,以备后面使用。
最后调用raftGroup.Propose(encode(p.idKey, data))向raft group提交proposal。

执行线程挂起

1
2
3
4
func (r *Replica) tryExecuteWriteBatch(
...
ch, tryAbandon, undoQuotaAcquisition, pErr := r.propose(ctx, lease, ba, endCmds, spans)
case propResult := <-ch:

执行线程等待proposal.doneCh.

raft group Ready,执行Raft Command

pkg/storage/store.go

1
2
3
4
5
6
7
8
9
func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) {
value, ok := s.mu.replicas.Load(int64(rangeID))
if !ok {
return
}
start := timeutil.Now()
r := (*Replica)(value)
stats, expl, err := r.handleRaftReady(noSnap)

pkg/storage/replica.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (r *Replica) handleRaftReadyRaftMuLocked(
...
err := r.withRaftGroupLocked(false, func(raftGroup *raft.RawNode) (bool, error) {
if hasReady = raftGroup.HasReady(); hasReady {
rd = raftGroup.Ready() //获取ready的命令
}
return hasReady /* unquiesceAndWakeLeader */, nil
})
for _, e := range rd.CommittedEntries {
switch e.Type {
case raftpb.EntryNormal:
if changedRepl := r.processRaftCommand(ctx, commandID, e.Term, e.Index, command); changedRepl {

processRaftCommand执行command,并唤醒在doneCh上等待的客户端线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (r *Replica) processRaftCommand(
ctx context.Context,
idKey storagebase.CmdIDKey,
term, raftIndex uint64,
raftCmd storagebase.RaftCommand,
) bool {
proposal, proposedLocally := r.mu.proposals[idKey]
delta, err = r.applyRaftCommand(
ctx, idKey, raftCmd.ReplicatedEvalResult, raftIndex, leaseIndex, writeBatch)
raftCmd.ReplicatedEvalResult.Delta = delta.ToNetworkStats()
if proposedLocally {
proposal.finishRaftApplication(response)

proposal.finishRaftApplication向doneCh中写入数据,唤醒客户线程的channel等待.

applyRaftCommand写入rocksdb,并commit.

applyRaftCommand真正向rocksdb写入数据

1
2
3
4
5
6
7
func (r *Replica) applyRaftCommand(
batch := r.store.Engine().NewWriteOnlyBatch()
if writeBatch != nil {
if err := batch.ApplyBatchRepr(writeBatch.Data, false); err != nil {
if err := batch.Commit(false); err != nil {

此处Commit真正提交数据.

本文标题:crdb事务处理流程

文章作者:Louis

发布时间:2017年12月14日 - 14:12

最后更新:2017年12月15日 - 16:12

原始链接:/2017/12/14/crdb-trx/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。