Raft library使用

Raft library介绍

Raft library是etcd中的raft实现,只实现了Raft算法,使用者需要自己提供transportation层用于节点间通信,storage层用于持久化Raft log和state.

Raft library实现如下特性:

  • Leader election
  • Log replication
  • Log compaction
  • Membership changes
  • Leadership transfer extension

使用Raft library的项目:

  • etcd
  • tikv
  • cockroachdb
  • dgraph

使用方法

启动三个节点的集群

1
2
3
4
5
6
7
8
9
10
11
12
// Bootstrap node 0x01 of a three-node cluster on top of in-memory storage.
storage := raft.NewMemoryStorage()
c := &raft.Config{
	ID:              0x01,
	ElectionTick:    10,
	HeartbeatTick:   1,
	Storage:         storage,
	MaxSizePerMsg:   4096,
	MaxInflightMsgs: 256,
}
// Set peer list to all nodes in the cluster, including this one.
// Note that the other nodes need to be started separately as well.
n := raft.StartNode(c, []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}})

启动单节点

1
2
// Single-node cluster: the peer list contains only ID 0x01
// (presumably the same ID configured in c — confirm against the Config in use).
peers := []raft.Peer{{ID: 0x01}}
n := raft.StartNode(c, peers)

重启节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
storage := raft.NewMemoryStorage()

// Recover the in-memory storage from persistent snapshot, state and entries.
// NOTE(review): results of these three calls are ignored in this example;
// check them in real code if they report errors.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)

c := &raft.Config{
	ID:              0x01,
	ElectionTick:    10,
	HeartbeatTick:   1,
	Storage:         storage,
	MaxSizePerMsg:   4096,
	MaxInflightMsgs: 256,
}

// Restart raft without peer information.
// Peer information is already included in the storage.
n := raft.RestartNode(c)

Propose新的请求

1
// Hand data to the raft node n as a new proposal; ctx is passed through to Propose.
// NOTE(review): whatever Propose returns is discarded here — confirm whether callers should check it.
n.Propose(ctx, data)

请求处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Main loop driving the raft node: clock ticks, Ready batches, and shutdown.
for {
	select {
	case <-s.Ticker:
		// Tick the same node handle that the rest of this loop uses.
		s.Node.Tick()
	case rd := <-s.Node.Ready():
		// Order matters: persist, send, apply, and only then Advance.
		saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
		send(rd.Messages)
		if !raft.IsEmptySnap(rd.Snapshot) {
			processSnapshot(rd.Snapshot)
		}
		for _, entry := range rd.CommittedEntries {
			process(entry)
			if entry.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				// NOTE(review): the Unmarshal result is not checked in this example.
				cc.Unmarshal(entry.Data)
				s.Node.ApplyConfChange(cc)
			}
		}
		s.Node.Advance()
	case <-s.done:
		return
	}
}
  1. 等待s.Node.Ready channel新的请求
  2. append日志
  3. 发送给其它节点
  4. apply日志
  5. 调用s.Node.Advance继续接收新的请求

定期调用Node.Tick()推进raft的逻辑时钟,由它驱动心跳消息的发送和选举超时.

消息处理

1
2
3
// recvRaftRPC hands a received raft message m to the local node n by calling
// n.Step with the caller's ctx.
// NOTE(review): the value returned by Step is discarded — confirm that is intended.
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}

接收消息后,调用Step方法触发Ready处理逻辑。

Raft Example

分为三个部分

  1. 对外展示的kv存储层
  2. 用于raft library的存储层
  3. raft library

kv storage

kv层通过RESTful接口可以进行PUT/GET操作。

raftexample/httpapi.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
log.Printf("PUT (%v):(%v)\n", key, string(v))
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
case r.Method == "GET":
log.Printf("GET (%v)\n", key)
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}

PUT通过Propose方法,进行raft请求。

