概述
在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。
服务端监听某个 key 或某个范围 key 变化,当被监听的 key 的值有任何变动,都会主动通知客户端(变更的版本号>指定版本号)。
主要文件
/etcd/clientv3/watch.go
结构图
主要数据结构
//对外提供 Watch 接口,同时负责关闭 grpcStream 的流程。
type watcher struct {
//访问服务的RPC客户端
remote pb.WatchClient //etcdserver/etcdserverpb/rpc.pb.go
...
// key=>grpcStream
streams map[string]*watchGrpcStream
lg *zap.Logger
}
/*
1.实例化过程中,会创建 WatchClient, 这个 WatchClient 用于和服务端进行数据传输。同时会启动一个 goroutine 用于接收服务端的消息。
2.管理 watcherStream 关闭,创建等流程,
3.启动一个 goroutine 用于处理 reqc、respc 等管道的消息。
4.分发事件给所有监听者。
*/
type watchGrpcStream struct {
owner *watcher
remote pb.WatchClient
....
//watchID=>watcherStream, 一个长连接
substreams map[int64]*watcherStream
//等待与服务器建立连接的 watcherStream
resuming []*watcherStream
//客户端发起 watch 请求会被追加到管道
reqc chan watchStreamRequest
//服务端的响应追加到该管道
respc chan *pb.WatchResponse
....
resumec chan struct{}
}
/*
主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
管道outc、recvc、buf 之间的关系是:
(1)watchClient会有个goroutine 用于接收服务端的消息,并将这个消息追加到recvc 管道,
(2)如果 recvc 中有数据,会读取出来然后追加到buf缓冲中。
(3)如果 buf 不为空,会将消息发送给 outc, 客户端通过该管道,接收服务端的响应。
*/
type watcherStream struct {
//记录用户的请求实例
initReq watchRequest
//服务端响应会被追加到这个管道,
outc chan WatchResponse
//接收服务端的响应
recvc chan *WatchResponse
...
//响应缓冲池
buf []*WatchResponse
}
/*请求结构体
一个watcherRequest 代表一个监听请求。
*/
type watchRequest struct {
ctx context.Context
/*
key 和 end 共同组成监控范围
rev 是被监控的版本
*/
key string
end string
rev int64
...
//最终客户会通过该通道接收服务端的响应
retc chan chan WatchResponse
}
//响应结构体
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
CompactRevision int64
...
}
主要函数
监控key
/*
主要流程:
1.请求数据构建
2.生成 grpcStream 的在 streams 中的标识
3.如果标识不存在于 streams,创建一个 grpcStream
4.发起请求,等待请求响应
5.返回监听者接收消息的管道
*/
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
var filters []pb.WatchCreateRequest_FilterType
...
//组装请求
wr := &watchRequest{
...
}
ok := false
//获取streamKey
ctxKey := streamKeyFromCtx(ctx)
w.mu.Lock()
if w.streams == nil { //streams 未初始化,有问题
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
//如果已存在直接获取,不存则申请一个grpcStream
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
...
// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)
// 发送请求
select {
case reqc <- wr:
ok = true
....
}
//等待与服务器建立链接,同时获取接收消息的管道,并返回给监听者
if ok {
select {
case ret := <-wr.retc:
return ret
...
}
...
}
创建GrpcStream 实例
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
...
}
go wgs.run()
return wgs
}
/*
流程:
1.创建与服务交互的客户端实例
2.处理监听者请求
3.处理服务端消息
*/
func (w *watchGrpcStream) run() {
...
defer func() {
//关闭所有stream
...
}()
//创建与服务端交互的客户端
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
...
var cur *pb.WatchResponse
for {
select {
//监听请求
case req := <-w.reqc:
switch wreq := req.(type) {
//监听请求
case *watchRequest:
outc := make(chan WatchResponse, 1)
ws := &watcherStream{
...
}
//启动一个 goroutine 用于处理 watcherStream 管道之间的数据传输
go w.serveSubstream(ws, w.resumec)
//等待与服务器建立连接的 ws
//系统保证 watchRequest 会按照先进先出的规则与服务器建立连接
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 { //如果只有一个,说明当前 ws 可以马上建立连接
if err := wc.Send(ws.initReq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
//????
case *progressRequest:
if err := wc.Send(wreq.toPB()); err != nil {
lg.Warningf("error when sending request: %v", err)
}
}
// 处理服务端的消息
case pbresp := <-w.respc:
...
switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
//注册 WathchId=>ws 的关系
w.addSubstream(pbresp, ws)
//通知所有的监听者
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
//处理下一个需要与服务器交互的请求
if ws := w.nextResume(); ws != nil {
if err := wc.Send(ws.initReq.toPB()); err != nil {
...
}
}
case pbresp.Canceled && pbresp.CompactRevision == 0:
case cur.Fragment:
continue
default:
}
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
case <-w.ctx.Done():
case ws := <-w.closingc:
}
}
}
watchStream 消息处理
/*
1.如果 buf 有消息,先发送到对应的管道
2.处理服务消息
(1)如果创建类型,需要看看是否给监听者发送过消息接收的管道,如果没有,就发送,同时设置已发送。
(2)更新下一次监听的版本号
(3)将需要发送给监听者的消息暂存到 buf 中
*/
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
//下一次监听的版本号
nextRev := ws.initReq.rev
...
emptyWr := &WatchResponse{}
for {
curWr := emptyWr
outc := ws.outc
if len(ws.buf) > 0 { //如果缓冲中还有消息,先发送给监听者
curWr = ws.buf[0]
} else {
outc = nil
}
select {
case outc <- *curWr: //发送服务消息给监听者
...
case wr, ok := <-ws.recvc: //如果服务有消息发送过来
if !ok {
// shutdown from closeSubstream
return
}
//创建监听请求类型
if wr.Created {
if ws.initReq.retc != nil { //如果不为 nil,说明还没给监听者发送接收消息的 消息管道
ws.initReq.retc <- ws.outc //给监听者发送接收消息管道
ws.initReq.retc = nil //表示已经发送消息管道
if ws.initReq.createdNotify { //发送消息该监听者
ws.outc <- *wr
}
if ws.initReq.rev == 0 { //设置最新版本号
nextRev = wr.Header.Revision
}
}
} else { //更新下次接收变化的版本号
nextRev = wr.Header.Revision
}
if wr.Created {
continue
}
//将消息缓存到 buf 中
ws.buf = append(ws.buf, wr)
case <-w.ctx.Done():
return
case <-ws.initReq.ctx.Done():
return
case <-resumec:
resuming = true
return
}
}
}
消息分发
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
events := make([]*Event, len(pbresp.Events))
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
wr := &WatchResponse{
...
}
if wr.IsProgressNotify() && pbresp.WatchId == -1 {
return w.broadcastResponse(wr)
}
return w.unicastResponse(wr, pbresp.WatchId)
}
//将消息广播给所有监听者
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams {
select {
case ws.recvc <- wr:
case <-ws.donec:
}
}
return true
}
//将消息发送给特定监听者
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
select {
case ws.recvc <- wr:
case <-ws.donec:
return false
}
return true
}
客户端实例:
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
...
//打开 or 创建 客户端实例
wc, err := w.openWatchClient()
...
//goroutine 用于接收服务端消息
go w.serveWatchClient(wc)
return wc, nil
}
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
for {
resp, err := wc.Recv()
...
select {
case w.respc <- resp:
...
}
}
总结
1.监听通过 watcher、 watchGrpcStream 、watchStream、watcherRequest 和 WatchResponse 等数据结构实现,其中 wathcer 主要对外提供 Watch 接口和处理关闭流程,每一个 watcherRequest 会构建 一个 watchGrpcStream,watchGrpcStream 用于管理 watchStream 和各种管道消息处理,watchStream 主要用于管理 watcheRequest 与各个管道之间的数据传输关系。
2.总共有3种后台 goroutine,watchGrpcStream.run 处理监听请求和服务端消息。 watchGrpcStream.serveSubstream(每个 watcherRequest 都会有),作为服务端和监听者的中间层,将服务端的消息分发给监听者。watchGrpcStream.serveWatchClient 用于接收服务端的消息,并将消息发送给 w.respc, 等待 serveSubstream 处理。
3.Watch 主要实现是通过Stream 实现,只有其中的内部原理,有待研究。
最后
以上就是包容橘子为你收集整理的ETCD 源码学习--Watch(client)主要文件结构图主要数据结构主要函数总结的全部内容,希望文章能够帮你解决ETCD 源码学习--Watch(client)主要文件结构图主要数据结构主要函数总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复