举个栗子
|
|
txn2会hang住,因为txn1的时间戳< txn2,txn2需要等待txn1事务结束。
此处就是出现了Read write intent
冲突。
再举个栗子
|
|
背景知识
Intent VS Value
- Intent
|
|
Intent没有时间戳,value对应的是txn meta.
- Value
|
|
Value包含时间戳,value对应的是真实的记录值。
mvccGetInternal逻辑
|
|
mvccGetInternal用于根据事务的时间戳和key获取对应的value。
timestamp
: 表示本事务时间戳,metaTimestamp
: 表示数据的时间戳MaxTimestamp
: 表示本事务最大时间戳, =timestamp + maxoffset(500ms)
-
遇到write intent
- 属于本身事务的intent, 直接读取intent对应的value.
- 不属于本事务
metaTimestamp <= timestamp
: 表示读取到了未完成的old写事务,返回WriteIntentError- `timestamp < metaTimestamp <= MaxTimestamp: 数据处于不确定的时间间隔中,存在write intent实际发生时间< timestamp的可能性,返回NewReadWithinUncertaintyIntervalError, 事务需要以更大的时间戳重试
timestamp < MaxTimestamp < metaTimestamp
:seekKey.Timestamp = txn.MaxTimestamp
读取old value
-
遇到value
metaTimestamp <= timestamp
: 直接读取Value- `timestamp < metaTimestamp <= MaxTimestamp: 数据处于不确定的时间间隔中,存在write intent实际发生时间< timestamp的可能性,返回NewReadWithinUncertaintyIntervalError, 事务需要以更大的时间戳重试
timestamp < MaxTimestamp < metaTimestamp
:seekKey.Timestamp = txn.MaxTimestamp
读取old记录
Txn Record HeartBeat
HeartBeat用于更新txn record的状态和心跳时间戳,
如果事务超过2x heartbeat interval
,而且处于Pending状态,
则此事务可以被冲突事务强制Abort。
txn coordinator定期发送heartbeat事件
pkg/kv/txn_coord_sender.go
|
|
每隔1秒进行心跳更新txn record.
HeartBeat事件执行逻辑
pkg/storage/batcheval/cmd_heartbeat_txn.go
|
|
- 获取txn record
- 更新txn.LastHearbeat
- 更新txn record
store.Send执行流程
pkg/storage/store.go
|
|
maybeWaitForPushee
: PushTxnRequest retry后可能会等在这个地方repl.Send
: 调用底层replica,处理request- 错误处理:
TransactionPushError
和WriteIntentError
.
WriteIntentError处理流程
pkg/storage/store.go
概念:
pushee
: 被push的事务
the pushee txn whose intent(s) caused the // conflict
pusher
: 主动提出push txn的事务
pusher the writer trying to abort a conflicting txn or the
// reader trying to push a conflicting txn’s commit timestamp
// forward
|
|
根据操作类型,确定冲突处理手段:
- 写操作:
roachpb.PUSH_ABORT
尝试abort pushee - 读操作:
roachpb.PUSH_TIMESTAMP
, 尝试将pushee的timestamp提升, 从而消除WriteIntentError,可以直接读取old value了。
生成PushTxnRequest
pkg/storage/intent_resolver.go
|
|
PushTxnRequest执行逻辑
pkg/storage/batcheval/cmd_push_txn.go
|
|
- 获取txn record
|
|
- 如果Status已经不是pending状态了,返回成功
|
|
- 如果
roachpb.PUSH_TIMESTAMP
&args.PushTo.Less(reply.PusheeTxn.Timestamp)
, 直接返回成功
|
|
-
pusherWins逻辑判断
- pushee长时间没有心跳了,pusherWins=true, abort pushee
|
|
2. `roachpb.PUSH_TIMESTAMP`且pushee隔离级别为SNAPSHOT, pusherWins = true
|
|
3. pusher Priority 高于 pushee
- pushWins=false,返回NewTransactionPushError
- 根据结果修改
txn record
|
|
TransactionPushError处理流程
txnwait.Queue结构
pkg/storage/txnwait/queue.go
|
|
txns存储出现TransactionPushError的pushee txn.
Enqueue函数入口:
|
|
TransactionPushError pushee txn Enqueue
PushTxnRequest执行失败后,出现TransactionPushError
, 会将pushee txn加入到txnwait.Queue.
pkg/storage/replica_proposal.go
|
|
重试PushTxnRequest
出现TransactionPushError之后,除了按照上面将txn放入到Queue中,还会retry PushTxnRequest.
根据store.Send执行流程
描述,出现错误后,会进行重试,出现TransactionPushError
后也会进行重试.
maybeWaitForPushee
会调用MaybeWaitForPush
函数, 逻辑如下:
- 获取Queue.mu.txns中对应的pendingTxn, 记为pending.
- 如果没有pending,返回成功
- 将当前的pushtxnrequest记录到pending.waitingPushes, 同时定义一个channel
- 等待消息
<-ctx.Done()
: pusher自动放弃txn := <-push.pending
: pushee txn结束,可能commit或者abort了,返回成功<-pusheeTxnTimer.C
: 定时器, pushee txn expire时间触发,更新pushee txn的信息
主要就是等待push.pending管道同步事务的最新消息。
pushee txn更新事务信息
pkg/storage/txnwait/queue.go
|
|
获取pushee txn的事务提交消息
pkg/storage/txnwait/queue.go
|
|
isPushed
中会发现pushee txn已经commit了,此时直接返回PushTxnResponse,不再实际进行PushTxnRequest.
ResolveIntentRequest处理流程
PushTxnRequest处理完成后,需要进行ResolveIntentRequest, 解决WriteIntent。
pkg/storage/intent_resolver.go
|
|
pkg/storage/engine/mvcc.go
|
|