我是靠谱客的博主 高挑寒风,最近开发中收集的这篇文章主要介绍ETCD数据库源码分析——集群间网络层服务端接口,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

从上一篇文章ETCD数据库源码分析——集群通信初始化我们知道:

  • 集群通信服务端会调用configurePeerListeners函数为配置Config中LPUrls每个url创建一个peerListener,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts函数创建net.Listener。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。
  • 集群通信服务端会调用servePeers函数启动服务。servePeers函数会创建协程运行cmux.New(p.Listener).Serve()函数。
  • 集群通信服务端会调用servePeers函数启动服务。servePeers函数首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux来获取相应的Handler。servePeers函数会创建协程运行&http.Server{Handler: ph, }.Serve(m.Match(cmux.Any()))

这篇文章我们来学习一下transport.NewListenerWithOpts函数和pipelineHandler、streamHandler、snapHandler。

NewListenerWithOpts函数

Listener的初始化代码如下所示,transport.NewListenerWithOpts函数代码位于client/pkg/transport/listener.go文件中,作为transport包内export到外部的函数。

		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
			transport.WithTLSInfo(&cfg.PeerTLSInfo),
			transport.WithSocketOpts(&cfg.SocketOpts),
			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
		)

client/pkg/transport/listener.go文件export两个函数给其他代码调用。NewListener函数用于不带选项时使用,从调用看两个函数并没有本质的不同,最终都是会调用client/pkg/transport/listener.go文件中的newListener函数。

// NewListener creates a new listner.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
	return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}
// NewListenerWithOpts creates a new listener which accepts listener options.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
	return newListener(addr, scheme, opts...)
}

newListener函数比较负责,会根据不同的协议和选项调用不同的函数产生不同的Listener。针对unix或unixs协议调用NewUnixListener函数(定义在client/pkg/transport/unix_listener.go文件);通过listen选项(相关代码位于client/pkg/transport/listener_opts.go文件)来设定监听配置;rwTimeoutListener定义在client/pkg/transport/timeout_listener.go文件中;NewTLSListener定义在client/pkg/transport/listener_tls.go中。其实万变不离其宗net.Listener GO包。

func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
	if scheme == "unix" || scheme == "unixs" { // unix sockets via unix://laddr		
		return NewUnixListener(addr)
	}

	lnOpts := newListenOpts(opts...)  // listen选项相关代码位于client/pkg/transport/listener_opts.go文件
	switch {
	case lnOpts.IsSocketOpts():	
		config, err := newListenConfig(lnOpts.socketOpts) // new ListenConfig with socket options.
		if err != nil { return nil, err }
		lnOpts.ListenConfig = config
		// check for timeout
		fallthrough
	case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():	
		ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) // timeout listener with socket options.
		if err != nil { return nil, err }
		lnOpts.Listener = &rwTimeoutListener{
			Listener:     ln,
			readTimeout:  lnOpts.readTimeout,
			writeTimeout: lnOpts.writeTimeout,
		}
	case lnOpts.IsTimeout():
		ln, err := net.Listen("tcp", addr)
		if err != nil { return nil, err }
		lnOpts.Listener = &rwTimeoutListener{
			Listener:     ln,
			readTimeout:  lnOpts.readTimeout,
			writeTimeout: lnOpts.writeTimeout,
		}
	default:
		ln, err := net.Listen("tcp", addr)
		if err != nil { return nil, err }
		lnOpts.Listener = ln
	}

	//  only skip if not passing TLSInfo
	if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() { return lnOpts.Listener, nil }
	return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
	if scheme != "https" && scheme != "unixs" { return l, nil }
	if tlsinfo != nil && tlsinfo.SkipClientSANVerify { return NewTLSListener(l, tlsinfo) }
	return newTLSListener(l, tlsinfo, checkSAN)
}

pipelineHandler、streamHandler、snapHandler

servePeers函数首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux来获取相应的Handler。首先我们先看etcdserver.ServerPeerV2接口,其包含了ServerPeer,而ServerPeer又包含了ServerV2接口,最终可以发现ServerV2接口包含了Server接口。而EtcdServer(server/etcdserver/server.go)是Server接口的实现,因此servePeers函数代码ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)中形参etcdserver.ServerPeerV2传入EtcdServer实参是没有任何问题的。

type ServerPeerV2 interface {
	ServerPeer
	HashKVHandler() http.Handler
	DowngradeEnabledHandler() http.Handler
}
type ServerPeer interface {
	ServerV2
	RaftHandler() http.Handler
	LeaseHandler() http.Handler
}
type ServerV2 interface {
	Server
	Leader() types.ID
	// Do takes a V2 request and attempts to fulfill it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	ClientCertAuthEnabled() bool
}
type Server interface {
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	ClusterVersion() *semver.Version
	StorageVersion() *semver.Version
	Cluster() api.Cluster
	Alarms() []*pb.AlarmMember
	LeaderChangedNotify() <-chan struct{}
}

NewPeerHandler函数调用newPeerHandler函数注册raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler和versionHandler、peerMembersHandler、peerMemberPromoteHandler到mux中。这里罗列一下这些handler所对应的路径。

