Appearance
MIT 6.824 Lab 2 实现详解
这一篇会和lab1一样,直接在一个文章中讲解整个实现,难度是比较低的。
实验要求概述
先说一下这个lab的要求。
首先就是实现一个简单的kv存储。client的方法会通过指定的rpc方式向server发送rpc请求。
server中是实际对kv的存储。在受到client的rpc请求后,会进行存储,然后返回对应的响应给client。
主要的难点在于对一个特殊情况的处理上。
在发送rpc请求后,如果没有受到相应,为了完成这个put或者get方法,应当进行持续的重试。
但是比如说第一次rpc成功发送了,server进行了成功处理了,但是在返回相应的过程中,这个rpc丢失了该怎么办?
主要就是这个问题的处理。
RPC接口定义
rpc的格式是定义好的。
错误类型定义
首先是几种固定的错误类型
go
const (
// Err's returned by server and Clerk
OK = "OK"
ErrNoKey = "ErrNoKey"
ErrVersion = "ErrVersion"
// Err returned by Clerk only
ErrMaybe = "ErrMaybe"
// For future kvraft lab
ErrWrongLeader = "ErrWrongLeader"
ErrWrongGroup = "ErrWrongGroup"
)
RPC请求和响应格式
然后是固定的rpc请求和相应格式
go
type PutArgs struct {
Key string
Value string
Version Tversion
}
type PutReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Value string
Version Tversion
Err Err
}
Client端实现
Get方法实现
然后我们就是实现Get方法。
- 首先构建一个GetArgs这个rpc请求,然后塞入key,发送出去,接收reply
- 如果结果是ok那么就直接return value version ok就行
- 如果是error no key
- 如果是其他类型的错误,那么就循环重试,直到接收到Ok或者是errNoKey相应
如下实现:
go
func (ck *Clerk) Get(key string) (string, rpc.Tversion, rpc.Err) {
args := &rpc.GetArgs{Key: key}
for {
reply := &rpc.GetReply{}
ok := ck.clnt.Call(ck.server, "KVServer.Get", args, reply)
if ok {
// 成功收到服务器回复,根据错误类型处理
if reply.Err == rpc.OK {
// 正常情况:键存在,返回值和版本号
return reply.Value, reply.Version, rpc.OK
} else if reply.Err == rpc.ErrNoKey {
// 键不存在:这是确定的结果,直接返回
return "", 0, rpc.ErrNoKey
}
}
}
}
Put方法实现
put方法只在请求中的version和服务器上key的version匹配的时候更新keyvalue。如果版本号不匹配就会返回ErrVersion错误。
但是这里就有我刚才提到的问题,就是有可能第一次发送,server成功处理了,但是在返回的过程中rpc丢失了。也就是说状态保留了,但是client并没有收到成功的回复,仍然一直充实。
这就是ErrMaybe的情况。
处理逻辑分析
所以我们的逻辑上:
- 如果第一次就成功了,那是最好
- 如果第一次就返回了ErrVersion,那么就确定put没有作用于服务器状态上,直接返回ErrVersion
如果不是第一次发送的rpc返回的结果,那么如果返回ErrVersion我们并不确认之前的rpc有没有作用于kv服务器状态上。
- 第一种情况:首次RPC成功,但是响应丢失,已经作用在服务器状态上了,那么就重发Rpc请求会收到ErrVersion。这个情况就是我们要做特别处理的
- 第二种情况:第一次rpc失败了,重发的也因为版本冲突收到了ErrVersion,聪明的你一定发现了,修改没修改服务器状态,在单一的ErrVersion处理上是完全无法分辨的。所以这里要做特殊处理
解决思路
我们的解决思路就是,分为首次调用收到回复和首次调用没有收到回复两种情况。
如果收到恢复:
- 是ok,那么就直接返回
- 如果是errversion说明版本错误了,并没有直接作用到server上,所以我们可以直接返回errversion
- 其他类型的错误也可以直接返回,这是非常顺理成章的,因为如果接收到服务端的rpc就说明client是可以确认server的状态的
然后在首次调用失败后,我们会进入一个循环重试当中:
- 如果收到回复是ok,那么是最好的情况,代表第一次rpc在发送过程中丢失了,我们重复的发送rpc请求达到了最好要作用在服务器上的目的
- 如果是ErrVersion,我们就要小心了。不知道rpc是否成功。所以我们返回ErrMaybe
- 如果是其他类型的错误可以直接确定返回,比如说ErrNoKey,说明尝试作用在server上
代码实现
以下为代码实现:
go
func (ck *Clerk) Put(key, value string, version rpc.Tversion) rpc.Err {
args := &rpc.PutArgs{Key: key, Value: value, Version: version}
reply := &rpc.PutReply{}
// 首次尝试调用
ok := ck.clnt.Call(ck.server, "KVServer.Put", args, reply)
if ok {
if reply.Err == rpc.OK {
return rpc.OK
} else if reply.Err == rpc.ErrVersion {
return rpc.ErrVersion
}
if reply.Err == rpc.ErrNoKey {
return rpc.ErrNoKey
}
return reply.Err
}
for {
reply = &rpc.PutReply{}
ok := ck.clnt.Call(ck.server, "KVServer.Put", args, reply)
if ok {
if reply.Err == rpc.OK {
return rpc.OK
} else if reply.Err == rpc.ErrVersion {
return rpc.ErrMaybe
}
if reply.Err == rpc.ErrNoKey {
return rpc.ErrNoKey
}
return reply.Err
}
}
}
这样我们的client部分就完成了。
Server端实现
然后我们在server中要完成具体的kv存储以及对rpc请求的处理作用在状态机上。
首先按照rpc的格式,应当有对应的版本号,只有版本号相同的key才能进行更新。
数据结构定义
定义一个keyValue结构体:
go
type KeyValue struct {
Value string
Version rpc.Tversion
}
用map存到KVServer结构体当中:
go
type KVServer struct {
mu sync.Mutex
data map[string]KeyValue
// Your definitions here.
}
记得在MakeKVServer中初始化。
Get方法实现
然后写一下get方法,为了解决并发问题,我们需要加锁,然后defer。
- 直接能从map中找到key,那么就返回
- 不然就返回ErrNoKey
go
func (kv *KVServer) Get(args *rpc.GetArgs, reply *rpc.GetReply) {
// Your code here.
key := args.Key
kv.mu.Lock()
defer kv.mu.Unlock()
if kvData, ok := kv.data[key]; ok {
reply.Value = kvData.Value
reply.Version = kvData.Version
reply.Err = rpc.OK
} else {
reply.Err = rpc.ErrNoKey
}
}
Put方法实现
put过程就要复杂一点,同样还是上锁。然后拿到key,value:
- 先处理复杂情况,如果想要更新的值在map当中,判断一下传入的args的version与当前这个key的version是否一致,不一致返回errversion
- 不然就更新值,增加版本号,返回ok的rpc,这个过程因为lock了,所以是线程安全的
- 不然,如果args的version是0说明是第一次put,那么就在map中创建一个对应的值
- 不然就返回errNoKey,因为如果version不是0的话,说明是一个已经存在的key,但是没有传入正确的version
go
func (kv *KVServer) Put(args *rpc.PutArgs, reply *rpc.PutReply) {
// Your code here.
kv.mu.Lock()
defer kv.mu.Unlock()
key := args.Key
value := args.Value
if kvData, ok := kv.data[key];ok{
if kvData.Version != args.Version {
reply.Err = rpc.ErrVersion
return
}else{
kvData.Value = value
kvData.Version++
kv.data[key] = kvData
reply.Err = rpc.OK
}
}else{
if args.Version == 0 {
kv.data[key] = KeyValue{Value: value, Version: 1}
reply.Err = rpc.OK
} else {
reply.Err = rpc.ErrNoKey
}
}
}
分布式锁实现
然后是要我们实现的一个Lock。这个lock是通过这一套kv服务来实现的。
结构体定义
首先就是对结构体定义的优化,这里需要一个random的id来区分不同的锁,然后要有一个储存在server上的锁的key。
go
type Lock struct {
// IKVClerk is a go interface for k/v clerks: the interface hides
// the specific Clerk type of ck but promises that ck supports
// Put and Get. The tester passes the clerk in when calling
// MakeLock().
ck kvtest.IKVClerk
lid string
key string
}
Acquire方法实现
acquire就是获取锁的流程。
首先检查锁的当前状态:
- 如果锁可用,也就是为空,那么就尝试获取
- 如果自己已经持有了这个锁,那么直接返回
- 如果被其他客户端持有了,就等待
先获取锁的当前状态,用get方法。
对于锁不存在或者锁未被持有的状态,那么直接尝试用put获取锁,key是锁的key,value是自己的id。
如果成功获取锁就返回,如果获取失败返回了ErrMaybe说明锁可能加上了,但是rpc丢失了,这里我们就要再通过get看一下是否作用到了服务端状态只是rpc丢失了。如果验证失败就继续循环获取锁,因为他还没作用到服务端状态上。
如果我们已经持有了锁,那么直接返回。
如果返回了ok,但是value不为空,说明有别的客户端持有了锁,我们就continue做等待。
go
func (lk *Lock) Acquire() {
for {
value, version, err := lk.ck.Get(lk.key)
if err == rpc.ErrNoKey || (err == rpc.OK && value == "") {
putErr := lk.ck.Put(lk.key, lk.lid, version)
if putErr == rpc.OK {
return
} else if putErr == rpc.ErrMaybe {
checkValue, _, checkErr := lk.ck.Get(lk.key)
if checkErr == rpc.OK && checkValue == lk.lid {
return
}
}
} else if err == rpc.OK && value == lk.lid {
return
} else if err == rpc.OK && value != "" {
continue
}
}
}
Release方法实现
release就是释放锁的过程。
- 先检查当前是否持有这个锁,如果有锁就尝试把值设置成空字符串
- 如果不持有就直接返回
还是获取锁的状态,然后如果value等于自己就尝试put空字符串。
如果put成功就返回,如果是errMaybe说明rpc有可能丢失了。我们就再做get确认一下。如果已经释放了就返回。如果锁不是我们持有的就和我们没什么关系了。直接返回。不然就一直重试。
go
func (lk *Lock) Release() {
for {
// 获取锁的当前状态
value, version, err := lk.ck.Get(lk.key)
// 情况1: 成功获取锁状态,且确认我们持有这个锁
if err == rpc.OK && value == lk.lid {
// 尝试释放锁:将锁的值设置为空字符串
putErr := lk.ck.Put(lk.key, "", version)
if putErr == rpc.OK {
// 成功释放锁
return
} else if putErr == rpc.ErrMaybe {
// Put操作可能成功了,需要验证锁是否真的被释放
// 这种情况发生在:Put请求成功但响应丢失
checkValue, _, checkErr := lk.ck.Get(lk.key)
if checkErr == rpc.OK && checkValue != lk.lid {
// 验证确认锁已经被释放(值不再是我们的ID)
return
}
// 验证失败,可能锁仍然被我们持有,继续尝试释放
}
} else if err == rpc.OK && value != lk.lid {
return
}
}
}
测试结果
以上,我们的lab2就完成了,我们进行一下测试。
bash
/mnt/data/coding/mit65840/src/kvsrv1 main go test -v
得到结果:
=== RUN TestReliablePut
One client and reliable Put (reliable network)...
... Passed -- time 0.0s #peers 1 #RPCs 5 #Ops 0
--- PASS: TestReliablePut (0.00s)
=== RUN TestPutConcurrentReliable
Test: many clients racing to put values to the same key (reliable network)...
info: linearizability check timed out, assuming history is ok
... Passed -- time 11.0s #peers 1 #RPCs 50923 #Ops 50923
--- PASS: TestPutConcurrentReliable (11.04s)
=== RUN TestMemPutManyClientsReliable
Test: memory use many put clients (reliable network)...
... Passed -- time 7.7s #peers 1 #RPCs 100000 #Ops 0
--- PASS: TestMemPutManyClientsReliable (13.47s)
=== RUN TestUnreliableNet
One client (unreliable network)...
... Passed -- time 3.4s #peers 1 #RPCs 257 #Ops 210
--- PASS: TestUnreliableNet (3.44s)
PASS
ok 6.5840/kvsrv1 27.966s
以上。