Lab2A. Raft 选主实现

栏目: 后端 · 发布时间: 4年前

内容简介:总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码:Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取

总结下 MIT6.824 Lab2A Raft 选主的实验笔记。本文代码: MIT6.824/raft

Lab2A

Raft 将一致性问题分解成三个子问题:Leader 选举、日志复制、安全性保证,分别对应 Lab 的 2A, 2B, 2C,均可参考原论文图 2 中对 Raft 实现的简要总结。本小节实验目标:

  • 实现 Leader 选举:选出单个 leader 并保持领导地位,直到自己 crash
  • 实现心跳通信:实现 leader 与其他节点的无日志 AppendEntries RPC 调用

Leader 选举

Lab 限制 leader 每秒最多发送 10 次心跳请求,实现时取心跳间隔为 100ms。相应的,选举超时时间应比心跳大一个量级左右,我实现时取 400 + rand.Intn(4) * 100 ,即 400~800ms 内的随机值,尽可能避免选举 split vote 情况。

选举流程

参考上一篇文章: Leader 选举

Lab2A. Raft 选主实现

发起投票

定义 Raft 节点:

type Raft struct {
	mu        sync.Mutex          // 共享锁
	peers     []*labrpc.ClientEnd // 集群中的全部节点
	persister *Persister          // 持久化工具
	me        int                 // 本节点在 peers 中的索引

	curTerm  int           // 节点目前的任期号
	votedFor int           // 节点目前的投票对象
	entries  []LogEntry    // 本地日志
	state    PeerState     // 节点状态
	timer    *RaftTimer    // 选举超时定时器
	entryCh  chan LogEntry // 日志处理 channel
}

每个节点在 Make 初始化时都选择时长随机的 RaftTimer,之后启动新的 goroutine 监听 timer 超时和 entryCh 心跳请求,当 RaftTimer 超时后,变身为候选人发起投票。

代码实现:

// 投票参数
type RequestVoteArgs struct {
	Term        int // 候选人的任期号
	CandidateId int // 候选人 id
}

// 响应投票
type RequestVoteReply struct {
	Term        int  // 选民节点的任期号
	VoteGranted bool // 是否赢得该选票
}

// 候选人发起投票
func (rf *Raft) vote() {
	rf.curTerm++
	rf.state = Candidate
	rf.votedFor = rf.me

	args := RequestVoteArgs{
		Term:        rf.curTerm,
		CandidateId: rf.me,
	}
	replyCh := make(chan RequestVoteReply, len(rf.peers))
	var wg sync.WaitGroup
	for i := range rf.peers {
		if i == rf.me {
			continue
		}

		wg.Add(1)
		go func(server int) {
			defer wg.Done()
			var reply RequestVoteReply
			if succ := rf.sendRequestVote(server, &args, &reply); !succ {
				return
			}
			replyCh <- reply
		}(i)
	}
	go func() {
		wg.Wait()
		close(replyCh) // 避免资源泄漏
	}()

	votes := 1
	targetVotes := len(rf.peers)/2 + 1
	for reply := range replyCh {
		// 已有更新 leader,回退到 follower
		if reply.Term > rf.curTerm {
			rf.back2Follower(reply.Term)
			return
		}
		if reply.VoteGranted {
			votes++
		}

		// 如果选票已过半,不再等待已 crash 的节点调用超时
		if votes >= targetVotes {
			break
		}
	}

	// 因 split vote 等原因未达到多数票
	if votes < len(rf.peers)/2+1 {
		rf.resetElectTimer()
		return
	}

	// 成功当选,立刻发送心跳
	rf.state = Leader
	go rf.heartbeat()
}

注意减少选举耗时:候选人收集选票过程中,实时计票过半后即可结束选举,而非等待所有请求都返回了才去计票。假设有的节点已 crash,那 RPC 调用将超时返回 false,超时时间为 100ms,若不立即结束选举,候选人将白白浪费 100ms 时间,也就无法及时选出 leader

响应投票

Raft 对投票节点提出了三点要求:

  • 每轮能投几张:一个任期内,一个节点只能投一张票
  • 是否要投:候选人的日志至少要和自己的一样新,才投票
  • 投给谁:first-come-first-served,投给第一个符合条件的候选人

