我是靠谱客的博主 包容橘子,最近开发中收集的这篇文章主要介绍ETCD 源码学习--Watch(client)主要文件结构图主要数据结构主要函数总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。

服务端监听某个 key 或某个范围 key 变化,当被监听的 key 的值有任何变动,都会主动通知客户端(变更的版本号>指定版本号)。

主要文件

/etcd/clientv3/watch.go

结构图

主要数据结构

//对外提供 Watch 接口,同时负责关闭 grpcStream 的流程。
type watcher struct {
//访问服务的RPC客户端
	remote   pb.WatchClient //etcdserver/etcdserverpb/rpc.pb.go
	...
// key=>grpcStream
streams map[string]*watchGrpcStream
	lg      *zap.Logger
}
/*
1.实例化过程中,会创建 WatchClient, 这个 WatchClient 用于和服务端进行数据传输。同时会启动一个 goroutine 用于接收服务端的消息。
2.管理 watcherStream 关闭,创建等流程, 
3.启动一个 goroutine 用于处理 reqc、respc 等管道的消息。
4.分发事件给所有监听者。
*/
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
....
    //watchID=>watcherStream, 一个长连接
	substreams map[int64]*watcherStream
	//等待与服务器建立连接的 watcherStream
	resuming []*watcherStream
    //客户端发起 watch 请求会被追加到管道
	reqc chan watchStreamRequest
	//服务端的响应追加到该管道
	respc chan *pb.WatchResponse
	....
resumec chan struct{}
}
/*
主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
管道outc、recvc、buf 之间的关系是:
  (1)watchClient会有个goroutine 用于接收服务端的消息,并将这个消息追加到recvc 管道,
  (2)如果 recvc 中有数据,会读取出来然后追加到buf缓冲中。
  (3)如果 buf 不为空,会将消息发送给 outc, 客户端通过该管道,接收服务端的响应。
*/

type watcherStream struct {
    //记录用户的请求实例
	initReq watchRequest
    //服务端响应会被追加到这个管道,
	outc chan WatchResponse
	//接收服务端的响应
	recvc chan *WatchResponse
	...
    //响应缓冲池
	buf []*WatchResponse
}
/*请求结构体
一个watcherRequest 代表一个监听请求。
*/
type watchRequest struct {
	ctx context.Context
	/*
     key 和 end 共同组成监控范围
     rev 是被监控的版本
    */
    key string
	end string
	rev int64
    ...
    //最终客户会通过该通道接收服务端的响应
	retc chan chan WatchResponse
}
//响应结构体
type WatchResponse struct {
	Header pb.ResponseHeader
Events []*Event
	CompactRevision int64
...
}

主要函数

监控key

/*
主要流程:
1.请求数据构建
2.生成 grpcStream 的在 streams 中的标识
3.如果标识不存在于 streams,创建一个 grpcStream
4.发起请求,等待请求响应
5.返回监听者接收消息的管道
*/
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	var filters []pb.WatchCreateRequest_FilterType
	...
	//组装请求
	wr := &watchRequest{
		...
	}

	ok := false
	//获取streamKey
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil { //streams 未初始化,有问题
		// closed
		w.mu.Unlock()
		ch := make(chan WatchResponse)
		close(ch)
		return ch
	}
	//如果已存在直接获取,不存则申请一个grpcStream
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	...
	// couldn't create channel; return closed channel
	closeCh := make(chan WatchResponse, 1)

	// 发送请求
	select {
	case reqc <- wr:
		ok = true
		....
	}
	//等待与服务器建立链接,同时获取接收消息的管道,并返回给监听者
	if ok {
		select {
		case ret := <-wr.retc:
			return ret
		...
	}
	...
}

创建GrpcStream 实例

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		...
	}
	go wgs.run()
	return wgs
}
/*
流程:
1.创建与服务交互的客户端实例
2.处理监听者请求
3.处理服务端消息
*/

