Table of Contents
- Preface
- 1. Key/value Service
- 1.1 Flow
- 2. Client
- 2.1 Arguments
- 2.2 Clerk fields
- 2.3 The Clerk's Get/PutAppend requests
- 3. Server
- 3.1 The Op struct on the server
- 3.2 Server fields
- 3.3 kv.Get
- 3.4 kv.PutAppend
- 3.5 kv.getChannel
- 3.6 StartKVServer
- 3.7 kv.serverMonitor
- Summary
Preface
This post covers Lab 3A of MIT 6.824 (2020), which I have completed and whose tests pass. For the earlier Raft labs, see Raft Lab 2A, Raft Lab 2B, and Raft Lab 2C.
Lab 3A builds an application-level data store on top of the Raft implementation from those labs, providing strong consistency. Concretely, a client (Clerk) sends Get/Put/Append requests to a kvserver, and each kvserver is associated with one Raft peer. The lab is about implementing the key/value service itself, which corresponds to server.go and client.go.
1. Key/value Service
1.1 Flow
The overall flow:
1. The client sends a request to the kvserver; there are two RPCs, Get and PutAppend.
2. The kvserver calls Raft's Start and waits for the command to come back on Raft's applyCh (if this Raft peer is no longer the leader, it returns an error instead).
3. When the command arrives on applyCh, the server applies it — for a Put, storing the key/value pair in the kvserver's map — and then notifies the handler waiting in step 2 through a channel.
4. On receiving that channel message, the handler finishes up and replies to the client.
This is of course a very simplified picture, and the many details in between are discussed below. Since the Get flow is similar to PutAppend, only a bare-bones PutAppend flow diagram is drawn here to give a macro view first.
2. Client
2.1 Arguments
In common.go, two fields are added: sequenceId and clientId. They exist to keep concurrent operations linearizable — that is, to guarantee each operation is executed exactly once, in order. As the lab hint puts it:
You will need to uniquely identify client operations to ensure that the key/value service executes each one just once.
```go
type PutAppendArgs struct {
	Key   string
	Value string
	Op    string // "Put" or "Append"
	// You'll have to add definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	SequenceId int64
	ClientId   int64
}

type GetArgs struct {
	Key string
	// You'll have to add definitions here.
	SequenceId int64
	ClientId   int64
}
```
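The handlers in later sections compare reply.Err against the error values from common.go. For reference, the 2020 skeleton defines them roughly like this (reproduced here so the later snippets are self-explanatory):

```go
package main

import "fmt"

// Error values and reply structs from common.go; the course skeleton
// ships these (or something very close), shown here for reference.
const (
	OK             = "OK"
	ErrNoKey       = "ErrNoKey"
	ErrWrongLeader = "ErrWrongLeader"
)

type Err string

type GetReply struct {
	Err   Err
	Value string
}

type PutAppendReply struct {
	Err Err
}

func main() {
	// A handler fills Err; the Clerk branches on it.
	reply := GetReply{Err: ErrNoKey}
	fmt.Println(reply.Err == ErrNoKey) // prints "true"
}
```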
2.2 Clerk fields
Following the hint below, the Clerk also records the leaderId of the last known leader:
You will probably have to modify your Clerk to remember which server turned out to be the leader for the last RPC, and send the next RPC to that server first. This will avoid wasting time searching for the leader on every RPC, which may help you pass some of the tests quickly enough.
```go
type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	mu         sync.Mutex
	leaderId   int
	clientId   int64
	sequenceId int64
}
```
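The struct above gets initialized in MakeClerk. nrand is the id helper already present in the skeleton; the rest of the initialization is a sketch of my own setup rather than anything the lab prescribes, and labrpc.ClientEnd is stubbed so the snippet compiles standalone:

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
	"sync"
)

// Stub for labrpc.ClientEnd so the sketch is self-contained;
// the real type lives in the course's labrpc package.
type ClientEnd struct{}

type Clerk struct {
	servers    []*ClientEnd
	mu         sync.Mutex
	leaderId   int
	clientId   int64
	sequenceId int64
}

// nrand is the skeleton's helper: a random 62-bit id, which makes
// clientId collisions between Clerks vanishingly unlikely.
func nrand() int64 {
	max := big.NewInt(int64(1) << 62)
	bigx, _ := rand.Int(rand.Reader, max)
	return bigx.Int64()
}

func MakeClerk(servers []*ClientEnd) *Clerk {
	ck := &Clerk{servers: servers}
	ck.clientId = nrand()
	// leaderId and sequenceId start at 0, so the first
	// atomic.AddInt64(&ck.sequenceId, 1) hands out sequence number 1.
	return ck
}

func main() {
	a, b := MakeClerk(nil), MakeClerk(nil)
	fmt.Println(a.clientId != b.clientId) // almost surely "true"
}
```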
2.3 The Clerk's Get/PutAppend requests
Both simply send the request to the remembered leader; if that server turns out not to be the leader (or the RPC fails), advance leaderId and retry. When reply.Err == OK the request succeeded and the Clerk returns.
```go
func (ck *Clerk) Get(key string) string {
	// You will have to modify this function.
	args := GetArgs{Key: key, ClientId: ck.clientId, SequenceId: atomic.AddInt64(&ck.sequenceId, 1)}
	leaderId := ck.currentLeader()
	for {
		reply := GetReply{}
		if ck.servers[leaderId].Call("KVServer.Get", &args, &reply) {
			if reply.Err == OK {
				return reply.Value
			} else if reply.Err == ErrNoKey {
				return ""
			}
		}
		leaderId = ck.changeLeader()
		time.Sleep(1 * time.Millisecond)
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
	args := PutAppendArgs{
		// You will have to modify this function.
		Key:        key,
		Value:      value,
		Op:         op,
		SequenceId: atomic.AddInt64(&ck.sequenceId, 1),
		ClientId:   ck.clientId,
	}
	leaderId := ck.currentLeader()
	for {
		reply := PutAppendReply{}
		if ck.servers[leaderId].Call("KVServer.PutAppend", &args, &reply) {
			if reply.Err == OK {
				break
			}
		}
		leaderId = ck.changeLeader()
		time.Sleep(1 * time.Millisecond)
	}
}
```
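currentLeader and changeLeader above are small helpers I added; they are not part of the skeleton. A minimal sketch, with the Clerk reduced to just the fields the helpers touch:

```go
package main

import (
	"fmt"
	"sync"
)

// Reduced Clerk: only the fields the two helpers use; the real struct
// also carries the labrpc client ends and the ids shown earlier.
type Clerk struct {
	mu       sync.Mutex
	leaderId int
	nServers int // stands in for len(ck.servers)
}

// currentLeader reads the last server that answered as leader.
func (ck *Clerk) currentLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	return ck.leaderId
}

// changeLeader advances to the next server, wrapping around, and
// returns the new candidate to try.
func (ck *Clerk) changeLeader() int {
	ck.mu.Lock()
	defer ck.mu.Unlock()
	ck.leaderId = (ck.leaderId + 1) % ck.nServers
	return ck.leaderId
}

func main() {
	ck := &Clerk{nServers: 3, leaderId: 2}
	fmt.Println(ck.changeLeader()) // wraps around: prints 0
	fmt.Println(ck.currentLeader())
}
```

The mutex matters because the tester drives a Clerk from concurrent goroutines in later tests; reading and bumping leaderId under the lock keeps the retry loop race-free.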
3. Server
3.1 The Op struct on the server
- OpId is the client's sequenceId
- Index comes back from Raft.Start
- Term is the Raft term; it is used to compare the term returned by Raft.Start with the term of the command on applyCh, in order to detect lost leadership
```go
type Op struct {
	// Your definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.
	OpId     int64
	ClientId int64
	Index    int
	Term     int
	OpType   string
	OpKey    string
	OpValue  string
}
```
3.2 Server fields
Three maps are added: sequenceMapper ensures each operation is executed at most once, requestMapper hands the command arriving on Raft's applyCh back to the handler that called Raft.Start, and kvStore holds the key/value data the clients persist.
```go
type KVServer struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	sequenceMapper map[int64]int64
	requestMapper  map[int]chan Op
	kvStore        map[string]string
}
```
3.3 kv.Get
- Get follows the same logic as PutAppend: first call Raft.Start, handing Raft the information that must be made consistent — the kvserver's Operation.
- getChannel fetches the channel registered in requestMapper for this index (covered in detail below).
- The handler then waits for Raft's applyCh; once the command arrives, serverMonitor (also covered below) sends the Operation to the channel from step 2, and the handler finishes processing.
```go
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	// Your code here.
	var isLeader bool
	clientOp := Op{OpType: "Get", OpKey: args.Key, OpId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = kv.rf.Start(clientOp)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	ch := kv.getChannel(clientOp.Index)
	defer func() {
		kv.mu.Lock()
		delete(kv.requestMapper, clientOp.Index)
		kv.mu.Unlock()
	}()

	timer := time.NewTicker(500 * time.Millisecond)
	defer timer.Stop()

	select {
	case op := <-ch:
		kv.mu.Lock()
		opTerm := op.Term
		kv.mu.Unlock()
		if clientOp.Term != opTerm {
			reply.Err = ErrWrongLeader
			return
		} else {
			reply.Err = OK
			kv.mu.Lock()
			reply.Value = kv.kvStore[args.Key]
			kv.mu.Unlock()
			return
		}
	case <-timer.C:
		reply.Err = ErrWrongLeader
	}
}
```
3.4 kv.PutAppend
The logic here mirrors Get.
```go
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	// Your code here.
	var isLeader bool
	clientOp := Op{OpType: args.Op, OpKey: args.Key, OpValue: args.Value, OpId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = kv.rf.Start(clientOp)
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	// leader is found
	ch := kv.getChannel(clientOp.Index)
	defer func() {
		kv.mu.Lock()
		delete(kv.requestMapper, clientOp.Index)
		kv.mu.Unlock()
	}()

	timer := time.NewTicker(500 * time.Millisecond)
	defer timer.Stop()

	select {
	case op := <-ch:
		kv.mu.Lock()
		opTerm := op.Term
		kv.mu.Unlock()
		if clientOp.Term != opTerm {
			reply.Err = ErrWrongLeader
		} else {
			reply.Err = OK
		}
	case <-timer.C:
		reply.Err = ErrWrongLeader
	}
}
```
3.5 kv.getChannel
A helper used by Get/PutAppend: if requestMapper already holds an Op channel for the index, return it; otherwise create one, register it, and return it.
```go
func (kv *KVServer) getChannel(index int) chan Op {
	kv.mu.Lock()
	defer kv.mu.Unlock()
	ch, ok := kv.requestMapper[index]
	if !ok {
		ch = make(chan Op, 1)
		kv.requestMapper[index] = ch
	}
	return ch
}
```
3.6 StartKVServer
This initializes the KVServer and spawns a dedicated goroutine to watch the applyCh coming from Raft.
```go
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	labgob.Register(Op{})

	kv := new(KVServer)
	kv.me = me
	kv.maxraftstate = maxraftstate

	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)

	kv.sequenceMapper = make(map[int64]int64)
	kv.requestMapper = make(map[int]chan Op)
	kv.kvStore = make(map[string]string)

	go kv.serverMonitor()

	return kv
}
```
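The monitor goroutine started here exits when the server is killed; the kv.killed() check it relies on is the skeleton's standard atomic flag. A self-contained sketch (the real Kill also calls kv.rf.Kill(), omitted along with the other fields so the snippet compiles on its own):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// KVServer reduced to the kill flag; see the full struct earlier.
type KVServer struct {
	dead int32 // set by Kill()
}

// Kill flips the flag atomically; the real version also calls kv.rf.Kill().
func (kv *KVServer) Kill() {
	atomic.StoreInt32(&kv.dead, 1)
}

// killed is polled by long-running goroutines such as serverMonitor.
func (kv *KVServer) killed() bool {
	return atomic.LoadInt32(&kv.dead) == 1
}

func main() {
	kv := &KVServer{}
	fmt.Println(kv.killed()) // prints "false"
	kv.Kill()
	fmt.Println(kv.killed()) // prints "true"
}
```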
3.7 kv.serverMonitor
This goroutine handles the messages Raft delivers to the kvserver: each time a command arrives on applyCh, it applies the command as appropriate and then forwards the resulting Operation to the channel registered for that index.
Note the duplicate detection: if sequenceMapper already has an entry for the command's client, the command's OpId (the client's sequence number) must be greater than the stored value; otherwise kvStore is left untouched.
The channel handshake itself is simple: when an applyCh message arrives, serverMonitor wraps its contents into an Op and sends it to the channel registered for that index, waking up the waiting RPC handler.
```go
func (kv *KVServer) serverMonitor() {
	for {
		if kv.killed() {
			return
		}
		select {
		case msg := <-kv.applyCh:
			index := msg.CommandIndex
			term := msg.CommandTerm
			op := msg.Command.(Op)

			kv.mu.Lock()
			sequenceInMapper, hasSequence := kv.sequenceMapper[op.ClientId]
			op.Term = term
			if !hasSequence || op.OpId > sequenceInMapper {
				switch op.OpType {
				case "Put":
					kv.kvStore[op.OpKey] = op.OpValue
				case "Append":
					kv.kvStore[op.OpKey] += op.OpValue
				}
				kv.sequenceMapper[op.ClientId] = op.OpId
			}
			kv.mu.Unlock()

			// send message to op chan
			kv.getChannel(index) <- op
		}
	}
}
```
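One catch: serverMonitor reads msg.CommandTerm, which the stock 2020 skeleton's raft.ApplyMsg does not carry. I extended ApplyMsg in the Raft code so the term recorded at Start can be compared against the term the entry actually committed under. A sketch of the extended struct:

```go
package main

import "fmt"

// ApplyMsg as extended for this lab: the skeleton ships CommandValid,
// Command, and CommandIndex; CommandTerm is the field added so the
// kvserver can detect lost leadership between Start and commit.
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
	CommandTerm  int // added for Lab 3A
}

func main() {
	msg := ApplyMsg{CommandValid: true, CommandIndex: 7, CommandTerm: 3}
	// serverMonitor stamps this term onto the Op it forwards,
	// which is what the handlers compare against clientOp.Term.
	fmt.Println(msg.CommandTerm) // prints 3
}
```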
Summary
This lab requires understanding the whole flow first. Early on, use DPrintf liberally until the relationship between Client, Server, and Raft is clear; get the program running first, then worry about fault tolerance. The key design point to think through: once the kvserver calls Start, it must wait for confirmation via Raft's applyCh, so you need an architecture in which an arriving applyCh message immediately wakes the handler waiting on that request. Following the course authors and using a channel for this is a good choice, and plays to Go's strengths.