概述
基于前文的封装,这里我们说明下watch相关的定义及操作。
为了应对这样的需求, 这里举个例子:
A会根据需要有不同的操作,而B需要监听A不同的操作变化来处理不同的事情,这时候如果是etcd那么问题即可迎刃而解。
再实际一点:如业务中,用户欠费后将会停止用户的某特权功能,则停止的一方就需要监听到底是什么时候欠费的,一旦欠费,则给予停止操作停止其特权。
目录
代码实现
操作触发源
watch监听
细节分析
重要总结
代码实现
func InitV3Client() (cli *clientv3.Client, err error) {
cli, err = clientv3.New(clientv3.Config{
Endpoints: []string{"192.168.31.103:32379"},
DialTimeout: 5 * time.Second,
Username: "root",
Password: "555",
})
return
}
操作触发源
var (
watchPrefix = "/a/b"
)
func Source(cli *clientv3.Client) {
var err error
for i := 0; ; i++ {
fmt.Println("i=", i)
if i%3 == 0 { // 模拟触发某个条件,则开始触发watch
key := fmt.Sprintf("%s/%v", watchPrefix, i)
if i%2 == 0 { // 假设是偶数的话进行DELETE操作(纯delete操作时注释掉下面的else)
fmt.Printf("触发了条件,准备DELETE: %vn", key)
_, err = cli.Delete(context.Background(), key)
if err != nil {
fmt.Printf("DELETE 失败: %vn", err)
continue
}
fmt.Printf("DELETE %v successn", key)
} else {
//fmt.Printf("触发了条件,准备PUT: %vn", key)
//_, err = cli.Put(context.Background(), key, strconv.Itoa(i))
//if err != nil {
// fmt.Printf("PUT 失败: %vn", err)
// continue
//}
//
//fmt.Printf("PUT %v successn", key)
}
}
time.Sleep(time.Second * 1)
}
}
watch监听
func WatchSource(cli *clientv3.Client) {
wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix())
for {
select {
case data := <-wCh:
for i, event := range data.Events { // 每次有几个事件,这里就循环几次
fmt.Printf("receive %v event, %vn", i, event.Type) // event.Type表示事件类型,PUT还是DELETE
switch event.Type {
case clientv3.EventTypePut:
// do your sth
case clientv3.EventTypeDelete:
// do your sth
}
getKey, getValue := getKeyAndVal(event)
fmt.Printf("已监听到信号, %v=%v 进行了 %v 操作. 下一步通知操作方做相应处理n", getKey, getValue, event.Type)
// 实际业务处理...
}
}
}
}
func getKeyAndVal(event *clientv3.Event) (getKey, getValue string) {
fmt.Printf("event.PrevKv==nil结果 %v, event.Kv==nil结果 %vn", event.PrevKv == nil, event.Kv == nil)
fmt.Printf("event.PrevKv= %+vn", event.PrevKv)
fmt.Printf("event.Kv= %+vn", event.Kv)
if event.PrevKv != nil {
getKey, getValue = string(event.PrevKv.Key), string(event.PrevKv.Value)
}
if event.Type == mvccpb.PUT {
if event.Kv != nil {
getKey, getValue = string(event.Kv.Key), string(event.Kv.Value)
}
}
return
}
启动
func main() {
cli, err := client.InitV3Client()
if err != nil {
fmt.Println(err)
return
}
// 验证方式1,手动在终端操作,这里监听结果
watch.WatchSource(cli)
// 验证方式2,全部在代码中自动运行
//go watch.WatchSource(cli)
//watch.Source(cli)
}
好,一个watch流程实现完了。
细节分析
接下来基于上面代码,我们总结一些细节,如什么时候能收到信号,收到信号后什么时候能拿到上一个版本的值?
我们以方式1做验证。
PUT一个新值看看:
root@:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK
结果:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:762 version:1 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理
这里已经能收到PUT信号,表示流程OK!
再put /a/b/3 3一次试试:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:763 version:2 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理
可以看到version、mod_revision值做了+1操作,PrevKv未拿到值。
那么key不变,val变化呢?
root@:~# docker exec etcd etcdctl put /a/b/3 13 --user=a:123
OK
看看结果:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:764 version:3 value:"13"
已监听到信号, /a/b/3=13 进行了 PUT 操作. 下一步通知操作方做相应处理
可见event.PrevKv依旧无值。
删除一个已存在数据:
root@:~# docker exec etcd etcdctl del /a/b/3 --user=a:123
1
看看结果:
receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理
删除一个不存在数据:
root@:~# docker exec etcd etcdctl del /a/b/5 --user=a:123
0
receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理
PrevKv依旧为空。
难道,PrevKv总是无值吗?那要这个有什么意义? 不要急,我们做一个小改动:
wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix(),clientv3.WithPrevKV())
重复上述流程进行验证:
PUT一个新值:
root@:~# docker exec etcd etcdctl put /a/b/3 2 --user=a:123
OK
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:806 mod_revision:806 version:1 value:"2"
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理
可以看到PrevKv为空。
再来一次:
receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:800 version:2 value:"2"
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2"
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理
改一下value :
root@:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK
receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2"
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:802 version:4 value:"3"
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理
重要总结
1,对于PUT操作
put过该key后再次put,相当于更新操作,此时PrevKv有值、可收到任何监听信号;
对key首次PUT时 PrevKv为空、可收到任何监听信号;
2,对于DELETE操作
该key存在时进行delete操作,此时PrevKvs有值,可收到任何监听信号;
若delete不存在的key,则收不到任何监听信号;
其实可以进行的骚操作很多,就看你的需要是什么了,再如我们给上面加了clientv3.WithPrefix(),这就代表操作时会按包含前缀对待,也就是说监听方监听的是以/a/b为前缀的key的变化,如果没有 clientv3.WithPrefix(),删除或PUT一个/a/b/5这样的值注定是要失败的、不会接收到任何监听信号。
最后
以上就是会撒娇小猫咪为你收集整理的etcdv3·watch操作实现及相关重点说明代码实现 操作触发源 watch监听 细节分析 重要总结的全部内容,希望文章能够帮你解决etcdv3·watch操作实现及相关重点说明代码实现 操作触发源 watch监听 细节分析 重要总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复