func (w *watchGrpcStream) run() {
	...
	defer func() {
		//关闭所有stream
		...
	}()

	//创建与服务端交互的客户端
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}
    ...
	var cur *pb.WatchResponse
	for {
		select {
		//监听请求
		case req := <-w.reqc:
			switch wreq := req.(type) {
			//监听请求
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				ws := &watcherStream{
                    ...
				}

				//启动一个 goroutine 用于处理 watcherStream 管道之间的数据传输
				go w.serveSubstream(ws, w.resumec)
				
				//等待与服务器建立连接的 ws
				//系统保证 watchRequest 会按照先进先出的规则与服务器建立连接
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 { //如果只有一个,说明当前 ws 可以马上建立连接
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						lg.Warningf("error when sending request: %v", err)
					}
				}
			//????
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					lg.Warningf("error when sending request: %v", err)
				}
			}

		// 处理服务端的消息
		case pbresp := <-w.respc:
			...
			switch {
			case pbresp.Created:
				// response to head of queue creation
				if ws := w.resuming[0]; ws != nil {
					//注册 WathchId=>ws 的关系
					w.addSubstream(pbresp, ws)
					//通知所有的监听者
					w.dispatchEvent(pbresp)
					w.resuming[0] = nil
				}
				//处理下一个需要与服务器交互的请求
				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						...
					}
				}

			case pbresp.Canceled && pbresp.CompactRevision == 0:
			
			case cur.Fragment:
				continue
			default:
				
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
		case <-w.ctx.Done():
		case ws := <-w.closingc:
		}
	}
}

watchStream 消息处理

/*
1.如果 buf 有消息,先发送到对应的管道
2.处理服务消息
(1)如果创建类型,需要看看是否给监听者发送过消息接收的管道,如果没有,就发送,同时设置已发送。
(2)更新下一次监听的版本号
(3)将需要发送给监听者的消息暂存到 buf 中
*/
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	//下一次监听的版本号
	nextRev := ws.initReq.rev
	...
	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc
		if len(ws.buf) > 0 { //如果缓冲中还有消息,先发送给监听者
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr: //发送服务消息给监听者
			...
		case wr, ok := <-ws.recvc: //如果服务有消息发送过来
			if !ok {
				// shutdown from closeSubstream
				return
			}
			//创建监听请求类型
			if wr.Created {
				if ws.initReq.retc != nil { //如果不为 nil,说明还没给监听者发送接收消息的 消息管道
					ws.initReq.retc <- ws.outc //给监听者发送接收消息管道
	
					ws.initReq.retc = nil //表示已经发送消息管道
					if ws.initReq.createdNotify { //发送消息该监听者
						ws.outc <- *wr
					}
				
					if ws.initReq.rev == 0 { //设置最新版本号
						nextRev = wr.Header.Revision
					}
				}
			} else { //更新下次接收变化的版本号
				nextRev = wr.Header.Revision
			}
			if wr.Created {
				continue
			}
			//将消息缓存到 buf 中
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
}

消息分发

func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	wr := &WatchResponse{
		...
	}
	if wr.IsProgressNotify() && pbresp.WatchId == -1 {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)

}

//将消息广播给所有监听者
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}
//将消息发送给特定监听者
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}
客户端实例:
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	...
	//打开 or 创建 客户端实例
	wc, err := w.openWatchClient() 
	...
	//goroutine 用于接收服务端消息
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		...
		select {
		case w.respc <- resp:
		...
	}
}

总结

1.监听通过 watcher、 watchGrpcStream 、watchStream、watcherRequest 和 WatchResponse 等数据结构实现,其中 wathcer 主要对外提供 Watch 接口和处理关闭流程,每一个 watcherRequest 会构建 一个 watchGrpcStream,watchGrpcStream 用于管理 watchStream 和各种管道消息处理,watchStream 主要用于管理 watcheRequest 与各个管道之间的数据传输关系。


2.总共有3种后台 goroutine,watchGrpcStream.run 处理监听请求和服务端消息。 watchGrpcStream.serveSubstream(每个 watcherRequest 都会有),作为服务端和监听者的中间层,将服务端的消息分发给监听者。watchGrpcStream.serveWatchClient 用于接收服务端的消息,并将消息发送给 w.respc, 等待 serveSubstream 处理。

3.Watch 主要实现是通过Stream 实现,只有其中的内部原理,有待研究。

最后

以上就是包容橘子为你收集整理的ETCD 源码学习--Watch(client)主要文件结构图主要数据结构主要函数总结的全部内容,希望文章能够帮你解决ETCD 源码学习--Watch(client)主要文件结构图主要数据结构主要函数总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部