我是靠谱客的博主 失眠西装,这篇文章主要介绍Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2),现在分享给大家,希望可以做个参考。

Etcd源码分析之clientv3源码分析(2)

在上一篇博客《Etcd源码分析之clientv3源码分析(1)》中介绍了如何创建一个clientv3的流程,在本篇博客中以一个PUT操作为例,介绍一下PUT、GET和DELETE这三个键值相关操作中的代码流程。

PUT操作样例程序

首先,一个简单的PUT操作代码代码样例如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import ( "context" "log" "time" "github.com/coreos/etcd/clientv3" ) var ( dialTimeout = 5 * time.Second requestTimeout = 10 * time.Second endpoints = []string{"localhost:2379"} ) //此函数来源于 -->etcd-3.3.1/clientv3/example_kv_test.go line:26 func ExampleKV_put() { //创建一个客户端 cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints , //客户端了解到服务端的localhost:2379 DialTimeout: dialTimeout, //grpc 连接建立超时时间为5s }) if err != nil { log.Fatal(err) } //函数退出前关闭客户端 defer cli.Close() //因为clientv3里面设置了重试机制,如果一个endpoint不可用,clientv3会尝试其他endpoint,即使所有的 //endpoints都不可用,如果不配置ctx为WithTimeout类型,clientV3会一直在重试,直到有可用endpoint, //将Put请求发送出去为止。所以此处配置了requestTimeout(10s),10s内Put操作还没有完成,则取消该操作, //同时Put也会返回错误。 ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) _, err = cli.Put(ctx, "sample_key", "sample_value") //PUT sample_key sample_value cancel() //虽然10s之后,ctx会自动失效,但是这里还是执行以下cancel()函数,避免context泄露。 if err != nil { log.Fatal(err) } } func main(){ ExampleKV_put() }

PUT操作源码分析

