概述
Etcd源码分析之clientv3源码分析(2)
在上一篇博客《Etcd源码分析之clientv3源码分析(1)》中介绍了如何创建一个clientv3的流程,在本篇博客中以一个PUT操作为例,介绍一下PUT、GET和DELETE这三个键值相关操作中的代码流程。
PUT操作样例程序
首先,一个简单的PUT操作代码代码样例如下:
import (
"context"
"log"
"time"
"github.com/coreos/etcd/clientv3"
)
var (
dialTimeout = 5 * time.Second
requestTimeout = 10 * time.Second
endpoints = []string{"localhost:2379"}
)
//此函数来源于 -->etcd-3.3.1/clientv3/example_kv_test.go line:26
func ExampleKV_put() {
//创建一个客户端
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints , //客户端了解到服务端的localhost:2379
DialTimeout: dialTimeout, //grpc 连接建立超时时间为5s
})
if err != nil {
log.Fatal(err)
}
//函数退出前关闭客户端
defer cli.Close()
//因为clientv3里面设置了重试机制,如果一个endpoint不可用,clientv3会尝试其他endpoint,即使所有的
//endpoints都不可用,如果不配置ctx为WithTimeout类型,clientV3会一直在重试,直到有可用endpoint,
//将Put请求发送出去为止。所以此处配置了requestTimeout(10s),10s内Put操作还没有完成,则取消该操作,
//同时Put也会返回错误。
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value") //PUT sample_key sample_value
cancel() //虽然10s之后,ctx会自动失效,但是这里还是执行以下cancel()函数,避免context泄露。
if err != nil {
log.Fatal(err)
}
}
func main(){
ExampleKV_put()
}
PUT操作源码分析
对于Put操作,执行的是Client结构体中的Put函数,该函数属于KV interface
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:33
type KV interface {
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte(0x10, 0x20)).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
// Do is useful when declaring operations to be issued at a later time
// whereas Get/Put/Delete are for better suited for when the operation
// should be immediately issued at time of declaration.
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
在Client结构体中,KV的定义如下:
// 原代码见-->etcd-3.3.1/clientv3/client.go line:45
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
//...
}
那么在实际的代码运行中,Put接口是如何调用的呢?
首先在调用clientv3.New()
函数创建一个客户端时,其实最主要是调用newClient(cfg *Config)
函数(详情见上一篇博客),在该函数里面,通过以下代码,对Client.KV进行了赋值:
// 原代码见-->etcd-3.3.1/clientv3/client.go line:366
func newClient(cfg *Config) (*Client, error) {
//...
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)
//...
return client, nil
}
此处的 NewKV()
函数,返回值为一个kv
结构体的地址,入参是当前的Client
客户端指针
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:92
type kv struct {
remote pb.KVClient
callOpts []grpc.CallOption
}
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
if c != nil {
api.callOpts = c.callOpts
}
return api
}
如此,样例代码中的调用cli.Put()
,其实就是调用kv.Put()
,即以下代码:
// 原代码见-->etcd-3.3.1/clientv3/kv.go line:113
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
//...
//原代码见-->etcd-3.3.1/clientv3/kv.go line:144
//上面的Put()函数会调用Do()函数来执行Put()操作,其他的Get、Delete操作也一样
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
case tRange:
var resp *pb.RangeResponse
resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...)
if err == nil {
return OpResponse{get: (*GetResponse)(resp)}, nil
}
case tPut:
var resp *pb.PutResponse
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
case tDeleteRange:
var resp *pb.DeleteRangeResponse
r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{del: (*DeleteResponse)(resp)}, nil
}
case tTxn:
var resp *pb.TxnResponse
resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...)
if err == nil {
return OpResponse{txn: (*TxnResponse)(resp)}, nil
}
default:
panic("Unknown op")
}
return OpResponse{}, toErr(ctx, err)
}
一个Put操作,经过一层又一层的函数调用,终于来到了kv.remote.Put(ctx, r)
。
clientv3的重试机制
而通过上面贴出的NewKV(c *Client)
的源码,可以看出kv.remote的值等于RetryKVClient(c)
的返回值。
RetryKVClient()的实现代码如下:
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:136
type retryKVClient struct {
kc pb.KVClient
retryf retryRPCFunc
}
// RetryKVClient implements a KVClient.
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{ //返回一个&retryKVClient{}
kc: pb.NewKVClient(c.conn),
retryf: c.newAuthRetryWrapper(c.newRetryWrapper()),
}
}
//此处range即为 get操作
func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.kc.Range(rctx, in, opts...)
return err
}, repeatable)
return resp, err
}
func (rkv *retryKVClient) Put(ctx context.Context, in *pb.PutRequest, opts ...grpc.CallOption) (resp *pb.PutResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.kc.Put(rctx, in, opts...)
return err
}, nonRepeatable)
return resp, err
}
所以上面的kv.remote.Put(ctx, r)
实际上调用的是retryKVClient.Put()
在retryKVClient.Put()
中,调用rkv.retryf
来执行下面这个函数:
func(rctx context.Context) error {
resp, err = rkv.kc.Put(rctx, in, opts...)
return err
}
这个函数才是最终调用grpc接口,发送grpc请求并返回grpc回复的处理代码。这个函数由rkv.retryf
来执行。
由RetryKVClient
中,rkv.retryf
为c.newAuthRetryWrapper(c.newRetryWrapper())
的返回值,实现如下
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:113
func (c *Client) newAuthRetryWrapper(retryf retryRPCFunc) retryRPCFunc {
return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error { //返回一个函数
for {
pinned := c.balancer.pinned()
err := retryf(rpcCtx, f, rp) //调用retryf进行处理kv.remote.Put(ctx, r)等操作
if err == nil {
return nil
}
logger.Lvl(4).Infof("clientv3/auth-retry: error %q on pinned endpoint %q", err.Error(), pinned)
// always stop retry on etcd errors other than invalid auth token
if rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
gterr := c.getToken(rpcCtx)
if gterr != nil {
logger.Lvl(4).Infof("clientv3/auth-retry: cannot retry due to error %q(%q) on pinned endpoint %q", err.Error(), gterr.Error(), pinned)
return err // return the original error for simplicity
}
continue
}
return err
}
}
}
上面入参中的retryf
为以下函数的返回值:在这个函数里面,如果一个endpoint不可用,会通知负载均衡器去尝试与其他的endpoint建立连接。
// 原代码见-->etcd-3.3.1/clientv3/retry.go line:79
func (c *Client) newRetryWrapper() retryRPCFunc {
return func(rpcCtx context.Context, f rpcFunc, rp retryPolicy) error {
var isStop retryStopErrFunc
switch rp {
case repeatable:
isStop = isRepeatableStopError
case nonRepeatable:
isStop = isNonRepeatableStopError
}
for {
if err := readyWait(rpcCtx, c.ctx, c.balancer.ConnectNotify()); err != nil {
return err
}
pinned := c.balancer.pinned() //获取当前正在使用的endpoint
err := f(rpcCtx) //执行kv.remote.Put(ctx, r)等操作
if err == nil {
return nil
}
logger.Lvl(4).Infof("clientv3/retry: error %q on pinned endpoint %q", err.Error(), pinned)
if s, ok := status.FromError(err); ok && (s.Code() == codes.Unavailable || s.Code() == codes.DeadlineExceeded || s.Code() == codes.Internal) {
// mark this before endpoint switch is triggered
c.balancer.hostPortError(pinned, err)
c.balancer.next() //获取下一个endpoint
logger.Lvl(4).Infof("clientv3/retry: switching from %q due to error %q", pinned, err.Error())
}
if isStop(err) {
return err
}
}
}
}
最后
以上就是失眠西装为你收集整理的Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)的全部内容,希望文章能够帮你解决Etcd源码分析之clientv3源码分析(2)Etcd源码分析之clientv3源码分析(2)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复