Lab3B. Raft 日志压缩及数据快照

栏目: 服务器 · 发布时间: 4年前

内容简介:总结 MIT6.824 Lab3B Log Compaction 实验笔记。在 3A 部分,以上多种耗时操作最终会

总结 MIT6.824 Lab3B Log Compaction 实验笔记。

Lab3B

日志过大的问题

在 3A 部分, Put / Append / Get 等命令的日志只通过 leader 流向 followers,来维护节点间 kvDB 数据的一致性。但随着 Client 请求的增多,各节点的日志将占用更多空间。日志过长会导致:

  • 当发生日志冲突时,follower 从头到尾查找冲突任期的首条日志将更耗时。
  • 某节点因长时间的网络隔离导致日志过于落后,或新节点加入集群,replay 日志将更耗时。

以上多种耗时操作最终会 降低集群的可用性 ,本节目标就是想办法减少日志长度,将大小控制在可控范围内。

日志压缩与数据快照

参考论文第 7 节,Raft 最终选择快照机制来压缩日志大小,如下的 1-5 committed 状态的日志被压缩为快照,最终使长度为 7 的日志 = 长度为 2 的日志 + 快照数据。快照数据分为两部分:

  • Raft 的快照元信息:LastIncludedIndex 与 LastIncludedTerm,用于生成快照后的 AppendEntries 一致性检查。
  • kvDB 的状态:生成快照时的数据库状态,用于节点重启后恢复数据。

    Lab3B. Raft 日志压缩及数据快照

如上,压缩日志的本质是用 某条 committed 日志代替之前的所有 committed 日志 。所以生成快照会删除 Raft 的日志: raft.logs = rf.logs[snapshotIndex:] ,于是要回头修改 Lab2 中基于日志长度、取日志的操作,需要把快照索引的偏移量加回去。所以需要将快照信息放入 Raft 结构中:

type Raft struct {
    // ...
    // snapshot
    lastIncludedIndex int // the snapshot replaces all entries up through and including this index
    lastIncludedTerm  int // term of lastIncludedIndex
}

添加几个公用的日志偏移量计算方法,并替换那些直接对 rf.logs 的读操作:

func (rf *Raft) getLog(i int) LogEntry {
    return rf.logs[i-rf.lastIncludedIndex]
}

func (rf *Raft) addIdx(i int) int {
    return rf.lastIncludedIndex + i
}

func (rf *Raft) subIdx(i int) int {
    return i - rf.lastIncludedIndex
}

func (rf *Raft) lastIdx() int {
    return rf.lastIncludedIndex + len(rf.logs) - 1
}

func (rf *Raft) lastTerm() int {
    return rf.logs[len(rf.logs)-1].Term
}

func (rf *Raft) logLength() int {
    return rf.lastIdx() + 1
}

交互流程

Lab3B. Raft 日志压缩及数据快照

  1. 任意节点都要实时检测 Raft 模块的日志大小。
  2. 若日志大小超过临界值,将 kvDB 数据发送给 Raft 模块用于生成快照。
  3. Raft 将快照日志、kvDB 均持久化到 persister(模拟磁盘)
  4. leader 在心跳时若发现有新快照则同步,切换 AppendEntries RPC 为 InstallSnaoshot RPC
  5. follower 收到快照后,与本地快照及日志对比,决定进行日志覆盖或删除。
  6. follower 将新快照数据持久化。
  7. follower 将 leader 的快照数据回传给上层的 KVServer2,重置 kvDB 使状态机数据达成一致。

流程划分

1-2-3:各节点在日志过多时独立生成快照。leader 的存在是为了解决冲突维护日志的一致性,在 follower 生成快照时日志本身就是 committed 状态的,这并不违反强 leader 原则。

4-5-6-7:仅 leader 操作。它通过 InstallSnapshot 将自己的快照同步给各 follower,使日志过于落后的 follower 快速达成一致。

