我是靠谱客的博主 会撒娇小猫咪,最近开发中收集的这篇文章主要介绍etcdv3·watch操作实现及相关重点说明代码实现 操作触发源 watch监听 细节分析 重要总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

基于前文的封装,这里我们说明下watch相关的定义及操作。

为了应对这样的需求, 这里举个例子:
A会根据需要有不同的操作,而B需要监听A不同的操作变化来处理不同的事情,这时候如果是etcd那么问题即可迎刃而解。


再实际一点:如业务中,用户欠费后将会停止用户的某特权功能,则停止的一方就需要监听到底是什么时候欠费的,一旦欠费,则给予停止操作停止其特权。

目录

代码实现

操作触发源

watch监听

细节分析

重要总结


代码实现

func InitV3Client() (cli *clientv3.Client, err error) {
    cli, err = clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.31.103:32379"},
        DialTimeout: 5 * time.Second,
        Username:    "root",
        Password:    "555",
    })

    return
}


操作触发源

var (
    watchPrefix = "/a/b"
)

func Source(cli *clientv3.Client) {
    var err error
    for i := 0; ; i++ {
        fmt.Println("i=", i)
        if i%3 == 0 { // 模拟触发某个条件,则开始触发watch
            key := fmt.Sprintf("%s/%v", watchPrefix, i)
            if i%2 == 0 { // 假设是偶数的话进行DELETE操作(纯delete操作时注释掉下面的else)
                fmt.Printf("触发了条件,准备DELETE: %vn", key)
                _, err = cli.Delete(context.Background(), key)
                if err != nil {
                    fmt.Printf("DELETE 失败: %vn", err)
                    continue
                }

                fmt.Printf("DELETE %v successn", key)
            } else {
                //fmt.Printf("触发了条件,准备PUT: %vn", key)
                //_, err = cli.Put(context.Background(), key, strconv.Itoa(i))
                //if err != nil {
                //    fmt.Printf("PUT 失败: %vn", err)
                //    continue
                //}
                //
                //fmt.Printf("PUT %v successn", key)
            }
        }

        time.Sleep(time.Second * 1)
    }
}


watch监听

func WatchSource(cli *clientv3.Client) {
    wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix())
    for {
        select {
        case data := <-wCh:
            for i, event := range data.Events { // 每次有几个事件,这里就循环几次
                fmt.Printf("receive %v event, %vn", i, event.Type) // event.Type表示事件类型,PUT还是DELETE
                switch event.Type {
                case clientv3.EventTypePut:
                    // do your sth
                case clientv3.EventTypeDelete:
                    // do your sth
                }

                getKey, getValue := getKeyAndVal(event)
                fmt.Printf("已监听到信号, %v=%v 进行了 %v 操作. 下一步通知操作方做相应处理n", getKey, getValue, event.Type)

                // 实际业务处理...
            }
        }
    }
}

func getKeyAndVal(event *clientv3.Event) (getKey, getValue string) {
    fmt.Printf("event.PrevKv==nil结果 %v, event.Kv==nil结果 %vn", event.PrevKv == nil, event.Kv == nil)
    fmt.Printf("event.PrevKv= %+vn", event.PrevKv)
    fmt.Printf("event.Kv= %+vn", event.Kv)
    if event.PrevKv != nil {
        getKey, getValue = string(event.PrevKv.Key), string(event.PrevKv.Value)
    }

    if event.Type == mvccpb.PUT {
        if event.Kv != nil {
            getKey, getValue = string(event.Kv.Key), string(event.Kv.Value)
        }
    }

    return
}

启动

func main() {
    cli, err := client.InitV3Client()
    if err != nil {
        fmt.Println(err)
        return
    }
    
    // 验证方式1,手动在终端操作,这里监听结果
    watch.WatchSource(cli)  
    
    // 验证方式2,全部在代码中自动运行
    //go watch.WatchSource(cli) 
    //watch.Source(cli)
}


