我是靠谱客的博主 失眠西装,最近开发中收集的这篇文章主要介绍Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Etcd源码分析之clientv3源码分析(2)

在上一篇博客《Etcd源码分析之clientv3源码分析(1)》中介绍了如何创建一个clientv3的流程,在本篇博客中以一个PUT操作为例,介绍一下PUT、GET和DELETE这三个键值相关操作中的代码流程。

PUT操作样例程序

首先,一个简单的PUT操作代码代码样例如下:

import (
    "context"
    "log"
    "time"

    "github.com/coreos/etcd/clientv3"
)
var (
    dialTimeout    = 5 * time.Second  
    requestTimeout = 10 * time.Second 
    endpoints      = []string{"localhost:2379"}
)

//此函数来源于 -->etcd-3.3.1/clientv3/example_kv_test.go  line:26
func ExampleKV_put() {
    //创建一个客户端
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints ,  //客户端了解到服务端的localhost:2379
        DialTimeout: dialTimeout, //grpc 连接建立超时时间为5s
    })
    if err != nil {
        log.Fatal(err)
    }
    //函数退出前关闭客户端
    defer cli.Close()

    //因为clientv3里面设置了重试机制,如果一个endpoint不可用,clientv3会尝试其他endpoint,即使所有的
    //endpoints都不可用,如果不配置ctx为WithTimeout类型,clientV3会一直在重试,直到有可用endpoint,
    //将Put请求发送出去为止。所以此处配置了requestTimeout(10s),10s内Put操作还没有完成,则取消该操作,
    //同时Put也会返回错误。
    ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
    _, err = cli.Put(ctx, "sample_key", "sample_value") //PUT sample_key sample_value
    cancel() //虽然10s之后,ctx会自动失效,但是这里还是执行以下cancel()函数,避免context泄露。
    if err != nil {
        log.Fatal(err)
    }
}

func main(){
    ExampleKV_put()
}

PUT操作源码分析

对于Put操作,执行的是Client结构体中的Put函数,该函数属于KV interface

// 原代码见-->etcd-3.3.1/clientv3/kv.go  line:33
type KV interface {
    // Put puts a key-value pair into etcd.
    // Note that key,value can be plain bytes array and string is
    // an immutable representation of that bytes array.
    // To get a string of bytes, do string([]byte(0x10, 0x20)).
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

    // Get retrieves keys.
    // By default, Get will return the value for "key", if any.
    // When passed WithRange(end), Get will return the keys in the range [key, end).
    // When passed WithFromKey(), Get returns keys greater than or equal to key.
    // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
    // if the required revision is compacted, the request will fail with ErrCompacted .
    // When passed WithLimit(limit), the number of returned keys is bounded by limit.
    // When passed WithSort(), the keys will be sorted.
    Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

    // Delete deletes a key, or optionally using WithRange(end), [key, end).
    Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)

    // Compact compacts etcd KV history before the given rev.
    Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)

    // Do applies a single Op on KV without a transaction.
    // Do is useful when declaring operations to be issued at a later time
    // whereas Get/Put/Delete are for better suited for when the operation
    // should be immediately issued at time of declaration.

    // Do applies a single Op on KV without a transaction.
    // Do is useful when creating arbitrary operations to be issued at a
    // later time; the user can range over the operations, calling Do to
    // execute them. Get/Put/Delete, on the other hand, are best suited
    // for when the operation should be issued at the time of declaration.
    Do(ctx context.Context, op Op) (OpResponse, error)

    // Txn creates a transaction.
    Txn(ctx context.Context) Txn
}

在Client结构体中,KV的定义如下:

// 原代码见-->etcd-3.3.1/clientv3/client.go line:45
// Client provides and manages an etcd v3 client session.
type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance
    //...
}

那么在实际的代码运行中,Put接口是如何调用的呢?
首先在调用clientv3.New()函数创建一个客户端时,其实最主要是调用newClient(cfg *Config)函数(详情见上一篇博客),在该函数里面,通过以下代码,对Client.KV进行了赋值:

// 原代码见-->etcd-3.3.1/clientv3/client.go  line:366
func newClient(cfg *Config) (*Client, error) {

    //...
    client.Cluster = NewCluster(client)
    client.KV = NewKV(client)
    client.Lease = NewLease(client)
    client.Watcher = NewWatcher(client)
    client.Auth = NewAuth(client)
    client.Maintenance = NewMaintenance(client)
    //...
    return client, nil
}

此处的 NewKV()函数,返回值为一个kv结构体的地址,入参是当前的Client客户端指针

// 原代码见-->etcd-3.3.1/clientv3/kv.go line:92
type kv struct {
    remote   pb.KVClient
    callOpts []grpc.CallOption
}

