大家好,欢迎来到IT知识分享网。
Gorountine
轻量级线程,称为协程;比起操作系统线程goroutine消耗的资源更少,切换的开销更低(不会切换内核态),可以有更高的并发度
使用gorountine的目的有:
1 提升计算性能:充分利用多核
2 旁路IO:避免影响主干流程
在raft中,把要发送给每个peer的RPC放入单独的gorountine,避免阻塞主干 electionLoop或replcationLoop
Go并发难点
var a, b int func f() {
a = 1 b = 2 } func g() {
print(b) print(a) } func main() {
go f() g() }
g()可能先输出2再输出0,为了保持多线程同步关系,go提供的手段有sync.Mutex,chan,sync.Cond
锁 sync.Mutex
锁用于保证可见性和原子性
让一个线程对共享变量的修改立即对其他线程可见,避免其他线程读到过期缓存副本
func (rf *Raft) GetState() (int, bool) {
rf.mu.Lock() defer rf.mu.Unlock() return rf.currentTerm, rf.role == Leader }
以上代码可以保证在读取term和role时,能够及时看到其他线程修改结果
原子性,将一段代码打包到同步区执行,不会被其他线程扰乱
func (b *Bank) transfer() {
b.mu.Lock(); defer b.mu.Unlock() b.account1 -= 50; b.account2 += 50; }
需要关注
1 空间上,锁的保护范围
锁的保护范围即锁要保护的共享变量集,锁的范围越大性能越差,因此用锁的基本策略是渐进式加锁,对一个类来说,先用一把大锁保护所有共享字段,确保逻辑正确后,再逐步减低锁粒度
raft中整个类使用一把锁保护起来,编码时,将所有需要保护的字段置于锁后,如果一个函数需要持有锁才能调用,应在其名称中加上Locked(规范)
// A Go object implementing a single Raft peer. type Raft struct {
peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() mu sync.Mutex // fields below should be persisted currentTerm int votedFor int log []LogEntry // control apply progress commitIndex int lastApplied int applyCond *sync.Cond applyCh chan ApplyMsg // only for leaders nextIndex []int // guess when initialize matchIndex []int // truth from the rpc // timer and role electTimerStart time.Time role Role }
2 时间上,锁的释放时机
在用到共享变量的函数中,全函数加锁,这会导致性能很差,丧失多线程的意义;因此在执行到函数中的长耗时操作时,应该及时释放锁
Raft中,发送RequestVote,AppendEntries RPC请求前,本机读写文件前,向Apply Channel发送数据时,要及时释放锁
Channel
除了锁以外,channel也是常用的多线程同步手段
本质上是一个线程安全的消息队列,将channel作为语言内置实现
raft中用到channel的地方主要是applyChannel,当日志被大多数节点提交时,通过channel传递给raft使用方
for _, applyMsg := range messages {
rf.applyCh <- *applyMsg // maybe block, should release lock rf.mu.Lock() rf.lastApplied++ rf.mu.Unlock() }
sync.Cond
cond 是go中对操作系统信号量原语wait-signal的实现,用于多线程同步,必须和Mutex配合使用,每个cond都要在构造时绑定一个mutex (和java中wait/notify必须用synchronized包裹一样)
rf.applyCond = sync.NewCond(&rf.mu)
wait signal必须在临界区中执行,其语义是
1 wait会阻塞 gorountine执行,并释放当前持有的锁
2 signal 会唤醒阻塞在wait上的gorountine
阻塞在wait上的gorountine被唤醒后会自动重新获取锁,这就要求调用完signal的线程立即释放锁,否则可能死锁
在raft中,用于rf commitIndex更新后,提醒apply gorountine执行apply
// wait in apply loop func (rf *Raft) applyLoop() {
for !rf.killed() {
rf.mu.Lock() rf.applyCond.Wait() // do the apply } } // signal in the leader // in AppendEntriesReply handling if n > rf.commitIndex && rf.log[n].Term == rf.currentTerm {
rf.commitIndex = n rf.applyCond.Signal() } // singal in the follower // in AppendEntries callback if args.LeaderCommit > rf.commitIndex {
targetIndex := MinInt(args.LeaderCommit, len(rf.log)-1) rf.commitIndex = targetIndex rf.applyCond.Signal() }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/129949.html