好,一个watch流程实现完了。


细节分析

接下来基于上面代码,我们总结一些细节,如什么时候能收到信号,收到信号后什么时候能拿到上一个版本的值?

我们以方式1做验证。

PUT一个新值看看:

root@:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK

结果:

receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:762 version:1 value:"3" 
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理

这里已经能收到PUT信号,表示流程OK!

再put /a/b/3 3一次试试:

receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:763 version:2 value:"3" 
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理

可以看到version、mod_revision值做了+1操作,PrevKv未拿到值。


那么key不变,val变化呢?

root@:~# docker exec etcd etcdctl put /a/b/3 13 --user=a:123
OK

看看结果:
receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:762 mod_revision:764 version:3 value:"13" 
已监听到信号, /a/b/3=13 进行了 PUT 操作. 下一步通知操作方做相应处理

可见event.PrevKv依旧无值。

删除一个已存在数据:

root@:~# docker exec etcd etcdctl del /a/b/3  --user=a:123
1

看看结果:

receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767 
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理

删除一个不存在数据:

root@:~# docker exec etcd etcdctl del /a/b/5  --user=a:123
0

receive 0 event, DELETE
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" mod_revision:767 
已监听到信号, /a/b/3= 进行了 DELETE 操作. 下一步通知操作方做相应处理

PrevKv依旧为空。


难道,PrevKv总是无值吗?那要这个有什么意义? 不要急,我们做一个小改动:

wCh := cli.Watch(context.Background(), watchPrefix, clientv3.WithPrefix(),clientv3.WithPrevKV())

重复上述流程进行验证:


PUT一个新值:

root@:~# docker exec etcd etcdctl put /a/b/3 2 --user=a:123
OK

receive 0 event, PUT
event.PrevKv==nil结果 true, event.Kv==nil结果 false
event.PrevKv= <nil>
event.Kv= key:"/a/b/3" create_revision:806 mod_revision:806 version:1 value:"2" 
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理

可以看到PrevKv为空。

再来一次:

receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:800 version:2 value:"2" 
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2" 
已监听到信号, /a/b/3=2 进行了 PUT 操作. 下一步通知操作方做相应处理


改一下value :

root@:~# docker exec etcd etcdctl put /a/b/3 3 --user=a:123
OK

receive 0 event, PUT
event.PrevKv==nil结果 false, event.Kv==nil结果 false
event.PrevKv= key:"/a/b/3" create_revision:799 mod_revision:801 version:3 value:"2" 
event.Kv= key:"/a/b/3" create_revision:799 mod_revision:802 version:4 value:"3" 
已监听到信号, /a/b/3=3 进行了 PUT 操作. 下一步通知操作方做相应处理


重要总结

1,对于PUT操作
put过该key后再次put,相当于更新操作,此时PrevKv有值、可收到任何监听信号;
对key首次PUT时 PrevKv为空、可收到任何监听信号;


2,对于DELETE操作
该key存在时进行delete操作,此时PrevKvs有值,可收到任何监听信号;
若delete不存在的key,则收不到任何监听信号;


其实可以进行的骚操作很多,就看你的需要是什么了,再如我们给上面加了clientv3.WithPrefix(),这就代表操作时会按包含前缀对待,也就是说监听方监听的是以/a/b为前缀的key的变化,如果没有 clientv3.WithPrefix(),删除或PUT一个/a/b/5这样的值注定是要失败的、不会接收到任何监听信号。

最后

以上就是会撒娇小猫咪为你收集整理的etcdv3·watch操作实现及相关重点说明代码实现 操作触发源 watch监听 细节分析 重要总结的全部内容,希望文章能够帮你解决etcdv3·watch操作实现及相关重点说明代码实现 操作触发源 watch监听 细节分析 重要总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(37)

评论列表共有 0 条评论

立即
投稿
返回
顶部