概述
lab 4要求实现一个使用raft共识算法的分片存储系统。而在lab 4a中,我们仅需实现分片控制器的功能。
lab4中的分片存储系统有多个分片,这些数据分布在众多分片上,而每个分片都对应着一个集群组,可能一个集群组负责多个分片,一个集群组内的所有服务器都会使用raft共识算法进行状态的复制。
介绍
而本实验的重点为分片控制器,主要负责的工作是对集群组和分片之间关系的管理,我们仅需在/src/shardctrler文件夹中对client.go,server.go,common.go这三个文件进行修改即可。
go test
注意事项(实验教程中的Hint)
1、本实验的开始部分就如同kvraft的server那部分,可以直接借鉴照搬过来。
2、同样需要设计重复请求侦查机制,避免一条请求执行两遍造成错误。
3、分片控制器在有集群组的加入或者离开的情况下,由于分片负载平衡的需要,后续还要进行分片和集群组之间的再分配,确保分片均匀分配给这些集群组,以及尽量避免分片和集群组对应关系的变动。但是这个执行过程要求具有确定性的。因为分片控制器也使用了raft共识算法来进行冗余复制,Leader和follower各自在执行join,leave,move操作后,分片再分配后每个分片对应的集群组在每个backup上必须是相同的,因此要求这个执行过程具备确定性。而我们需要特别注意的就是,在Go里面map的遍历是不具有确定性的,每次遍历时,遍历的顺序都会发生变化。
4、Go中map数据类型是引用类型,因此如果想要进行深拷贝,需要避免通过赋值的方式,而是应该采取遍历的方式,将map中的key、value对一个个拷贝到新的map中。
5、-race同样可以帮你进行侦测数据竞争的问题,帮你找到bug。
本次实验内容
公共部分和客户端部分
1、设计重复请求侦查机制,加入id来标识每个rpc请求
服务端部分
1、完善ShardCtrler结构体
2、完善Op结构体
3、完善Join、Leave、Move、Query函数
4、设计executeThread函数
5、完善StartServers函数
代码阶段
公共部分和客户端部分
由于需要设计重复请求侦查机制,我们参考lab3中的设计方式,给每个请求都添加一个int64类型的id来标识。
因此,照搬lab3当时的方法,给common.go中每个Args结构体中添加一个int64类型Id,在client.go中每个请求操作的函数中,应当在rpc的args的结构体中给Id赋值。
代码如下所示,代码仅取部分展示,因为都是一样的,新增一条简单代码而已:
type JoinArgs struct {
Servers map[int][]string // new GID -> servers mappings
Id int64
}
func (ck *Clerk) Query(num int) Config {
args := &QueryArgs{}
// Your code here.
args.Num = num
args.Id = nrand()
for {
// try each known server.
for _, srv := range ck.servers {
var reply QueryReply
ok := srv.Call("ShardCtrler.Query", args, &reply)
if ok && reply.WrongLeader == false {
return reply.Config
}
}
time.Sleep(100 * time.Millisecond)
}
}
服务端部分
完善ShardCtrler结构体
由于lab4a和lab3做的事情十分相似。因此,照搬lab3中的设计。server需要等待共识完成后才能返回rpc调用结果给client,因此加入了条件变量。同时,重复请求侦查机制也是会将近期处理的请求的id记录下来,并将query的结果保存下来,因此新增了commandLog和configLog变量,分别用来记录近期处理的请求的id和近期的query请求的返回值。以上设计原理都和lab3一模一样。
type ShardCtrler struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
// Your data here.
cond *sync.Cond
commandLog []int64
configLog map[int64]Config
configs []Config // indexed by config num
}
完善Op结构体
Op结构体主要用于承载各个操作的信息,因此仅需把各个操作所需的信息打包到这个Op结构体中即可。
Servers对应Join操作的参数
GIDs对应Leave操作的参数
Shard和GID对应Move操作的参数
Num对应Query操作的参数
type Op struct {
// Your data here.
Operation string
Id int64
Servers map[int][]string
GIDs []int
Shard int
GID int
Num int
}
完善Join、Leave、Move、Query函数
这些函数的设计模式类同lab3中的PutAppend那些函数。
(1)检查该请求为重复请求,若是则直接返回结果
(2)封装请求的信息到Op中,并调用raft算法的接口进行共识
(3)若发现本服务器不是Leader则返回错误给client
(4)Join、Leave操作检查一下参数是否正确,例如传来的Join的GID本来就已经有了,或者是传来的Leave的GID不存在等,这样的正确性检查。这一步骤在实验这里并没有必要。
(5)检查请求是否完成共识并执行,若没有则进入阻塞等待请求完成被唤醒,以及设定一个计时器避免长时间等待。
(6)请求成功执行,Join、Leave、Move操作则直接返回,Query操作返回查询到的config。
代码仅展示Join和Query函数的代码,因为Join、Leave、Move函数基本一致,Query也就只是基本相同,也就是多了要返回查询到的config而已。
func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
// Your code here.
sc.cond.L.Lock()
for _, v := range sc.commandLog {
if args.Id == v {
sc.cond.L.Unlock()
return
}
}
// _, isLeader := sc.rf.GetState()
command := Op{Operation: "Join", Id: args.Id, Servers: args.Servers}
sc.cond.L.Unlock()
_, term, isLeader := sc.rf.Start(command)
if !isLeader {
reply.WrongLeader = true
reply.Err = "Not leader"
return
}
log.Printf(" server%d recv Join:%v id:%v", sc.me, args.Servers, args.Id)
sc.cond.L.Lock()
for key := range args.Servers {
_, ok := sc.configs[len(sc.configs)-1].Groups[key]
if ok {
reply.Err = "duplicate key"
sc.cond.L.Unlock()
return
}
}
var timerController bool = true
var timeoutTime = 0
for {
for _, v := range sc.commandLog {
if args.Id == v {
timerController = false
sc.cond.L.Unlock()
log.Printf("Success: Join:%v id:%v", args.Servers, args.Id)
return
}
}
nowTerm, isLeader := sc.rf.GetState()
if nowTerm != term || !isLeader || timeoutTime == 3 {
timerController = false
reply.WrongLeader = !isLeader
reply.Err = "Consensus error"
sc.cond.L.Unlock()
return
}
log.Printf("Join:%v id:%v waiting", args.Servers, args.Id)
go func() {
time.Sleep(time.Duration(100) * time.Millisecond)
sc.cond.L.Lock()
if timerController {
timeoutTime++
sc.cond.L.Unlock()
sc.cond.Broadcast()
} else {
sc.cond.L.Unlock()
return
}
}()
sc.cond.Wait()
}
}
func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
// Your code here.
sc.cond.L.Lock()
result, ok := sc.configLog[args.Id]
if ok {
reply.Config = result
sc.cond.L.Unlock()
return
}
command := Op{Operation: "Query", Id: args.Id, Num: args.Num}
sc.cond.L.Unlock()
_, term, isLeader := sc.rf.Start(command)
if !isLeader {
reply.WrongLeader = true
reply.Err = "Not Leader"
return
}
log.Printf(" server%d recv Query:%v id:%v", sc.me, args.Num, args.Id)
sc.cond.L.Lock()
var timerController bool = true
var timeoutTime = 0
for {
result, ok = sc.configLog[args.Id]
if ok {
reply.Config = result
timerController = false
sc.cond.L.Unlock()
log.Printf("Success: Query:%v config:%v id:%v", args.Num, result, args.Id)
return
}
nowTerm, isLeader := sc.rf.GetState()
if nowTerm != term || !isLeader || timeoutTime == 3 {
timerController = false
reply.WrongLeader = !isLeader
reply.Err = "Consensus error"
sc.cond.L.Unlock()
return
}
log.Printf("Query:%v id:%v waiting", args.Num, args.Id)
go func() {
time.Sleep(time.Duration(100) * time.Millisecond)
sc.cond.L.Lock()
if timerController {
timeoutTime++
sc.cond.L.Unlock()
sc.cond.Broadcast()
} else {
sc.cond.L.Unlock()
return
}
}()
sc.cond.Wait()
}
}
设计executeThread函数
本函数的关键在于处理Join、Leave、Move函数中的分片负载均衡。
注意:为了解决遍历map具有不确定性的问题,此处在遍历map的方法改为将map中的数据遍历取出到切片中,并对切片进行排序,再对排序后的切片进行遍历,这样相当于有序遍历map中的元素。
Join函数和Leave函数给现有的集群组新加入几个组或减少几个组,由于组数发生了变化,分片固定为10个,那么就要重新分配这些分片,并确保分配结果尽可能的均衡,避免个别组的负担过大。同时还尽量少的改动分片的位置,因为一个分片对应的组发生了变化,那么这个分片的数据也需要从上个组迁移到新的组中,我们需要尽量减少这种开销巨大的数据迁移。因此以下问题就是我们就是Join和Leave函数需要解决的。
如何做到分片的分配尽可能均衡,以及分配过程中尽可能少改动分片的位置?
尽可能均衡就是让这些分片平分到每个组上,如果无法平分,再将多出来的那些分片,每个都各自分配给一个组。举例说明:10个分片,4个组,首先每个组分2个分片,多出来2个分片,这两个分片,各自分配给一个组,最终结果就是4个组上的分片数量分别为:3、3、2、2。
因此,如果有NShard个分片,有groupSize个组,首先每个组分avgNum个分片,avgNum=NShard / groupSize。多出来moreNum个分片,moreNum=NShard % groupSize
自然也就会有moreNum个组有avgNum+1个分片,groupSize-moreNum个组有avgNum个分片,这就是均衡分配的必然的结果。
以上方式解决了分片的均衡分配,后续解决尽可能少改动分片位置的问题。
首先我们已知原来的每个组上的分片数量,再分两种情况讨论:
1、组的数量增多,对应Join函数。那么每个组上的分片数量要么不变,要么减少。均衡分配要求有moreNum个组有avgNum+1个分片,先将这moreNum个名额分配给分片最多的组,让它们的分片数量降为avgNum+1个,减少的那些分片就分配给Join的新组。接下来就是把那些分片数量超过avgNum个的组上的分片数量减下来分配给Join的新组。
2、组的数量减少,对应Leave函数。那么每个组上的分片数量要么不变,要么增多。被踢出的组上的分片需要分配到剩余的组中,已知要有moreNum个组有avgNum+1个分片,那么根据原来每个组上的分片数量,先分配分片造出moreNum个有avgNum+1个分片的组,后续再分配分片,再造出groupSize-1个有avgNum个分片的组。
注意:实验中,分片数量固定为NShard个,也就是10个。但是组的数量可能会超过10个,此时就是有些组压根就分不到分片。因此我们需要考虑这个情况,特别在Leave操作中,有一个组被踢出,那么就会有一个新的组上来顶替,并分配到一个分片。
Move和Query操作十分简单,没有涉及到分片的再分配。Move操作仅需修改一下配置中的Shards切片的值即可。Query操作仅需返回执行的config对象的值即可。
Join、Leave、Move操作都会生成一个新的config添加到ShardCtrler中,Query不会。因此,我们需要新建一个tempConfig,这个tempConfig是基于原来的最新的config修改而来。一般都会先将最新的config的值拷贝到tempConfig中,再对tempConfig进行修改。拷贝过程中需要注意,map是引用类型,直接赋值的话,那么两个变量会指向同一个map,tempConfig进行修改时,会影响到旧的config的值,这一点在Hint中也说过。
其余部分设计都是照搬lab3,此处也不过多赘述。
代码如下所示:
func (sc *ShardCtrler) executeThread() {
for {
var command = <-sc.applyCh
sc.cond.L.Lock()
if command.CommandValid {
var op = command.Command.(Op)
for _, v := range sc.commandLog {
if op.Id == v {
sc.cond.L.Unlock()
return
}
}
var index = len(sc.configs) - 1
var tempConfig Config
tempConfig.Groups = make(map[int][]string)
for key, value := range sc.configs[index].Groups {
tempConfig.Groups[key] = value
}
tempConfig.Shards = sc.configs[index].Shards
tempConfig.Num = len(sc.configs)
numRecord := make(map[int]int)
log.Printf("server%d execute command %v", sc.me, command)
if op.Operation == "Join" {
var gidArray = make([]int, 0)
for key, value := range op.Servers {
gidArray = append(gidArray, key)
tempConfig.Groups[key] = value
}
sort.Ints(gidArray)
var groupSize = len(tempConfig.Groups)
var avgNum = NShards / groupSize
var moreNum = NShards % groupSize
for shard, gid := range tempConfig.Shards {
if gid == 0 {
for _, id := range gidArray {
_, ok := numRecord[id]
if !ok {
tempConfig.Shards[shard] = id
numRecord[id] = 1
break
} else {
if numRecord[id] == avgNum {
if moreNum != 0 {
moreNum--
numRecord[gid]++
break
} else {
continue
}
} else if numRecord[id] < avgNum {
tempConfig.Shards[shard] = id
numRecord[id]++
break
}
}
}
} else {
_, ok := numRecord[gid]
if ok {
if numRecord[gid] == avgNum {
if moreNum != 0 {
moreNum--
numRecord[gid]++
} else {
for _, id := range gidArray {
_, ok := numRecord[id]
if !ok {
tempConfig.Shards[shard] = id
numRecord[id] = 1
} else {
if numRecord[id] == avgNum {
continue
} else {
tempConfig.Shards[shard] = id
numRecord[id]++
break
}
}
}
}
} else if numRecord[gid] < avgNum {
numRecord[gid]++
} else {
for _, id := range gidArray {
_, ok := numRecord[id]
if !ok {
tempConfig.Shards[shard] = id
numRecord[id] = 1
break
} else {
if numRecord[id] == avgNum {
if moreNum != 0 {
moreNum--
numRecord[gid]++
break
} else {
continue
}
} else if numRecord[id] < avgNum {
tempConfig.Shards[shard] = id
numRecord[id]++
break
}
}
}
}
} else {
numRecord[gid] = 1
}
}
}
} else if op.Operation == "Leave" {
for _, gid := range op.GIDs {
delete(tempConfig.Groups, gid)
}
var groupSize = len(tempConfig.Groups)
if groupSize == 0 {
for i := range tempConfig.Shards {
tempConfig.Shards[i] = 0
}
} else {
var avgNum = NShards / groupSize
var moreNum = NShards % groupSize
var invalidShard = make(chan int, NShards)
var numList = make([]int, 0)
for key := range tempConfig.Groups {
numList = append(numList, key)
}
sort.Ints(numList)
for shard, val := range tempConfig.Shards {
var isInvalid bool
for _, value := range op.GIDs {
if value == val {
invalidShard <- shard
isInvalid = true
break
}
}
if !isInvalid {
_, ok := numRecord[val]
if ok {
numRecord[val]++
} else {
numRecord[val] = 1
}
}
}
for _, gid := range numList {
if len(invalidShard) == 0 {
break
}
if numRecord[gid] > avgNum {
moreNum--
continue
} else {
if moreNum != 0 {
moreNum--
for numRecord[gid] < avgNum+1 && len(invalidShard) != 0 {
// val++
numRecord[gid]++
var shardIndex = <-invalidShard
tempConfig.Shards[shardIndex] = gid
}
} else {
for numRecord[gid] < avgNum && len(invalidShard) != 0 {
// val++
numRecord[gid]++
var shardIndex = <-invalidShard
tempConfig.Shards[shardIndex] = gid
}
}
}
}
}
} else if op.Operation == "Move" {
tempConfig.Shards[op.Shard] = op.GID
} else if op.Operation == "Query" {
if op.Num == -1 || op.Num > index {
sc.configLog[op.Id] = sc.configs[index]
} else {
sc.configLog[op.Id] = sc.configs[op.Num]
}
}
sc.appendCommandLog(op.Id)
if op.Operation == "Join" || op.Operation == "Move" || op.Operation == "Leave" {
log.Println(tempConfig)
sc.configs = append(sc.configs, tempConfig)
}
}
sc.cond.L.Unlock()
sc.cond.Broadcast()
}
}
完善StartServer函数
这个函数就是给ShardCtrler结构体中新增的对象添加初始化操作,以及开启executeThread协程。
代码如下
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardCtrler {
sc := new(ShardCtrler)
sc.me = me
sc.configs = make([]Config, 1)
sc.configs[0].Groups = map[int][]string{}
labgob.Register(Op{})
sc.applyCh = make(chan raft.ApplyMsg)
sc.rf = raft.Make(servers, me, persister, sc.applyCh)
// Your code here.
sc.commandLog = make([]int64, 0)
sc.configLog = make(map[int64]Config)
sc.cond = sync.NewCond(&sc.mu)
go sc.executeThread()
return sc
}
实验结果图
实验过程中插桩了大量log.Printf指令,因此图片中就截取了最后的成功结果图。
最后
以上就是踏实香烟为你收集整理的MIT 6.824分布式 LAB4A:ShardCtrler介绍注意事项(实验教程中的Hint)本次实验内容代码阶段实验结果图的全部内容,希望文章能够帮你解决MIT 6.824分布式 LAB4A:ShardCtrler介绍注意事项(实验教程中的Hint)本次实验内容代码阶段实验结果图所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复