func NewKV(c *Client) KV {
    api := &kv{remote: RetryKVClient(c)}
    if c != nil {
        api.callOpts = c.callOpts
    }
    return api
}

如此,样例代码中的调用cli.Put(),其实就是调用kv.Put(),即以下代码:

// 原代码见-->etcd-3.3.1/clientv3/kv.go line:113
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
    r, err := kv.Do(ctx, OpPut(key, val, opts...))
    return r.put, toErr(ctx, err)
}

//... 

//原代码见-->etcd-3.3.1/clientv3/kv.go line:144
//上面的Put()函数会调用Do()函数来执行Put()操作,其他的Get、Delete操作也一样
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
    var err error
    switch op.t {
    case tRange:
        var resp *pb.RangeResponse
        resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
        if err == nil {
            return OpResponse{get: (*GetResponse)(resp)}, nil
        }
    case tPut:
        var resp *pb.PutResponse
        r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
        resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{put: (*PutResponse)(resp)}, nil
        }
    case tDeleteRange:
        var resp *pb.DeleteRangeResponse
        r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
        resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
        if err == nil {
            return OpResponse{del: (*DeleteResponse)(resp)}, nil
        }
    case tTxn:
        var resp *pb.TxnResponse
        resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
        if err == nil {
            return OpResponse{txn: (*TxnResponse)(resp)}, nil
        }
    default:
        panic("Unknown op")
    }
    return OpResponse{}, toErr(ctx, err)
}

一个Put操作,经过一层又一层的函数调用,终于来到了kv.remote.Put(ctx, r)

clientv3的重试机制

而通过上面贴出的NewKV(c *Client)的源码,可以看出kv.remote的值等于RetryKVClient(c)的返回值。
RetryKVClient()的实现代码如下:

// 原代码见-->etcd-3.3.1/clientv3/retry.go line:136
type retryKVClient struct {
    kc     pb.KVClient
    retryf retryRPCFunc
}

// RetryKVClient implements a KVClient.
func RetryKVClient(c *Client) pb.KVClient {
    return &retryKVClient{  //返回一个&retryKVClient{}
        kc:     pb.NewKVClient(c.conn),
        retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
    }
}
//此处range即为 get操作
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
    err = rkv.retryf(ctx, func(rctx context.Context) error {
        resp, err = rkv.kc.Range(rctx, in, opts...)
        return err
    }, repeatable)
    return resp, err
}

func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
    err = rkv.retryf(ctx, func(rctx context.Context) error {
        resp, err = rkv.kc.Put(rctx, in, opts...)
        return err
    }, nonRepeatable)
    return resp, err
}

所以上面的kv.remote.Put(ctx, r)实际上调用的是retryKVClient.Put()
retryKVClient.Put()中,调用rkv.retryf来执行下面这个函数:

func(rctx context.Context) error {
    resp, err = rkv.kc.Put(rctx, in, opts...)
    return err
}

这个函数才是最终调用grpc接口,发送grpc请求并返回grpc回复的处理代码。这个函数由rkv.retryf来执行。

RetryKVClient中,rkv.retryfc.newAuthRetryWrapper(c.newRetryWrapper())的返回值,实现如下

// 原代码见-->etcd-3.3.1/clientv3/retry.go line:113
func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
    return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { //返回一个函数
        for {
            pinned := c.balancer.pinned()
            err := retryf(rpcCtx, f, rp)  //调用retryf进行处理kv.remote.Put(ctx, r)等操作
            if err == nil {
                return nil
            }
            logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
            // always stop retry on etcd errors other than invalid auth token
            if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
                gterr := c.getToken(rpcCtx)
                if gterr != nil {
                    logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
                    return err // return the original error for simplicity
                }
                continue
            }
            return err
        }
    }
}

上面入参中的retryf为以下函数的返回值:在这个函数里面,如果一个endpoint不可用,会通知负载均衡器去尝试与其他的endpoint建立连接。

// 原代码见-->etcd-3.3.1/clientv3/retry.go line:79
func (c *Client) newRetryWrapper() retryRPCFunc {
    return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
        var isStop retryStopErrFunc
        switch rp {
        case repeatable:
            isStop = isRepeatableStopError
        case nonRepeatable:
            isStop = isNonRepeatableStopError
        }
        for {
            if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
                return err
            }
            pinned := c.balancer.pinned() //获取当前正在使用的endpoint
            err := f(rpcCtx) //执行kv.remote.Put(ctx, r)等操作
            if err == nil {
                return nil
            }
            logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)

            if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
                // mark this before endpoint switch is triggered
                c.balancer.hostPortError(pinned, err) 
                c.balancer.next() //获取下一个endpoint
                logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
            }

            if isStop(err) {
                return err
            }
        }
    }
}

最后

以上就是失眠西装为你收集整理的Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)的全部内容,希望文章能够帮你解决Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部