对于Put操作,执行的是Client结构体中的Put函数,该函数属于KV interface

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:33 type KV interface { // Put puts a key-value pair into etcd. // Note that key,value can be plain bytes array and string is // an immutable representation of that bytes array. // To get a string of bytes, do string([]byte(0x10, 0x20)). Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) // Get retrieves keys. // By default, Get will return the value for "key", if any. // When passed WithRange(end), Get will return the keys in the range [key, end). // When passed WithFromKey(), Get returns keys greater than or equal to key. // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. // When passed WithSort(), the keys will be sorted. Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) // Delete deletes a key, or optionally using WithRange(end), [key, end). Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) // Compact compacts etcd KV history before the given rev. Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) // Do applies a single Op on KV without a transaction. // Do is useful when declaring operations to be issued at a later time // whereas Get/Put/Delete are for better suited for when the operation // should be immediately issued at time of declaration. // Do applies a single Op on KV without a transaction. // Do is useful when creating arbitrary operations to be issued at a // later time; the user can range over the operations, calling Do to // execute them. Get/Put/Delete, on the other hand, are best suited // for when the operation should be issued at the time of declaration. Do(ctx context.Context, op Op) (OpResponse, error) // Txn creates a transaction. Txn(ctx context.Context) Txn }

在Client结构体中,KV的定义如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
// 原代码见-->etcd-3.3.1/clientv3/client.go line:45 // Client provides and manages an etcd v3 client session. type Client struct { Cluster KV Lease Watcher Auth Maintenance //... }

那么在实际的代码运行中,Put接口是如何调用的呢?
首先在调用clientv3.New()函数创建一个客户端时,其实最主要是调用newClient(cfg *Config)函数(详情见上一篇博客),在该函数里面,通过以下代码,对Client.KV进行了赋值:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
// 原代码见-->etcd-3.3.1/clientv3/client.go line:366 func newClient(cfg *Config) (*Client, error) { //... client.Cluster = NewCluster(client) client.KV = NewKV(client) client.Lease = NewLease(client) client.Watcher = NewWatcher(client) client.Auth = NewAuth(client) client.Maintenance = NewMaintenance(client) //... return client, nil }

此处的 NewKV()函数,返回值为一个kv结构体的地址,入参是当前的Client客户端指针

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:92 type kv struct { remote pb.KVClient callOpts []grpc.CallOption } func NewKV(c *Client) KV { api := &kv{remote: RetryKVClient(c)} if c != nil { api.callOpts = c.callOpts } return api }

如此,样例代码中的调用cli.Put(),其实就是调用kv.Put(),即以下代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:113 func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { r, err := kv.Do(ctx, OpPut(key, val, opts...)) return r.put, toErr(ctx, err) } //... //原代码见-->etcd-3.3.1/clientv3/kv.go line:144 //上面的Put()函数会调用Do()函数来执行Put()操作,其他的Get、Delete操作也一样 func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { var err error switch op.t { case tRange: var resp *pb.RangeResponse resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} resp, err = kv.remote.Put(ctx, r, kv.callOpts...) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } case tTxn: var resp *pb.TxnResponse resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...) if err == nil { return OpResponse{txn: (*TxnResponse)(resp)}, nil } default: panic("Unknown op") } return OpResponse{}, toErr(ctx, err) }

一个Put操作,经过一层又一层的函数调用,终于来到了kv.remote.Put(ctx, r)

clientv3的重试机制

而通过上面贴出的NewKV(c *Client)的源码,可以看出kv.remote的值等于RetryKVClient(c)的返回值。
RetryKVClient()的实现代码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:136 type retryKVClient struct { kc pb.KVClient retryf retryRPCFunc } // RetryKVClient implements a KVClient. func RetryKVClient(c *Client) pb.KVClient { return &retryKVClient{ //返回一个&retryKVClient{} kc: pb.NewKVClient(c.conn), retryf: c.newAuthRetryWrapper(c.newRetryWrapper()), } } //此处range即为 get操作 func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) { err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.kc.Range(rctx, in, opts...) return err }, repeatable) return resp, err } func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) { err = rkv.retryf(ctx, func(rctx context.Context) error { resp, err = rkv.kc.Put(rctx, in, opts...) return err }, nonRepeatable) return resp, err }

所以上面的kv.remote.Put(ctx, r)实际上调用的是retryKVClient.Put()
retryKVClient.Put()中,调用rkv.retryf来执行下面这个函数:

复制代码
1
2
3
4
func(rctx context.Context) error { resp, err = rkv.kc.Put(rctx, in, opts...) return err }

这个函数才是最终调用grpc接口,发送grpc请求并返回grpc回复的处理代码。这个函数由rkv.retryf来执行。

RetryKVClient中,rkv.retryfc.newAuthRetryWrapper(c.newRetryWrapper())的返回值,实现如下

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:113 func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { //返回一个函数 for { pinned := c.balancer.pinned() err := retryf(rpcCtx, f, rp) //调用retryf进行处理kv.remote.Put(ctx, r)等操作 if err == nil { return nil } logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned) // always stop retry on etcd errors other than invalid auth token if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { gterr := c.getToken(rpcCtx) if gterr != nil { logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned) return err // return the original error for simplicity } continue } return err } } }

上面入参中的retryf为以下函数的返回值:在这个函数里面,如果一个endpoint不可用,会通知负载均衡器去尝试与其他的endpoint建立连接。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:79 func (c *Client) newRetryWrapper() retryRPCFunc { return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { var isStop retryStopErrFunc switch rp { case repeatable: isStop = isRepeatableStopError case nonRepeatable: isStop = isNonRepeatableStopError } for { if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil { return err } pinned := c.balancer.pinned() //获取当前正在使用的endpoint err := f(rpcCtx) //执行kv.remote.Put(ctx, r)等操作 if err == nil { return nil } logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned) if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) { // mark this before endpoint switch is triggered c.balancer.hostPortError(pinned, err) c.balancer.next() //获取下一个endpoint logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error()) } if isStop(err) { return err } } } }

最后

以上就是失眠西装最近收集整理的关于Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)的全部内容,更多相关Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部