测试用例

  • TestSnapshotRPC3B :三个节点集群,若其中一个长时间网络隔离后重新加入集群,要能通过 InstallSnapshot RPC 加速日志回放,其他两个节点要能独立生成快照,使日志大小不超过 1000 字节。
  • TestSnapshotRecover3B :节点重启后要能从 persister 中恢复快照数据。
  • 其他测试:在网络分区、节点崩溃重启的环境下保持 kvDB 数据的线性一致性。

测试通过:

Lab3B. Raft 日志压缩及数据快照

KVServer 生成快照

认真阅读并思考: Lecture: Part BGuide: An aside on optimizations

快照数据

kvServer 在重启后会先从 persister 中读取快照信息重置 kvDB,同时为检测 Client 的重复请求,所以必须将 kv.cid2seq 一起持久化:

func (kv *KVServer) encodeSnapshot() []byte {
    w := new(bytes.Buffer)
    e := gob.NewEncoder(w)
    if err := e.Encode(kv.cid2seq); err != nil {
        panic(fmt.Errorf("encode cid2seq fail: %v", err))
    }
    if err := e.Encode(kv.db); err != nil {
        panic(fmt.Errorf("encode db fail: %v", err))
    }
    return w.Bytes()
}

检测 Raft 日志大小

每次 kvServer 的 Raft 模块通知它有新日志达成一致,就意味着 Raft 的日志又新增了一条,就检测 Raft 的日志是否接近边界值。注意 lab 中的 maxraftstate 是日志大小的上限,要在快接近时提前生成快照,因为调用 Raft 模块生成快照抢锁是需要等待的,若不提前就很可能超出上限,无法通过测试。所以取 90% 提前生成:

func (kv *KVServer) waitAgree() {
    for {
        select {
        case <-kv.killCh:
            return
        case msg := <-kv.applyCh:
            op := msg.Command.(Op)
            kv.mu.Lock()
            // ...
            kv.checkSnapshot(msg.CommandIndex) // use committed index as snapshot LastIncludedIndex
            kv.mu.Unlock()
        }
    }
}

func (kv *KVServer) checkSnapshot(appliedId int) {
    if kv.maxraftstate == -1 {
        return
    }
    // take snapshot when raft size come near upper limit
    if kv.persister.RaftStateSize() < kv.maxraftstate*9/10 {
        return
    }
  
    rawSnapshot := kv.encodeSnapshot()
    go kv.rf.TakeSnapshot(appliedId, rawSnapshot) // not take long time with KVServer's lock
}

通知 Raft 同步快照

kvServer 需告知 Raft 模块以哪条 committed 日志为准来生成快照,由于 applyCh 的 msg 都是 committed 的,所以如上直接使用它的 msg.CommandIndex 即可。

Raft 收到生成快照的请求后更新完自己的快照信息就立刻返回,不要阻塞,快照同步交给心跳去处理:

// leader take snapshot should be async like Start(), must return quickly
func (rf *Raft) TakeSnapshot(appliedId int, rawSnapshot []byte) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // lock competition may delayed snapshot call, check this otherwise rf.logs[0] may out of bounds
    if appliedId <= rf.lastIncludedIndex {
        return
    }

    // discard the entries before that index, preserved it for AppendEntries consistency check
    rf.logs = rf.logs[rf.subIdx(appliedId):]
    rf.lastIncludedIndex = appliedId
    rf.lastIncludedTerm = rf.logs[0].Term
    rf.persistStatesAndSnapshot(rawSnapshot)
}

KVServer 多次调用 Raft.TakeSnapshot() 的执行顺序会收到 Raft 抢占锁的影响,可能新快照会先抢到锁,轮到旧日志处理时对应的 appliedId 日志已被删除,会造成 rf.logs[0] 溢出,所以要过滤旧快照。

Raft 同步快照

Leader 使用快照代替日志加速同步

