Overview
Table of Contents
- Preface
- 1. Key/value Service
- 1.1 Flowchart
- 2. Client
- 2.1 Arguments
- 2.2 Clerk fields
- 2.3 The Clerk's Get/PutAppend requests
- 3. Server
- 3.1 The Op struct on the server
- 3.2 KVServer fields
- 3.3 kv.Get
- 3.4 kv.PutAppend
- 3.5 kv.getChannel
- 3.6 StartKVServer
- 3.7 kv.serverMonitor
- Summary
Preface
Working through MIT 6.824 (2020), I completed Lab 3A on top of Raft and passed the tests. For the earlier Raft labs, see Raft Lab 2A, Raft Lab 2B, and Raft Lab 2C.
Lab 3A builds a data store at the application layer. With the help of the Raft implementation from the earlier labs, it provides strong consistency. Concretely, the client (Clerk) sends Get/Put/Append requests to a kvserver, and each kvserver is associated with one Raft peer. The core of the lab is the key/value service itself, implemented in server.go and client.go.
1. Key/value Service
1.1 Flowchart
The overall flow is roughly:
- The client sends a request to the kvserver; there are two APIs, Get and PutAppend.
- The kvserver receives the request, calls Start on Raft, and waits for the corresponding message on Raft's applyCh (if the Raft peer is no longer the leader, it returns an error).
- When the applyCh message arrives, the server applies the command it carries, e.g. for a Put it stores the key/value pair in the kvserver's map, and finally notifies the RPC handler waiting in step 2 through a channel.
- After receiving the notification from step 3, the handler finishes processing and replies to the client.
Of course, this is a highly simplified picture, and many details are discussed later. Get and PutAppend follow a similar flow, so the very rough flowchart sketched here only covers PutAppend, just to give a high-level view.
2. Client
2.1 Arguments
In common.go, two fields are added: SequenceId and ClientId. They are mainly used to achieve linearizability when requests are issued concurrently, i.e. to ensure each operation is executed in order and only once.
You will need to uniquely identify client operations to ensure that the key/value service executes each one just once.
type PutAppendArgs struct {
    Key   string
    Value string
    Op    string // "Put" or "Append"
    // You'll have to add definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
    SequenceId int64
    ClientId   int64
}
type GetArgs struct {
    Key string
    // You'll have to add definitions here.
    SequenceId int64
    ClientId   int64
}
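For reference, the error constants (OK, ErrNoKey, ErrWrongLeader) and the reply structs used below come from the lab skeleton's common.go and are left unchanged; they look roughly like this:
type Err string

const (
    OK             = "OK"
    ErrNoKey       = "ErrNoKey"
    ErrWrongLeader = "ErrWrongLeader"
)

type PutAppendReply struct {
    Err Err
}

type GetReply struct {
    Err   Err
    Value string
}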
2.2 Clerk fields
Following the lab hints, the Clerk also needs to remember the leaderId.
You will probably have to modify your Clerk to remember which server turned out to be the leader for the last RPC, and send the next RPC to that server first. This will avoid wasting time searching for the leader on every RPC, which may help you pass some of the tests quickly enough.
type Clerk struct {
    servers []*labrpc.ClientEnd
    // You will have to modify this struct.
    mu         sync.Mutex
    leaderId   int
    clientId   int64
    sequenceId int64
}
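MakeClerk is not shown in this post; a minimal sketch of how the new fields might be initialized, assuming the nrand() helper already provided in the skeleton's client.go is used to generate a (probabilistically) unique clientId:
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    // You'll have to add code here.
    ck.clientId = nrand() // nrand() comes from the lab skeleton
    ck.leaderId = 0       // initial guess: server 0 is the leader
    ck.sequenceId = 0     // bumped atomically before every request
    return ck
}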
2.3 The Clerk's Get/PutAppend requests
Each call simply sends the request to the server the Clerk believes is the leader; if that server turns out not to be the leader, the Clerk advances leaderId and retries. If reply.Err == OK the request succeeded and the call returns.
func (ck *Clerk) Get(key string) string {
    // You will have to modify this function.
    args := GetArgs{Key: key, ClientId: ck.clientId, SequenceId: atomic.AddInt64(&ck.sequenceId, 1)}
    leaderId := ck.currentLeader()
    for {
        reply := GetReply{}
        if ck.servers[leaderId].Call("KVServer.Get", &args, &reply) {
            if reply.Err == OK {
                return reply.Value
            } else if reply.Err == ErrNoKey {
                return ""
            }
        }
        leaderId = ck.changeLeader()
        time.Sleep(1 * time.Millisecond)
    }
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
    args := PutAppendArgs{
        // You will have to modify this function.
        Key:        key,
        Value:      value,
        Op:         op,
        SequenceId: atomic.AddInt64(&ck.sequenceId, 1),
        ClientId:   ck.clientId,
    }
    leaderId := ck.currentLeader()
    for {
        reply := PutAppendReply{}
        if ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply) {
            if reply.Err == OK {
                break
            }
        }
        leaderId = ck.changeLeader()
        time.Sleep(1 * time.Millisecond)
    }
}
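The helpers currentLeader and changeLeader used above are not listed in this post; a minimal sketch (the names are taken from the calls above) that reads and rotates leaderId under the Clerk's mutex could look like this:
func (ck *Clerk) currentLeader() int {
    ck.mu.Lock()
    defer ck.mu.Unlock()
    return ck.leaderId
}

func (ck *Clerk) changeLeader() int {
    ck.mu.Lock()
    defer ck.mu.Unlock()
    // try the next server in round-robin order
    ck.leaderId = (ck.leaderId + 1) % len(ck.servers)
    return ck.leaderId
}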
3. Server
3.1 The Op struct on the server
- OpId is the client's sequenceId.
- Index is the log index returned by Raft.Start.
- Term is the Raft term, used to compare the term returned by Raft.Start with the term of the command delivered on applyCh, to detect a lost leadership.
type Op struct {
    // Your definitions here.
    // Field names must start with capital letters,
    // otherwise RPC will break.
    OpId     int64
    ClientId int64
    Index    int
    Term     int
    OpType   string
    OpKey    string
    OpValue  string
}
3.2 KVServer fields
Three maps are added: sequenceMapper ensures each operation is executed only once, requestMapper hands the command delivered on Raft's applyCh (after Raft.Start) back to the waiting RPC handler, and kvStore holds the key/value data that clients persist.
type KVServer struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32 // set by Kill()
    maxraftstate int // snapshot if log grows this big
    // Your definitions here.
    sequenceMapper map[int64]int64
    requestMapper  map[int]chan Op
    kvStore        map[string]string
}
3.3 kv.Get
- Get follows the same logic as PutAppend: first call Raft.Start to hand the information that needs to be made consistent, i.e. the kvserver's Op, to Raft.
- getChannel fetches (or creates) the channel in requestMapper for that log index; this is covered in detail below.
- Then wait for Raft's applyCh; when kv receives the message, it forwards the Op to the channel from step 2 (this is done by serverMonitor, also covered in detail below), which completes the processing of the request.
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    // Your code here.
    var isLeader bool
    clientOp := Op{OpType: "Get", OpKey: args.Key, OpId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = kv.rf.Start(clientOp)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    ch := kv.getChannel(clientOp.Index)
    defer func() {
        kv.mu.Lock()
        delete(kv.requestMapper, clientOp.Index)
        kv.mu.Unlock()
    }()
    timer := time.NewTicker(500 * time.Millisecond)
    defer timer.Stop()
    select {
    case op := <-ch:
        kv.mu.Lock()
        opTerm := op.Term
        kv.mu.Unlock()
        if clientOp.Term != opTerm {
            // the entry was committed in a different term: leadership changed after Start
            reply.Err = ErrWrongLeader
            return
        } else {
            reply.Err = OK
            kv.mu.Lock()
            reply.Value = kv.kvStore[args.Key]
            kv.mu.Unlock()
            return
        }
    case <-timer.C:
        // no apply notification within 500ms: give up and let the client retry
        reply.Err = ErrWrongLeader
    }
}
3.4 kv.PutAppend
The logic here is similar to Get.
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    // Your code here.
    var isLeader bool
    clientOp := Op{OpType: args.Op, OpKey: args.Key, OpValue: args.Value, OpId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = kv.rf.Start(clientOp)
    if !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    // leader is found
    ch := kv.getChannel(clientOp.Index)
    defer func() {
        kv.mu.Lock()
        delete(kv.requestMapper, clientOp.Index)
        kv.mu.Unlock()
    }()
    timer := time.NewTicker(500 * time.Millisecond)
    defer timer.Stop()
    select {
    case op := <-ch:
        kv.mu.Lock()
        opTerm := op.Term
        kv.mu.Unlock()
        if clientOp.Term != opTerm {
            // committed in a different term: leadership changed, the client must retry
            reply.Err = ErrWrongLeader
        } else {
            reply.Err = OK
        }
    case <-timer.C:
        // timed out waiting for the apply notification
        reply.Err = ErrWrongLeader
    }
}
3.5 kv.getChannel
This is a helper used by Get/PutAppend: if requestMapper already has an Op channel for the given index, return it; otherwise create a new channel, store it, and return it.
func (kv *KVServer) getChannel(index int) chan Op {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    ch, ok := kv.requestMapper[index]
    if !ok {
        ch = make(chan Op, 1)
        kv.requestMapper[index] = ch
    }
    return ch
}
3.6 StartKVServer
This initializes the KVServer and then starts a separate goroutine to watch the applyCh messages coming from Raft.
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    labgob.Register(Op{})
    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.sequenceMapper = make(map[int64]int64)
    kv.requestMapper = make(map[int]chan Op)
    kv.kvStore = make(map[string]string)
    go kv.serverMonitor()
    return kv
}
3.7 kv.serverMonitor
This goroutine handles the message flow from Raft to the KVServer: whenever Raft delivers a message on applyCh, it applies the command as appropriate, and in every case it forwards the Op carried by the command to the channel for that log index.
Note that if sequenceMapper already has an entry for the client, the OperationId (i.e. the sequence number) of the applied command must be greater than the stored sequence number; otherwise kvStore is left untouched, so duplicate operations are never applied twice.
The channel handling works like this: when the KVServer receives a message on applyCh, serverMonitor wraps its contents into an Op and sends it to the channel registered for that index.
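One assumption worth making explicit: the code below reads msg.CommandTerm, which the stock 2020 skeleton's raft.ApplyMsg does not have (it only defines CommandValid, Command, and CommandIndex), so the Raft layer here is assumed to have been extended roughly as follows:
// in raft.go: ApplyMsg extended with the term of the applied entry;
// CommandTerm is an addition, the rest is the original skeleton
type ApplyMsg struct {
    CommandValid bool
    Command      interface{}
    CommandIndex int
    CommandTerm  int // term of the log entry, filled in when applying it
}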
func (kv *KVServer) serverMonitor() {
    for {
        if kv.killed() {
            return
        }
        select {
        case msg := <-kv.applyCh:
            index := msg.CommandIndex
            term := msg.CommandTerm
            op := msg.Command.(Op)
            kv.mu.Lock()
            sequenceInMapper, hasSequence := kv.sequenceMapper[op.ClientId]
            op.Term = term
            // apply only if this is a new operation from this client (duplicate detection)
            if !hasSequence || op.OpId > sequenceInMapper {
                switch op.OpType {
                case "Put":
                    kv.kvStore[op.OpKey] = op.OpValue
                case "Append":
                    kv.kvStore[op.OpKey] += op.OpValue
                }
                kv.sequenceMapper[op.ClientId] = op.OpId
            }
            kv.mu.Unlock()
            // send message to op chan
            kv.getChannel(index) <- op
        }
    }
}
Summary
This lab requires understanding the whole flow first. In the beginning, use DPrintf liberally and get a clear picture of how the Client, the Server, and Raft relate to one another. Get the program running first, then worry about fault handling. One design point to think through carefully: once the kvserver calls Start, it has to wait for confirmation on Raft's applyCh, so you need an architecture in which the waiting RPC handler is notified as soon as the corresponding applyCh message arrives. Using channels for this notification, as done here, is a natural choice and plays to Go's strengths.