代码实现(干净整洁的代码):

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	reply.Voter = rf.me
	reply.Term = rf.curTerm

	switch {
	case args.Term < rf.curTerm: // 拒绝处理
		reply.VoteGranted = false
		return
	case args.Term == rf.curTerm: // 每个任期只能投一票
		if rf.votedFor == VOTE_NIL || rf.votedFor == args.CandidateId {
			reply.VoteGranted = true
			rf.votedFor = args.CandidateId
			rf.back2Follower(args.Term)
		}
	case args.Term > rf.curTerm: // 直接投票
		reply.VoteGranted = true
		rf.votedFor = args.CandidateId
		rf.back2Follower(args.Term)
	}

	return
}

比较候选人与自己的日志将在 2B 中实现。

心跳通信

Raft 将客户端的命令封装为 log entry:

type LogEntry struct {
	Index   int         // 日志索引号
	Term    int         // 写入日志时节点的任期号
	Command interface{} // 客户端命令
}

心跳请求

当候选人成功竞选为 leader 后要 立刻 给集群中其他节点发送心跳,避免有的节点也超时发起新一轮选举。

代码实现:

// 心跳请求
type AppendEntriesArgs struct {
	Term         int        // leader 任期号
	LeaderId     int        // leader id
	PrevLogIndex int        // 暂时不用
	PrevLogTerm  int        //
	Entries      []LogEntry // 批量日志,心跳时为空
}

// 心跳响应
type AppendEntriesReply struct {
	Term int  // 节点任期号
	Succ bool // 心跳是否成功响应
}

// leader 发送心跳
func (rf *Raft) heartbeat() {
	t := time.NewTicker(HEARTBEAT_INTERVAL) // 100ms
	for {
		if !rf.isLeader() {
			return
		}

		args := AppendEntriesArgs{
			Term:         rf.curTerm,
			LeaderId:     rf.me,
			PrevLogIndex: 0,
			PrevLogTerm:  0,
			Entries:      nil, // 心跳时为空日志
		}
		replyCh := make(chan AppendEntriesReply, len(rf.peers))
		var wg sync.WaitGroup
		for i := range rf.peers {
			if i == rf.me {
				continue
			}
			wg.Add(1)

			go func(server int) {
				defer wg.Done()
				var reply AppendEntriesReply
				if succ := rf.sendAppendEntries(server, &args, &reply); !succ {
					return
				}
				replyCh <- reply
			}(i)
		}
		wg.Wait()
		close(replyCh)

		var lived int
		for reply := range replyCh {
			if reply.Term > rf.curTerm {
				// 发现新 leader,如网络分区恢复
				rf.back2Follower(reply.Term)
				return
			}
			lived++
		}

		// 未收到来自大多数节点的心跳,重新开始选举
		if lived < len(rf.peers)/2+1 {
			rf.vote() // 重新开始投票
			return
		}

		<-t.C
	}
}

响应心跳

对于心跳请求,节点需对比任期号,并进行日志的一致性检查:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	if len(args.Entries) > 0 {
		log.Fatal("invalid entry in 2A")
	}

	reply.Term = rf.curTerm
	if rf.curTerm > args.Term {
		reply.Succ = false
		return
	}

	// 检查双方日志的一致性
	if i := len(rf.entries) - 1; i >= 0 {
		switch {
		case i < args.PrevLogIndex: // 本地少日志,让 leader nextIndex[i]-- 后再同步
			reply.Succ = false
			return
		case i == args.PrevLogIndex:
			if rf.entries[i].Term != args.PrevLogTerm { // term 不匹配
				reply.Succ = false
				return
			}
		case i > args.PrevLogIndex: // 强制删除
			rf.entries = rf.entries[args.PrevLogIndex:]
		}
	}
	rf.entries = append(rf.entries, args.Entries...)
	rf.entryCh <- LogEntry{Term: args.Term}

	reply.Succ = true
	return
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

互联网时代

互联网时代

《互联网时代》主创团队 / 北京联合出版公司 / 2015-2-1 / 49.80元

【编辑推荐】 1、人类正进入一个充满未知的时代,《互联网时代》不仅告诉你现在,还告诉你未来。 2、中央电视台《互联网时代》是全球第一部全面、系统、深入、客观解析互联网的纪录片,同名图书容量巨大,除纪录片内容,更包含大量尚未播出的内容。 3、中央电视台继《大国崛起》《公司的力量》《华尔街》等之后的又一重磅力作。10个摄影组,制作近3年,在全球14个国家和地区拍摄,6位“互联网之父”......一起来看看 《互联网时代》 这本书的介绍吧!

随机密码生成器
随机密码生成器

多种字符组合密码

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具