在 Lab2 中已实现 leader 向各 follower 发送缺失日志的心跳逻辑,引入快照机制后,可直接发送快照。当leader 发现快照比 nextIndex 还新,就将 AppendEntries RPC 换成 InstallSnapshot RPC,大大减少要同步的日志数量。

如下leader 只需发送 6 一条日志,而非 3 4 5 6 四条:

Lab3B. Raft 日志压缩及数据快照

在 Leader 给 follower 同步日志前,检查快照若更新则走 InstallSnapshot RPC 的逻辑:

// leader sync logs to followers
func (rf *Raft) sync() {
    for i := range rf.peers {
        if i == rf.me {
            continue
        }

        go func(server int) {
            for {
                if !rf.isRunningLeader() {
                    return
                }

                rf.mu.Lock()
                rf.syncConds[server].Wait() // wait for trigger

                // sync new log or missing logs to server
                next := rf.nextIndex[server]

                // if follower far behind from leader, just send snapshot to it for speeding up replay
                if next <= rf.lastIncludedIndex {
                    rf.syncSnapshot(server) // InstallSnapshot RPC logic
                    continue
                }
          
               // AppendEntries RPC logic
    }
}

InstallSnapshot RPC

参考论文的图 13,RPC 参数如下:

type InstallSnapshotArgs struct {
    Term              int    // leader's term
    LeaderId          int    // so follower can redirect clients
    LastIncludedIndex int    // the snapshot replaces all entries up through and including this index
    LastIncludedTerm  int    // term of lastIncludedIndex
    Data              []byte // raw bytes of the snapshot chunk, starting at offset
    // Offset            int    // byte offset where chunk is positioned in the snapshot file
    // Done              bool   // true if this is the last chunk
}

type InstallSnapshotReply struct {
    Term int // currentTerm, for leader to update itself
}

lab 为简化流程,每次直接发送整个快照,而非论文中切割为 chunk 分块有序同步,所以 offset 取 0,done 取 true

Leader 端

// sync snap shot to follower server
// rf.mu is locked when call syncSnapshot()
func (rf *Raft) syncSnapshot(server int) {
    if rf.state != Leader || rf.crashed {
        rf.mu.Unlock()
        return
    }

    args := InstallSnapshotArgs{
        Term:              rf.curTerm,
        LeaderId:          rf.me,
        LastIncludedIndex: rf.lastIncludedIndex,
        LastIncludedTerm:  rf.lastIncludedTerm,
        Data:              rf.persister.ReadSnapshot(),
    }
    rf.mu.Unlock()

    var reply InstallSnapshotReply
    respCh := make(chan struct{})
    go func() {
        if ok := rf.sendInstallSnapshot(server, &args, &reply); ok {
            respCh <- struct{}{}
        }
    }()
    select {
    case <-time.After(RPC_CALL_TIMEOUT):
        return
    case <-respCh:
        close(respCh)
    }

    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > rf.curTerm {
        rf.back2Follower(reply.Term, VOTE_NIL)
        return
    }
    if rf.state != Leader || reply.Term < rf.curTerm { // curTerm changed already
        return
    }

    rf.matchIndex[server] = args.LastIncludedIndex
    rf.nextIndex[server] = args.LastIncludedIndex + 1
}

注意在 syncSnapshot 中调用 RPC 前及时释放锁,否则同一时间只能发起一个调用。

Follower 端

// follower receive snapshot from leader and force overwrite local logs
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Term = rf.curTerm

    // 1. reply false if term < currentTerm
    if args.Term < rf.curTerm {
        return
    }
    if args.Term > rf.curTerm {
        reply.Term = args.Term
        rf.back2Follower(args.Term, VOTE_NIL)
    }
    rf.resetElectTimer()

    // check snapshot may expired by lock competition, otherwise rf.logs may overflow below
    if args.LastIncludedIndex <= rf.lastIncludedIndex {
        return
    }

    // 2. Create new snapshot file if first chunk (offset is 0)
    // 3. Write data into snapshot file at given offset
    // 4. Reply and wait for more data chunks if done is false
    // 5. Save snapshot file, discard any existing or partial snapshot with a smaller index

    // 6. If existing log entry has same index and term as snapshot's last included entry, retain log entries following it and reply
    if args.LastIncludedIndex < rf.lastIdx() {
        // the args.LastIncludedIndex log has agreed, if there are more logs, just retain them
        rf.logs = rf.logs[args.LastIncludedIndex-rf.lastIncludedIndex:]
    } else {
        // 7. Discard the entire log
        // empty log use for AppendEntries RPC consistency check
        rf.logs = []LogEntry{{Term: args.LastIncludedTerm, Command: nil}}
    }

    // update snapshot state and persist them
    rf.lastIncludedIndex = args.LastIncludedIndex
    rf.lastIncludedTerm = args.LastIncludedTerm
    rf.persistStatesAndSnapshot(args.Data)

    // force the follower's log catch up with leader
    rf.commitIndex = max(rf.commitIndex, args.LastIncludedIndex)
    rf.lastApplied = max(rf.lastApplied, args.LastIncludedIndex)

    // 8. Reset state machine using snapshot contents (and load snapshot's cluster configuration)
    rf.applyCh <- ApplyMsg{
        CommandValid: false, // it's snapshot raw data, not a command
        CommandIndex: -1,
        Command:      args.Data, // use for KVServer restore kvDB
    }
}

对应修改 KVServer 监听 applyCh 的逻辑,从中取出 leader 发来的快照数据:

func (kv *KVServer) waitAgree() {
    for {
        select {
        case <-kv.killCh:
            return
        case msg := <-kv.applyCh:
            if !msg.CommandValid { // snapshot data
                buf := msg.Command.([]byte)
                kv.mu.Lock()
                kv.db, kv.cid2seq = kv.decodeSnapshot(buf) // restore kvDB and cid2seq
                kv.mu.Unlock()
                continue
            }
            
      // log agreement loginc
        }
    }
}

至此分别完成了三件事:

  • 各节点能实时检测 Raft 日志大小,独立地生成快照并持久化,重启时读取。
  • leader 生成快照后通过心跳发起 InstallSnapshot RPC 给 followers 加速日志回放和同步。
  • followers 将快照数据回传给上层的 KVServer 重置 kvDB 和 cid2seq,最终状态与 leader 一致。

总结

在 Lab2 中实现了 Raft 算法的三个子模块,开放了 Start()applyCh 供状态机使用。

Lab3A 在其 Lab2 基础上实现了容错的 kvDB 集群,依靠底层 Raft 算法在节点崩溃重启甚至不可用、网络延迟丢包甚至分区的环境下,依旧对多个 Client 保证数据的线性一致性。

Lab3B 实现了 Raft 日志大小的实时检测并截断生成快照,由 leader 通过 InstallSnapshot RPC 将日志同步给过于落后的节点来加速回放,同时重置各状态机(kvDB)的状态,使其数据与 leader 最达成一致。

Lab3 比 Lab2 稍简单,关键在于梳理好 KVServer 与底层的 Raft 模块的交互流程,需重读 Raft 论文的第 7、8 节的快照机制,思考 lecture 的提示。调试时活锁、状态更新时机有误等问题居多,其余细节在代码注释中有标注。


以上所述就是小编给大家介绍的《Lab3B. Raft 日志压缩及数据快照》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

A Guide to Monte Carlo Simulations in Statistical Physics

A Guide to Monte Carlo Simulations in Statistical Physics

Landau, David P./ Binder, Kurt / Cambridge Univ Pr / 2005-9 / 786.00元

This new and updated edition deals with all aspects of Monte Carlo simulation of complex physical systems encountered in condensed-matter physics, statistical mechanics, and related fields. After brie......一起来看看 《A Guide to Monte Carlo Simulations in Statistical Physics》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换