crdb transaction conflict handling

An example:

create table test.t1(c1 int primary key, c2 int);
insert into t1 values(1,1);
txn1: begin;
txn2: begin;
txn1: update t1 set c2=2 where c1=1;
txn2: select * from t1; (hang here)

txn2 hangs because txn1's timestamp is less than txn2's, so txn2 has to wait for txn1 to finish.
This is a read/write-intent conflict.

Another example:

txn1: begin;
txn2: begin;
txn1: update t1 set c2=2 where c1=1;
txn2: update t1 set c2=3 where c1=1; (hang here)

Background

Intent vs. Value

  • Intent
/Table/51/1/1/0 : 0x0A2C0A10BD276E6B3F8748E29CA39DACE11EA1E61A04BB8989882A0A08CCF3DD868BDBD4801530DBC53138024001120C08CCF3DD868BDBD4801510001800200C2816
/Table/51/1/1/0/1513582040.406522316,0 : 0x6F4DC47A0A260F616161616161616161616161616161

An intent key carries no timestamp; its value is the txn meta.

  • Value
/Table/51/1/1/0/1513582040.406522316,0 : 0x6F4DC47A0A260F616161616161616161616161616161

A value key carries a timestamp; its value is the actual record.
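
As an illustration of the two key shapes above, here is a purely hypothetical formatter (not crdb's key encoding code); metaKey and versionedKey are made-up helpers:

package main

import "fmt"

// metaKey renders a meta/intent key, which has no timestamp suffix.
func metaKey(table, index, pk int) string {
	return fmt.Sprintf("/Table/%d/%d/%d/0", table, index, pk)
}

// versionedKey appends "<seconds>.<nanoseconds>,<logical>" to the meta key,
// matching the shape of the dumps above.
func versionedKey(table, index, pk int, wallNanos int64, logical int32) string {
	return fmt.Sprintf("%s/%d.%09d,%d", metaKey(table, index, pk), wallNanos/1000000000, wallNanos%1000000000, logical)
}

func main() {
	fmt.Println(metaKey(51, 1, 1))                              // /Table/51/1/1/0
	fmt.Println(versionedKey(51, 1, 1, 1513582040406522316, 0)) // /Table/51/1/1/0/1513582040.406522316,0
}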

mvccGetInternal logic

func mvccGetInternal(
	_ context.Context,
	iter Iterator,
	metaKey MVCCKey,
	timestamp hlc.Timestamp,
	consistent bool,
	allowedSafety valueSafety,
	txn *roachpb.Transaction,
	buf *getBuffer,
) (*roachpb.Value, []roachpb.Intent, valueSafety, error) {

mvccGetInternal looks up the value for a key at the reading transaction's timestamp. The relevant timestamps and the case analysis are summarized below (a condensed sketch follows the list).

  • timestamp: the reading transaction's timestamp
  • metaTimestamp: the timestamp of the data being read
  • MaxTimestamp: the transaction's maximum timestamp, = timestamp + max clock offset (500ms)
  1. A write intent is encountered

    • The intent belongs to this transaction: read the intent's value directly.
    • The intent belongs to another transaction:
      • metaTimestamp <= timestamp: an older write transaction is still unfinished; return WriteIntentError
      • timestamp < metaTimestamp <= MaxTimestamp: the data sits in the uncertainty interval, and the write may actually have happened before timestamp; return NewReadWithinUncertaintyIntervalError, and the transaction must retry at a higher timestamp
      • timestamp < MaxTimestamp < metaTimestamp: set seekKey.Timestamp = txn.MaxTimestamp and read the old value
  2. A committed value is encountered

    • metaTimestamp <= timestamp: read the value directly
    • timestamp < metaTimestamp <= MaxTimestamp: the data sits in the uncertainty interval, and the write may actually have happened before timestamp; return NewReadWithinUncertaintyIntervalError, and the transaction must retry at a higher timestamp
    • timestamp < MaxTimestamp < metaTimestamp: set seekKey.Timestamp = txn.MaxTimestamp and read the old record
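
A minimal, self-contained sketch of the branch logic above; it uses plain int64 timestamps instead of hlc.Timestamp, and decideRead and its parameters are illustrative names, not crdb identifiers:

package main

import "fmt"

// decideRead mirrors the case analysis above: hasIntent says whether the key
// carries a write intent, ownIntent whether that intent belongs to the reading
// txn, metaTS is the data's timestamp, readTS the txn's timestamp, and maxTS
// the txn's MaxTimestamp (readTS + max clock offset).
func decideRead(hasIntent, ownIntent bool, metaTS, readTS, maxTS int64) string {
	if hasIntent && ownIntent {
		return "read the value under our own intent"
	}
	switch {
	case metaTS <= readTS:
		if hasIntent {
			return "WriteIntentError: an older write txn is still unfinished"
		}
		return "read the value directly"
	case metaTS <= maxTS: // readTS < metaTS <= maxTS
		return "ReadWithinUncertaintyIntervalError: retry at a higher timestamp"
	default: // maxTS < metaTS
		return "seek at txn.MaxTimestamp and read the older version"
	}
}

func main() {
	// readTS = 100, with maxTS = 600 standing in for readTS + max offset.
	fmt.Println(decideRead(true, false, 90, 100, 600))   // WriteIntentError
	fmt.Println(decideRead(false, false, 300, 100, 600)) // uncertainty error
	fmt.Println(decideRead(false, false, 900, 100, 600)) // read an older version
}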

Txn Record HeartBeat

A HeartBeat updates the txn record's status and heartbeat timestamp.
If a transaction has gone more than 2x the heartbeat interval without a heartbeat
and is still in the PENDING state, a conflicting transaction may force-abort it.
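
A sketch of that expiration rule, assuming expiry simply means the last heartbeat is older than twice the heartbeat interval; this is illustrative, not the actual txnwait.IsExpired implementation:

package main

import (
	"fmt"
	"time"
)

// isExpired (sketch): a PENDING txn whose last heartbeat is older than
// 2 * heartbeat interval may be force-aborted by a conflicting txn.
func isExpired(now, lastHeartbeat time.Time, heartbeatInterval time.Duration) bool {
	return now.Sub(lastHeartbeat) > 2*heartbeatInterval
}

func main() {
	last := time.Now().Add(-3 * time.Second)
	fmt.Println(isExpired(time.Now(), last, time.Second)) // true: no heartbeat for >2s
}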

The txn coordinator sends heartbeat requests periodically.

pkg/kv/txn_coord_sender.go

func (tc *TxnCoordSender) Send
func (tc *TxnCoordSender) updateState
func (tc *TxnCoordSender) heartbeatLoop
func (tc *TxnCoordSender) heartbeat
	hb := &roachpb.HeartbeatTxnRequest{
		Now: tc.clock.Now(),
	}

DefaultHeartbeatInterval = 1 * time.Second

A heartbeat refreshes the txn record every second.

HeartBeat request execution logic

pkg/storage/batcheval/cmd_heartbeat_txn.go

if ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, true, nil, &txn); err != nil {
	return result.Result{}, err
} else if !ok {
	// If no existing transaction record was found, skip heartbeat.
	// This could mean the heartbeat is a delayed relic or it could
	// mean that the BeginTransaction call was delayed. In either
	// case, there's no reason to persist a new transaction record.
	return result.Result{}, errors.Errorf("heartbeat for transaction %s failed; record not present", h.Txn)
}
if txn.Status == roachpb.PENDING {
	txn.LastHeartbeat.Forward(args.Now)
	if err := engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, &txn); err != nil {
		return result.Result{}, err
	}
}
  1. Fetch the txn record.
  2. Advance txn.LastHeartbeat.
  3. Persist the updated txn record.

store.Send execution flow

pkg/storage/store.go

func (s *Store) Send(
	...
	for {
		...
		s.maybeWaitForPushee(ctx, &ba, repl); br != nil || pErr != nil
		...
		br, pErr = repl.Send(ctx, ba)
		...
		switch pErr.GetDetail().(type) {
		case *roachpb.TransactionPushError:
			dontRetry := s.cfg.DontRetryPushTxnFailures
			if !dontRetry && ba.IsSinglePushTxnRequest() {
				pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
				dontRetry = txnwait.ShouldPushImmediately(pushReq)
			}
			if dontRetry {
				// If we're not retrying on push txn failures return a txn retry error
				// after the first failure to guarantee a retry.
				if ba.Txn != nil {
					err := roachpb.NewTransactionRetryError(roachpb.RETRY_REASON_UNKNOWN)
					return nil, roachpb.NewErrorWithTxn(err, ba.Txn)
				}
				return nil, pErr
			}
			pErr = nil // retry command
  1. maybeWaitForPushee: a retried PushTxnRequest may end up waiting here.
  2. repl.Send: hand the request to the underlying replica for processing.
  3. Error handling: TransactionPushError and WriteIntentError.

WriteIntentError handling flow

pkg/storage/store.go

Concepts:

  1. pushee: the transaction being pushed ("the pushee txn whose intent(s) caused the conflict").
  2. pusher: the transaction initiating the push ("the writer trying to abort a conflicting txn or the reader trying to push a conflicting txn's commit timestamp forward").

if ba.IsWrite() {
	pushType = roachpb.PUSH_ABORT
} else {
	pushType = roachpb.PUSH_TIMESTAMP
}

The push type is chosen based on the kind of operation that hit the conflict:

  1. Writes: roachpb.PUSH_ABORT, try to abort the pushee.
  2. Reads: roachpb.PUSH_TIMESTAMP, try to push the pushee's timestamp forward so that the WriteIntentError disappears and the old value can be read directly.

Building the PushTxnRequest

pkg/storage/intent_resolver.go

func (ir *intentResolver) maybePushTransactions(
	...
	for _, pushTxn := range pushTxns {
		pushReqs = append(pushReqs, &roachpb.PushTxnRequest{
			Span: roachpb.Span{
				Key: pushTxn.Key,
			},
			PusherTxn: *partialPusherTxn,
			PusheeTxn: pushTxn,
			PushTo:    h.Timestamp,
			// The timestamp is used by PushTxn for figuring out whether the
			// transaction is abandoned. If we used the argument's timestamp
			// here, we would run into busy loops because that timestamp
			// usually stays fixed among retries, so it will never realize
			// that a transaction has timed out. See #877.
			Now:      now,
			PushType: pushType,
		})
	}
	b := &client.Batch{}
	b.AddRawRequest(pushReqs...)
	var pErr *roachpb.Error
	if err := ir.store.db.Run(ctx, b); err != nil {
		pErr = b.MustPErr()
	}

PushTxnRequest execution logic

pkg/storage/batcheval/cmd_push_txn.go

func PushTxn(
	ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
  1. Fetch the txn record:
existTxn := &roachpb.Transaction{}
ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{},
  2. If the pushee's status is no longer PENDING, return success:
if reply.PusheeTxn.Status != roachpb.PENDING {
	// Trivial noop.
	return result.Result{}, nil
  3. If the push type is roachpb.PUSH_TIMESTAMP and args.PushTo.Less(reply.PusheeTxn.Timestamp), return success directly:
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.Less(reply.PusheeTxn.Timestamp) {
	// Trivial noop.
	return result.Result{}, nil
}
  4. Decide whether the pusher wins (pusherWins); a condensed sketch follows this list:

    1. The pushee has gone without a heartbeat for too long: pusherWins = true, abort the pushee.
case txnwait.IsExpired(args.Now, &reply.PusheeTxn):
	reason = "pushee is expired"
	// When cleaning up, actually clean up (as opposed to simply pushing
	// the garbage in the path of future writers).
	args.PushType = roachpb.PUSH_ABORT
	pusherWins = true
    2. The push type is roachpb.PUSH_TIMESTAMP and the pushee's isolation level is SNAPSHOT: pusherWins = true.
case args.PushType == roachpb.PUSH_TIMESTAMP &&
	reply.PusheeTxn.Isolation == enginepb.SNAPSHOT:
	// Can always push a SNAPSHOT txn's timestamp.
	reason = "pushee is SNAPSHOT"
	pusherWins = true
    3. The pusher's priority is higher than the pushee's: pusherWins = true.
  5. If pusherWins is false, return a TransactionPushError (NewTransactionPushError).
  6. Otherwise update the txn record according to the outcome:
if err := engine.MVCCPutProto(ctx, batch, cArgs.Stats, key, hlc.Timestamp{}, nil, &reply.PusheeTxn); err != nil {
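
A condensed sketch of the pusherWins decision enumerated above; this is not the crdb PushTxn implementation, and every parameter name here is illustrative:

package main

import "fmt"

// pusherWins (sketch): mirrors the three conditions listed above.
// pusheeExpired, isTimestampPush, pusheeIsSnapshot, and the priorities are
// hypothetical inputs standing in for fields of the real request/txn records.
func pusherWins(pusheeExpired, isTimestampPush, pusheeIsSnapshot bool, pusherPriority, pusheePriority int32) bool {
	switch {
	case pusheeExpired:
		return true // abandoned pushee: clean it up (treated as PUSH_ABORT)
	case isTimestampPush && pusheeIsSnapshot:
		return true // a SNAPSHOT txn's timestamp can always be pushed
	case pusherPriority > pusheePriority:
		return true
	default:
		return false // pusher loses: a TransactionPushError is returned
	}
}

func main() {
	fmt.Println(pusherWins(true, false, false, 1, 1))  // true: pushee expired
	fmt.Println(pusherWins(false, true, true, 1, 1))   // true: timestamp push on a SNAPSHOT txn
	fmt.Println(pusherWins(false, false, false, 1, 2)) // false: lower priority
}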

TransactionPushError handling flow

The txnwait.Queue structure

pkg/storage/txnwait/queue.go

type Queue struct {
	store StoreInterface
	mu    struct {
		syncutil.Mutex
		txns    map[uuid.UUID]*pendingTxn
		queries map[uuid.UUID][]*waitingQuery
	}
}

type pendingTxn struct {
	txn           atomic.Value // the most recent txn record
	waitingPushes []*waitingPush

txns holds the pushee txns for which a TransactionPushError occurred.

Enqueue entry point:

func (q *Queue) Enqueue(txn *roachpb.Transaction) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.mu.txns == nil {
		// Not enabled; do nothing.
		return
	}
	// If the txn which failed to push is already pending, update the
	// transaction status.
	if pt, ok := q.mu.txns[txn.ID]; ok {
		pt.txn.Store(txn)
	} else {
		pt = &pendingTxn{}
		pt.txn.Store(txn)
		q.mu.txns[txn.ID] = pt
	}
}

Enqueueing the pushee txn on TransactionPushError

When a PushTxnRequest fails with a TransactionPushError, the pushee txn is added to the txnwait.Queue.

pkg/storage/replica_proposal.go

func (r *Replica) handleLocalEvalResult(ctx context.Context, lResult result.LocalResult) {
	// Enqueue failed push transactions on the txnWaitQueue.
	if !r.store.cfg.DontRetryPushTxnFailures {
		if tpErr, ok := lResult.Err.GetDetail().(*roachpb.TransactionPushError); ok {
			r.txnWaitQueue.Enqueue(&tpErr.PusheeTxn)
		}
	}

Retrying the PushTxnRequest

After a TransactionPushError, besides putting the pushee txn into the Queue as above, the PushTxnRequest is also retried.

As described in the store.Send execution flow, the command is retried after an error, and that includes TransactionPushError.

maybeWaitForPushee calls MaybeWaitForPush, whose logic is:

  1. Look up the pendingTxn for the pushee in Queue.mu.txns; call it pending.
  2. If there is no pending entry, return success.
  3. Record the current PushTxnRequest in pending.waitingPushes, along with a channel for the result.
  4. Wait for one of:
    • <-ctx.Done(): the pusher gives up on its own.
    • txn := <-push.pending: the pushee txn finished (committed or aborted); return success.
    • <-pusheeTxnTimer.C: a timer that fires at the pushee txn's expiry time; refresh the pushee txn's record.

In essence, it waits on the push.pending channel for the pushee txn's latest state, roughly as sketched below.
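
A minimal sketch of that wait loop, assuming a pending channel and an expiry ticker as described in the list above; the types and names here are illustrative, not the crdb MaybeWaitForPush code:

package main

import (
	"context"
	"fmt"
	"time"
)

// txnRecord stands in for roachpb.Transaction in this sketch.
type txnRecord struct{ status string }

// waitForPush mirrors the three cases listed above: pusher cancellation,
// an update delivered on the pending channel, and a periodic expiry check.
func waitForPush(ctx context.Context, pending <-chan *txnRecord, expiryCheck *time.Ticker) (*txnRecord, error) {
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err() // pusher gave up
		case txn := <-pending:
			return txn, nil // pushee committed, aborted, or was pushed
		case <-expiryCheck.C:
			// In crdb this re-reads the pushee's txn record to check for
			// expiration; in this sketch we simply keep waiting.
		}
	}
}

func main() {
	pending := make(chan *txnRecord, 1)
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	go func() { pending <- &txnRecord{status: "COMMITTED"} }()
	txn, err := waitForPush(context.Background(), pending, ticker)
	fmt.Println(txn.status, err)
}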

Propagating the pushee txn's updated state

pkg/storage/txnwait/queue.go

func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction) {
	txn.AssertInitialized(ctx)
	q.mu.Lock()
	if log.V(1) {
		if count := len(q.mu.queries[txn.ID]); count > 0 {
			log.Infof(ctx, "returning %d waiting queries for %s", count, txn.ID.Short())
		}
	}
	for _, w := range q.mu.queries[txn.ID] {
		close(w.pending)
	}
	delete(q.mu.queries, txn.ID)

	if q.mu.txns == nil {
		// Not enabled; do nothing.
		q.mu.Unlock()
		return
	}

	pending, ok := q.mu.txns[txn.ID]
	if !ok {
		q.mu.Unlock()
		return
	}
	waitingPushes := pending.waitingPushes
	pending.waitingPushes = nil
	delete(q.mu.txns, txn.ID)
	pending.txn.Store(txn)
	q.mu.Unlock()

	if log.V(1) && len(waitingPushes) > 0 {
		log.Infof(context.Background(), "updating %d push waiters for %s", len(waitingPushes), txn.ID.Short())
	}
	// Send on pending waiter channels outside of the mutex lock.
	for _, w := range waitingPushes {
		w.pending <- txn
	}
}

Receiving the pushee txn's commit notification

pkg/storage/txnwait/queue.go

case txn := <-push.pending:
	log.Eventf(ctx, "result of pending push: %v", txn)
	log.Infof(ctx, "[crdb debug] result of pending push: %v", txn)
	// If txn is nil, the queue was cleared, presumably because the
	// replica lost the range lease. Return not pushed so request
	// proceeds and is redirected to the new range lease holder.
	if txn == nil {
		return nil, nil
	}
	// Transaction was committed, aborted or had its timestamp
	// pushed. If this PushTxn request is satisfied, return
	// successful PushTxn response.
	if isPushed(req, txn) {
		log.Event(ctx, "push request is satisfied")
		return createPushTxnResponse(txn), nil
	}
	// If not successfully pushed, return not pushed so request proceeds.
	log.Event(ctx, "not pushed; returning to caller")
	return nil, nil

isPushed detects that the pushee txn has already committed; in that case a PushTxnResponse is returned directly, and the PushTxnRequest does not need to be evaluated again.
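
Roughly, isPushed checks whether the pushee's latest record already satisfies the push; a hedged sketch of that check (not the exact crdb code, with made-up stand-in types) could look like:

package main

import "fmt"

// Stand-in types for this sketch only; the real code uses roachpb types.
type status int

const (
	pending status = iota
	committed
	aborted
)

type txnRecord struct {
	status    status
	timestamp int64
}

type pushReq struct {
	isTimestampPush bool  // PUSH_TIMESTAMP vs. PUSH_ABORT
	pushTo          int64 // desired timestamp for PUSH_TIMESTAMP
}

// isPushed (sketch): the push is already satisfied if the pushee is no longer
// PENDING, or, for a timestamp push, if its timestamp already exceeds PushTo.
func isPushed(req pushReq, txn txnRecord) bool {
	return txn.status != pending ||
		(req.isTimestampPush && txn.timestamp > req.pushTo)
}

func main() {
	fmt.Println(isPushed(pushReq{isTimestampPush: true, pushTo: 100}, txnRecord{status: committed, timestamp: 90})) // true
	fmt.Println(isPushed(pushReq{isTimestampPush: true, pushTo: 100}, txnRecord{status: pending, timestamp: 150}))  // true
	fmt.Println(isPushed(pushReq{}, txnRecord{status: pending, timestamp: 90}))                                     // false
}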

ResolveIntentRequest handling flow

After the PushTxnRequest has been handled, a ResolveIntentRequest is issued to resolve the write intent (a conceptual sketch follows the signatures below).

pkg/storage/intent_resolver.go

func (ir *intentResolver) resolveIntents(
	ctx context.Context, intents []roachpb.Intent, opts ResolveOptions,
) error {

pkg/storage/engine/mvcc.go

func mvccResolveWriteIntent(
	ctx context.Context,
	engine ReadWriter,
	iter Iterator,
	ms *enginepb.MVCCStats,
	intent roachpb.Intent,
	buf *putBuffer,
) error {
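
Conceptually, what resolution does with the intent depends on the status of the intent's transaction after the push; a minimal, hedged sketch of the outcomes (not the mvccResolveWriteIntent implementation):

package main

import "fmt"

// resolveIntent (sketch only): the three conceptual outcomes of intent
// resolution, keyed on the pushee's final status.
func resolveIntent(status string) string {
	switch status {
	case "COMMITTED":
		return "rewrite the intent as a committed value at the txn's final timestamp"
	case "ABORTED":
		return "delete the intent and its provisional value"
	default: // still PENDING, but its timestamp was pushed
		return "move the intent to the pushed timestamp; it remains provisional"
	}
}

func main() {
	for _, s := range []string{"COMMITTED", "ABORTED", "PENDING"} {
		fmt.Printf("%s: %s\n", s, resolveIntent(s))
	}
}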