handlerpath
raftHandler/raft 或 /raft/
leaseHandler/leases 或 /leases/internal
hashKVHandler/members/hashkv
downgradeEnabledHandler/downgrade/enabled
versionHandler/version
peerMembersHandler/members
peerMemberPromoteHandler/members/promote/
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}
func newPeerHandler(lg *zap.Logger,s etcdserver.Server,raftHandler http.Handler,leaseHandler http.Handler,hashKVHandler http.Handler,downgradeEnabledHandler http.Handler,) http.Handler {
	if lg == nil { lg = zap.NewNop() }
	peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
	peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
	mux := http.NewServeMux()
	mux.HandleFunc("/", http.NotFound)
	mux.Handle(rafthttp.RaftPrefix, raftHandler)
	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
	mux.Handle(peerMembersPath, peerMembersHandler)
	mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
	if leaseHandler != nil {
		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
	}
	if downgradeEnabledHandler != nil {
		mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
	}
	if hashKVHandler != nil {
		mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
	}
	mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
	return mux
}

RaftHandler()
RaftHandler()函数定义在server/etcdserver/server.go文件中,其会调用transport的Handler函数。

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

transport的Handler函数定义在server/etcdserver/api/rafthttp/transport.go文件中,如下代码所示,创建pipelineHandler、streamHandler、snapHandler并将其注册到mux中。

func (t *Transport) Handler() http.Handler {
	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
	mux := http.NewServeMux()
	mux.Handle(RaftPrefix, pipelineHandler)
	mux.Handle(RaftStreamPrefix+"/", streamHandler)
	mux.Handle(RaftSnapshotPrefix, snapHandler)
	mux.Handle(ProbingPrefix, probing.NewHandler())
	return mux
}

这里罗列一下这些handler所对应的路径。

handlerpath
raftHandler/raft 或 /raft/
pipelineHandler/raft
streamHandler/raft/stream
snapHandler/raft/snapshot
probing.NewHandler()/raft/probing

LeaseHandler()
LeaseHandler()函数定义在server/etcdserver/server.go文件中,其会调用leasehttp的NewHandler函数。

func (s *EtcdServer) LeaseHandler() http.Handler {
	if s.lessor == nil { return nil }
	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
}

HashKVHandler()
HashKVHandler()函数定义在server/etcdserver/corrupt.go文件中,其会返回hashKVHandler结构体。

func (s *EtcdServer) HashKVHandler() http.Handler {
	return &hashKVHandler{lg: s.Logger(), server: s}
}
type hashKVHandler struct {
	lg     *zap.Logger
	server *EtcdServer
}

DowngradeEnabledHandler()
DowngradeEnabledHandler()函数定义在server/etcdserver/server.go文件中,其会返回downgradeEnabledHandler结构体。

type downgradeEnabledHandler struct {
	lg      *zap.Logger
	cluster api.Cluster
	server  *EtcdServer
}
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
	return &downgradeEnabledHandler{
		lg:      s.Logger(),
		cluster: s.cluster,
		server:  s,
	}
}

versionHandler(s, serveVersion)
versionHandler()函数定义在server/etcdserver/api/etcdhttp/version.go文件中,其会向其函数体中的函数传入ResponseWriter对象和http.Request,其内部函数会调用server.ClusterVersion()和server.StorageVersion()获取集群版本和存储版本,然后调用serveVersion函数将版本信息封装成Versions结构体,设置http相应头信息,序列化后写出。

func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		clusterVersion := server.ClusterVersion()
		storageVersion := server.StorageVersion()
		clusterVersionStr, storageVersionStr := "not_decided", "unknown"
		if clusterVersion != nil {
			clusterVersionStr = clusterVersion.String()
		}
		if storageVersion != nil {
			storageVersionStr = storageVersion.String()
		}
		fn(w, r, clusterVersionStr, storageVersionStr)
	}
}
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
	if !allowMethod(w, r, "GET") { return }
	vs := version.Versions{
		Server:  version.Version,
		Cluster: clusterV,
		Storage: storageV,
	}

	w.Header().Set("Content-Type", "application/json")
	b, err := json.Marshal(&vs)
	if err != nil { panic(fmt.Sprintf("cannot marshal versions to json (%v)", err)) }
	w.Write(b)
}

newPeerMembersHandler(lg, s.Cluster())
newPeerMembersHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMembersHandler结构体。

func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
	return &peerMembersHandler{
		lg:      lg,
		cluster: cluster,
	}
}
type peerMembersHandler struct {
	lg      *zap.Logger
	cluster api.Cluster
}

newPeerMemberPromoteHandler(lg, s)
newPeerMemberPromoteHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMemberPromoteHandler结构体。

func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
	return &peerMemberPromoteHandler{
		lg:      lg,
		cluster: s.Cluster(),
		server:  s,
	}
}
type peerMemberPromoteHandler struct {
	lg      *zap.Logger
	cluster api.Cluster
	server  etcdserver.Server
}

最后

以上就是高挑寒风为你收集整理的ETCD数据库源码分析——集群间网络层服务端接口的全部内容,希望文章能够帮你解决ETCD数据库源码分析——集群间网络层服务端接口所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部