kvstore.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// readCommits drains commitC and stores every decoded key/value pair in
// s.kvStore. A nil item is a signal rather than data: the snapshotter is asked
// for a snapshot and, if one exists, recoverFromSnapshot is called with its
// data; if none exists the method returns immediately. Once commitC is closed
// it receives from errorC and, if a value arrives, exits via log.Fatal.
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
	for data := range commitC {
		if data != nil {
			// Regular entry: gob-decode the pair and store it under the lock.
			var pair kv
			if err := gob.NewDecoder(bytes.NewBufferString(*data)).Decode(&pair); err != nil {
				log.Fatalf("raftexample: could not decode message (%v)", err)
			}
			s.mu.Lock()
			s.kvStore[pair.Key] = pair.Val
			s.mu.Unlock()
			log.Printf("add kv (%v):(%v)", pair.Key, pair.Val)
			continue
		}

		// nil item: done replaying log; new data incoming
		// OR signaled to load snapshot
		snapshot, err := s.snapshotter.Load()
		switch {
		case err == snap.ErrNoSnapshot:
			// No snapshot to restore from; stop reading.
			return
		case err != nil:
			log.Panic(err)
		}
		log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
		if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
			log.Panic(err)
		}
	}
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}

应用raft log,写到kv store里面。收到nil(空信号)表示日志回放完成或需要加载快照,此时从snapshot中装载数据。

Transport

用于raft节点间通信

raftexample/raft.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Build the rafthttp transport for this node and start it.
rc.transport = &rafthttp.Transport{
	ID:          types.ID(rc.id),
	ClusterID:   0x1000,
	Raft:        rc,
	ServerStats: ss,
	LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
	ErrorC:      make(chan error),
}
rc.transport.Start()

// Register every other member; a peer's ID is its 1-based position in rc.peers.
for i, peerURL := range rc.peers {
	peerID := i + 1
	if peerID == rc.id {
		continue
	}
	rc.transport.AddPeer(types.ID(peerID), []string{peerURL})
}

以及接收消息后的处理逻辑:

coreos/etcd/rafthttp/peer.go

1
2
3
4
5
6
7
8
9
10
11
12
// Receive loop: forward each message from p.recvc to r.Process until p.stopc
// fires; a Process failure is logged and the loop keeps running.
go func() {
	for {
		select {
		case <-p.stopc:
			return
		case mm := <-p.recvc:
			err := r.Process(ctx, mm)
			if err != nil {
				plog.Warningf("failed to process raft message (%v)", err)
			}
		}
	}
}()

raftexample/raft.go

1
2
3
// Process passes the incoming raft message m to the underlying node via
// rc.node.Step and returns Step's error unchanged.
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
return rc.node.Step(ctx, m)
}

raft storage

raft storage层借助snap、wal和MemoryStorage,分别完成快照、日志以及内存存储。

Replay WAL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (rc *raftNode) replayWAL() *wal.WAL {
log.Printf("replaying WAL of member %d", rc.id)
snapshot := rc.loadSnapshot()
w := rc.openWAL(snapshot)
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
log.Printf("Applysnpshot %v", *snapshot)
rc.raftStorage.ApplySnapshot(*snapshot)
}
log.Printf("replaying WAL st:(%v)", st)
rc.raftStorage.SetHardState(st)
// append to storage so raft starts at the right place in log
log.Printf("replaying WAL ents:(%d)", len(ents))
rc.raftStorage.Append(ents)

处理请求

raft.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
// Write the hard state and the new entries to the WAL before anything else.
// NOTE(review): the result of Save is not checked here — confirm.
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
// A non-empty snapshot is saved, applied to raftStorage, then published.
log.Printf("rd.Snapshot (%v)", rd.Snapshot)
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
// Append the entries to raftStorage, then send the outgoing messages.
log.Printf("rd.Entries (%v)", rd.Entries)
rc.raftStorage.Append(rd.Entries)
log.Printf("rd.Messages (%v)", rd.Messages)
rc.transport.Send(rd.Messages)
// Publish the committed entries selected by entriesToApply; stop the node
// and leave if publishing reports failure.
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
// Advance is called last, after this Ready batch has been handled.
rc.node.Advance()

先写WAL,然后append日志,Send给其它节点,最后publishEntries进行应用。

参考链接

  1. raft library
  2. raft example
  3. pingcap raft

本文标题:Raft library使用

文章作者:Louis

发布时间:2017年12月12日 - 14:12

最后更新:2017年12月12日 - 17:12

原始链接:/2017/12/12/raft-library/

许可协议: Louis-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。