MIT6.824分布式系统实验
MIT6.824分布式系统实验
LAB1 mapreduce
mapreduce中包含了两个⾓⾊,coordinator和worker,其中,前者掌管任务的分发和回收,后者执⾏任务。mapreduce分为两个阶段,map阶段和reduce阶段。
map阶段对应的是map任务。coordinator将会把任务分成多个部分,例如,有多个⽂件待处理,则每个⽂件的处理是⼀个任务。coordinator根据待处理⽂件⽣成多个任务,将这些任务⽤available管道暂存,供worker取⽤。worker将任务完成之后,需要告知coordinator,coordinator需要记录任务的状态。为了标识任务,每个任务需要有唯⼀的taskId。coordinator可以⽤taskId为key的map来存储所有task,worker完成⼀个task之后,这个task就没有必要保存,coordinator可以从map中删除该task。coordinator存储未完成的task,除了供worker⽐对之外,还可以⽤来重新分发超时的任务。worker调⽤coordinator的applyForTask函数,来从avaliable队列中得到新的任务。在map阶段,worker收到任务后会调⽤mapf函数,这个函数是⽤户传⼊的参数,指向任务的具体执⾏过程。对mapf的执⾏结果,worker根据reduce的个数,将执⾏结果hash成reduce份。例如,对于wordcount任务,每个⽂件中的词的统计数量将根据词分为reduce份,保存在reduce个⽂件中。
reduce阶段对应的是reduce任务。coordinator将⽣成reduce个新的任务,每个任务处理⼀个hash桶中的
内容。同样⽤available管道供worker取⽤。当然,这时worker只需要知道⾃⼰取到的是第⼏个hash桶对应的reduce任务,即可通过共享⽂件和统⼀的⽂件命名规则获取到此时需要处理的⽂件。根据⽤户reducef函数的输⼊,worker将输⼊⽂件中的内容排序之后,将相同key的value存储成数组,输⼊reducef函数处理。
值得探讨的点:
1. worker通知coordinator任务完成:worker对任务完成的通知可以不必发⼀个新的包,因为worker每次完成任务的同时都会⽴即向coordinator请
求新的任务,因此可以在请求包中附送上⼀个已经完成的taskId。coordinator经过⽐对taskId和workerId确认⽆误之后,在分发新任务之前就可以处理旧的已完成任务。
2. 超时任务检测:有两种选择,⼀是worker接收任务之后定时发⼼跳包,但是这种⽅式较为繁琐。另⼀种是coordinator定时检查,task的map中对
每个task维护⼀个ddl,若当前时刻已经超过了ddl时间,就视为超时。
3. available管道初始化容量:不初始化容量的话,管道会阻塞。
4. 任务结果⽂件重命名:worker处理阶段,为了防⽌其他worker也在处理这⼀⽂件导致的写冲突,会将处理结果⽂件命名中加上workerId,但
reduce阶段不需要知道map结果是由哪个worker⽣成的,因此coordinator确认任务完成后会对结果⽂件重新命名,去掉workerId的标记。reduce 阶段同理。
mapreduce框架的⼀个重要瓶颈就是可能有⼤量的数据需要在服务器之间进⾏传递,其论⽂中详细讨论了这⼀点及其解决⽅案。
LAB2 raft
raft是⼀个分布式共识算法。分为领导选举【Leader election】、⽇志复制【Log replication】和安全【Safety】。
分布式共识的应⽤:
1. 逻辑时间的共识,⽤来决定事件发⽣的顺序;
2. 互斥性的共识,⽤来决定当下谁正拥有访问的资源;
3. 协调者的共识,谁是当下的leader。
在⼀个raft集中,server总是在三种状态之间转换,follower、candidate、leader,且保证任何时刻系统中最多只有⼀个leader。系统将时间划分为多个term,term顺序递增,candidate进⾏选举的时候会先将⾃⾝的term加⼀,表⽰⾃⼰认为已经可以开始新的term了。在⼀个term内的稳定状态
下,raft集中只有⼀个leader,其余的server是follower,系统所处term的切换意味着leader的切换。leader定时向其余服务器发送⼀个heart beat⼼跳信息,表⽰⾃⼰仍然存活,此外,接收外界对raft系统的数据请求,提供对外服务,⽣成⽇志条⽬,并且将⽇志条⽬复制给其他的follower,以此实现数据的多存储;follower接受leader发送的heart beat,确认当前系统存在leader,并且接收leader发来的⽇志条⽬副本,更新本地的⽇志。
【Leader election】若follower的heart beat超时,即,在⼀段时间内都没有收到leader发来的heart beat。此时,这台follower认为leader已经挂掉,于是⾃动转化为candidate状态,开始竞选成为新⼀期的leader。candidate将⾃⾝的term加⼀,投票给⾃⼰,同时向所有server发送requestVote的请求,对于收到requestVote请求的服务器来说,只要它们在这个term没有投出票,则投给这个candidate,换句话说,⼀个server在⼀个term只能投⼀次票。在⼀轮投票中,若所得票数⼤于总服务器数量的⼀半,则赢得选举,成为本期leader,同时⽴即发送⼀条heart beat宣布上任,系统回到稳定状态。同⼀时刻允许同时存在多个candidate,此时可能会出现选票平分的情况,这时⽆法选出新的leader,candidate将重新发起投票,并且term再加⼀。重新投票将会影响系统的性能,为了减⼩同时出现多个candidate
的可能性,每台server的heart beat超时时间(等待heart beat的时间)将设置为⼀个区间范围内的随机数。⼀般要求:heart beat时间<<;选举超时时间<<;平均故障时间。
由于raft集的server总是在三种状态之间切换,不同状态执⾏不同的任务,因此将使⽤状态机来实现。server之间互相发送的包是⼼跳包和requestVote包以及它们的reply。
主线任务
leader:【发送⼼跳包给follower和candidate,收到不合法的⼼跳则拒收】向集中其他所有成员定时发送heart beat,确认存活,同时接收其他成员反馈的reply信息。对于reply信息,有多种情况:
reply.Success = true:成员承认本leader;
reply.Success = false:成员拒绝承认本leader。原因是该成员的term&,本leader的任期已过,集已经在新的term了。于是这台机器退位,降级为follower,并更新⾃⾝的term等信息,保持与集同步。
follower:【从leader接收⼼跳包,从candidate接收requestVote包】
接收投票要求
如果投票的term⼤于⾃⼰,说明有⼈发现leader挂了,在发起新⼀轮的投票,投票,同时视为收到了⼼跳;
否则,拒绝投票,并且告诉通过告知candidate本机认为当前所处的term;
接收⼼跳:重置⼼跳超时计时器;
检查是否⼼跳超时:若超时,成为candidate,并且⽴即发起投票;
candidate:【从leader接收⼼跳,发送requestVote包给follower和其他candidate,从其他candidate接收requestVote包】
发起投票。对于投票结果:
若超过半数同意,则⽴即成为leader并且执⾏leader任务;
若有⼈拒绝:查看,如果>=⾃⼰,说明是⾃⼰out了,降级为follower,取消本轮投票;否则就是单纯的不投
我,那就算了;
检查投票是否超时,若超时,重新发起投票;
接收⼼跳,如果在投票过程中收到term>=⾃⼰的⼼跳,说明现在已经有leader了,降级到follower状态,取消本轮投票。
关于投票取消的时候可能发⽣的异常讨论
follower同意投票的同时,将term更新,⽴即视为进⼊了新的term并且将这个candidate视为当前term的leader,这是没有问题的。如果
candidate选举成功,显然是没问题的;如果candidate选举不成功,即,取消投票,有以下情况:收到>=的选举回复,说明系统正在试图开启更⼤的term;收到term>=⾃⼰的⼼跳,说明当前系统中正处于更⼤的term,并且已经处于有leader的稳定状态。不论是试图开启还是已经达到,当这个更⼤的term达到稳定的状态时,其leader会发送⼼跳,⼼跳的term⼤于candidate的term,投票给candidate的server不会拒绝这些⼼跳,并且会⽴即响应进⼊新的term,从前的错误投票在新的term下毫⽆影响。
——————————————
(重构)
对于⼀台server,需要做的事情有三个⽅⾯:选举、⽇志复制、apply。其中,选举和apply两项是所有server都主动进⾏的,因此在初始化的时候使⽤两个goroutine来控制,⽇志复制应该是由client调⽤start来控制进⾏的。
timeout⼀直在倒计时,⼀旦超出了倒计时就称为candidate开始选举,倒计时期间,可能由于收到leader或者任期更⼤的server的消息⽽reset倒计时。logApplier不断地推动lastApplied追上commitIndex,通过发送ApplyMsg给applyCh通道接⼝来apply⽇志,如果已经两者已经⼀致了,就wait直到有新的commit。如何检查有新的commit呢?可以使⽤d条件变量,等commitIndex更新的时候⽤broadcast唤醒这个cond,从⽽疏通堵塞。
【选举】
投票条件:
1. 候选⼈最后⼀条Log条⽬的任期号⼤于本地最后⼀条Log条⽬的任期号;
2. 或者,候选⼈最后⼀条Log条⽬的任期号等于本地最后⼀条Log条⽬的任期号,且候选⼈的Log记录长度⼤于等于本地Log记录的长度becomeCandidate时,⽴即开始选举,当然,这时候需要⼀些前序步骤:将term++标识进⼊了新的term,将votedfor置为me表⽰投票给⾃⼰了。
选举⽅:选举过程需要⼀个“得票数”的变量votesRcvd来记录已得票数(在分布式系统中,它的增加需要原⼦操作,因此⽤⼀个锁d锁来保护),此外,还要⽤⼀个finish变量来确定已经做出回答的server有多少。每当得到⼀枚票,就唤醒(broadcast)⼀次cond锁,堵塞疏通,做出“继续等待/处理最终票数/直接return”的选择。其中,继续等待是当票数不够⼀半,但还有server没有做出回复的时候。处理最终票数是剩余情况。直接return⽐较特殊,因为可能在等待得票的过程中,本candidate已经不是candidate了,可能降级为follower了。处理最终票数就很简单了,如果够⼀半就升级为leader(开始⼼跳goroutine),不够就变成follower(此时是因为所有server都已经做出了回复所以开始处理最终票数的),处理最终票数的过程中,要通过判断和加锁的⽅式,确保本candidate仍然是candidate,且当前任期和得票的任期⼀样。
接待员(中间函数):构造args和reply,调⽤投票⽅的投票函数。对返回结果,只在voteGranted为true的时候返回true,否则返回false,如果更⼤,就令candidate降级为follower。同时,在处理期间也要保证本candidate是candidate的时候才有必要继续进⾏,但继续进⾏的时候,⾮必要不得对candidate加锁,否则容易形成死锁。
投票⽅:先检查,如果⽐⾃⼰⼤,那就先承认⼀下⾃⼰的follower地位,如果⽐⾃⼰⼩,那就voteGranted置为false,让选举⽅承认⾃⼰follower的地位,并且返回,没必要再理会这次选举。继续处理的是>=⾃⼰的情况。如果还没投,或者已经投给了这个candidate,并且ter
m相同的话选举⽅log更长,那就投给选举⽅,并且reset选举超时计时器。否则不投。简⽽⾔之,投票要检查term,term相等的话看log是不是新于⾃⼰,以及票是不是已经投出去了。
设计技巧:
1. 将单个询问、处理回复和分发、回收分为两个过程。前者是接待员,为单个投票⽅提供单个接待服务,后者是总管,给各个投票⽅分配出各⾃
的接待员。
2. 尽量不加锁,或者锁粒度尽可能⼩,在处理的时候判断⼀下是不是状态还未过时。
【⽇志复制】
接待员(发送⽅/中间函数):取出⽬标server对应的nextIndex和matchIndex。如果nextIndex,即即将发送的entries的开始位
置,<=snapshotLastIndex,就是已经被压缩了,那就将snapshot发送给⽬标server,返回。如果nextIndex在log⾥,就构造AppendEntriesArgs,把nextIndex后⾯所有的entries全发送过去,这时,要附带nextIndex-1这⼀条的index和term,⽤来给⽬标server做⼀致性检查。对于返回值,⾸先检查term
判断是否本leader需要降级为follower,然后再判断是否成功。如果成功,就更新nextIndex和matchIndex,再看看需不需要commit。如果不成功,那就是⼀致性检查出问题了,到冲突点,重新执⾏接待任务。
快速回退法:发⽣冲突的时候,让follower返回⾜够的信息给leader,这样leader可以以term为单位来回退,⽽不⽤每次只回退⼀条log条⽬,因此当log不匹配的时候,leader只需要在每个不同的term发送⼀条appendEntries,这是⼀种加速策略。
冲突点回溯:到args.PrevlogTerm的第⼀条log的index,就是⽬前看来的冲突index。不会往之前的term,因为⽆法确定那⾥是不是冲突了。这个冲突index可能会有点悲观,这⾥会增加⽹络负载,可以优化。
设计技巧:
1. matchIndex只在发送成功的时候更新,并且是为了commit设置的。follower的commitIndex始终是随着AppendEntriesArgs带来的leader的
commitIndex更新的,⾃⼰不能主动判断更新。另,commit的时候会唤醒applyCond。
2. nextIndex总是很乐观的,靠⼀致性检查和冲突点回溯来防⽌错误。
3. ⼀条log entry的index和它在log中的下标不是同⼀个东西。
4. 对log的操作可能很多,设计⼀个log类来专门管理这些操作,像cmu数据库⼀样写⼀些基本的常⽤操作函数。
5. 向管道中塞东西,可能会发⽣堵塞,因此要使⽤goroutine。例如 go rf.applyCh<-msg
6. appendNewEntry时的index
【关于⽇志复制时可能出现的异常情况讨论】
如果leader正常⼯作,raft系统中不会出现什么问题,follower只需要接收leader发来的⽇志信息,将log的状态与leader的log状态靠齐即可。
⼀个旧leader故障之后,新的leader是否可以使系统达到⼀致?
麻省理工申请条件
假设现在系统中有三台机器,S1,S2和S3,其中S3是旧的leader,且系统此刻是⼀致的。S3可能引发不⼀致的故障时刻有三种:
1. 将新条⽬添加到本地log之后⽴即故障:根据多数选举的规则,S1和S2中可以出现新的leader,系统继续服务。
2. 将新条⽬添加到S1之后故障:S1可以成为leader,系统继续服务,S1会将这条条⽬传递给其他机器并且提交。
3. 将新条⽬添加到S1并且提交之后故障:同上。
因此,旧leader S3故障之后,剩下的团体也可以正常服务。如果此时旧leader重新与集建⽴了联系,系统将会如何?
不论中间经过了多少个term,假设现在的leader是S1,旧leader是S3,S3重新加⼊集的时候,⾸先S3肯定会降级为follower,如果S3可以⽴即被选举为leader,那么就可以视为S3没有发⽣过故障。S1会发送新条⽬给S3的时候,S3会进⾏不⼀致性检查,经过多次发送并尝试append条⽬,S1会令S3的log状态与⾃⼰的达成⼀致。
【关于已经commit的log是否会丢失的进⼀步讨论】
假设当前leader是S3。已知leader选举,当term⼀致的时候,只能给log长于⾃⼰的投选举票。那么只有log长于其中超过半数机器的机器可以成为leader。已知leader永远不会丢弃⾃⼰已有的log,那么存在于leader中的被commit的log肯定不会被丢弃。丢弃的情况只会是⼀条log被⼤多数机器记录,但leader没有记录。
假设该log的index是i1,term是t1。根据log append的连续性,S3⾄多接收到i1-1之后就没有接收到t1的其他任何log了。进⼀步地,由于i1被保留到了S3⼊选的term,因此t1之后的leader都有i1记录,因此S3⾄多接收到i1-1之后就再也没有收到到选举为⽌的其他任何log了。在这种情况下,还要保证经历了所有的term(才能term与其他选举者⼀致),即使之后的所有term都不再append条⽬到任何机器上,那也有⼤多数机器⽐S3多了i1这条log,S3不可能选举成功。推出⽭盾,因此commit的log不会丢失。
【persist】
persist类是raft类中的⼀个成员。其作⽤应该是为了保存state信息和snapshot信息,state信息包括currentTerm,votedFor,log。只有这三者需要被持久化存储,log是唯⼀记录了应⽤程序状态的地⽅,其中存储的⼀系列操作是唯⼀能在断电重启之后⽤来重建应⽤程序状态的信息;votedfor和currenterm是为了保证每个任期最多只有⼀个leader。其他的状态,例如lastApplied和commitIndex都可以通过leader和follower之间的交流来重新获得。
【snapshot】
每个server会⾃⼰创建⾃⼰的snapshot,也会接受并install leader发送的snapshot(这发⽣在⽇志同步的时候nextIndex<=ssLastshot时)。只有leader 可以让其他server install⾃⼰的snapshot,这和只有l
eader可以让其他server appendEntries⼀样,因此,发送处理和接收处理之前都必须check发送⽅的leader⾝份,并且可以以此来代替加锁。
收到installSnapshot和收到AppendEntries类似,都需要有检查leader⾝份,确认⾃⼰follower⾝份和reset election timer等操作。将得到的snapshot发送到applyCh即可。
假死问题:由于⽹络原因导致的⼼跳超时,认为leader已死,但其实leader还活着。
脑裂问题:指的是分布式集系统中由于⽹络故障等原因,选举出了两个leader,集分裂成两个集。出现脑裂问题的原因是分布式算法中没有考虑过半机制。脑裂问题对分布式系统是致命的,两个集同时对外提供服务,会出现各种不⼀致问题,如果两个集突然可以联通了,将不得不⾯对数据合并、数据冲突的解决等问题。
为了解决脑裂问题,通常有四种做法:
1. zookeeper和raft中使⽤的过半原则;
2. 添加⼼跳线。集中采取多种通信⽅式,防⽌⼀种通信⽅式失效导致集中的节点⽆法通信,⽐如原来只有⼀条⼼跳线路,此时若断开,则判
断对⽅已死亡,若有两条⼼跳线,⼀条断开,另⼀条仍然可以收发⼼跳,保证集服务正常运⾏,备⽤线路与主线路可以互相监测,正常情况下备⽤线路为了节约资源⽽不起作。
3. 使⽤磁盘锁的形式,保证集中只能有⼀个Leader获取磁盘锁,对外提供服务,避免数据错乱发⽣。但是,也会存在⼀个问题,若该Leader节
点宕机,则不能主动释放锁,那么其他的Follower就永远获取不了共享资源。于是有⼈在HA中设计了"智能"锁。正在服务的⼀⽅只有在发现⼼跳线全部断开(察觉不到对端)时才启⽤磁盘锁。平时就不上锁了。
4. 仲裁机制。⽐如提供⼀个参考的IP地址,⼼跳机制断开时,节点各⾃ping⼀下参考IP,如果ping不通,那么表⽰该节点⽹络已经出现问题,则该
节点需要⾃⾏退出争抢资源,释放占有的共享资源,将服务的提供功能让给功能更全⾯的节点。
过半原则:根据鸽巢原理,raft中任意⼀个操作都需要过半的服务器的认同,这样能保证始终只有⼀个leader。此外,服务器通常选择奇数台机器部署,这样可以⽤较少的机器实现相同的集容忍度。
快速领导者选举算法:在选举的过程中进⾏过半验证,这样不需要等待所有server都认同,速度⽐较快。
Lab3 KV-raft
在此,从⼀个⽐lab2更⾼层次的⾓度看待分布式系统。lab2中的raft是⽤于机器之间互相沟通形成⼀致的log和state,但机器之间并不关⼼log中存储的command是什么,因此全部使⽤interface{}作为command的接⼝。lab3中,我们要实现的是client调⽤Get()、Append()、Put(),server通过raft达成集内的⼀致,然后将raft apply的command正式执⾏。raft系统在这⼀过程中,只起到了⼀致性的作⽤,是命令的被调⽤和真正执⾏之间的⼀层。
这⾥需要注意的是线性⼀致性,为了实现这⼀点,给command递增的index(由raft调⽤start后返回),使⽤⼀个map记录每个client最近最后⼀个被执⾏command的index以及执⾏结果,由此可以推测出command序列执⾏到哪⼀条了,防⽌重复执⾏。
另外,由于raft系统在start和apply之间需要⼀定的时间,因此,客户端调⽤读写函数,读写函数调⽤start通知raft集之后,注册⼀个index对应的待相应result channel,存储在以index为key的map中。当raft系统达成⼀致,apply这条命令的时候,从apply函数调⽤真正的读写过程,执⾏结果push到index对应的channel中。于是,客户端调⽤的读写函数只需要直接去result channel中取出这条命令的执⾏结果。这样做⾮常的简洁流畅,⽤channel阻塞的时间来等待raft系统⼀致、apply执⾏读写。
关于start和apply之间leader被更换的讨论:
⼀条command,在其start和apply中间,可能raft系统已经更换了leader,对于新的leader来说,它没有为这条command创建channel(start不是通过新leader进⾏的),试图将result放⼊channel的时候会失败,导致直接返回。然⽽,旧leader虽然降级为follower,但仍然会对这条apply,因此即使更换leader也没关系,但需要注意的是,从channel取出result的时候,就不必判断这个机器是不是leader了,只要在start的时候判断了就可以了。
关于command的index是否会发⽣变化的讨论:
command的index是由start调⽤的时候,leader的log中当前log最后⼀条entry的index+1决定的顺序index,如果这条command的entry被覆盖,那就会超时,client将更换server重新执⾏,如果没有被覆盖,将会保持这个index。
如果command的entry被覆盖了,且这个index对应的map中仍然有channel在等待答案(发⽣于leader降级,被新的leader清除了index对应位置,并且没有覆盖,leader⼜当选为leader,并建⽴了新的index位置),那么将会发⽣不匹配,因此,应该在从channel中取出result的时候检查op是否是在等待的那个。
如果op正好与在等待的那个⼀致,但是seq⼜不是那个呢?没有关系,只要执⾏内容⼀致就可以了。client中等待之后那条op结果的timer会超时,重新执⾏之后那条op。
B部分是压缩,kv中有⼀个变量maxraftState限制了log的长度,若即将超过这个长度,就对log进⾏压缩。同时,kv的data和peocessed也应该被持久化存储。
此外,LAB3可以使⽤init函数完成logger注册,并记录。当然,这不是必需的。
关于golang中的init函数
golang⾥的main函数是程序的⼊⼝函数,main函数返回后,程序也就结束了。golang还有另外⼀个特殊的函数init函数,先于main函数执⾏,实现包级别的⼀些初始化操作。
init函数的主要作⽤:
初始化不能采⽤初始化表达式初始化的变量。
程序运⾏前的注册。
实现sync.Once功能。
其他
init函数的主要特点:
init函数先于main函数⾃动执⾏,不能被其他函数调⽤;
init函数没有输⼊参数、返回值;
每个包可以有多个init函数;
包的每个源⽂件也可以有多个init函数,这点⽐较特殊;
同⼀个包的init执⾏顺序,golang没有明确定义,编程时要注意程序不要依赖这个执⾏顺序。
不同包的init函数按照包导⼊的依赖关系决定执⾏顺序。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。