概述
从上一篇文章ETCD数据库源码分析——集群通信初始化我们知道:
- 集群通信服务端会调用configurePeerListeners函数,为配置Config中LPUrls的每个url创建一个peerListener。该函数在初始化peerListener结构体时会调用`transport.NewListenerWithOpts`函数创建net.Listener,但并没有初始化peerListener结构体的serve函数。
- 集群通信服务端会调用servePeers函数启动服务。servePeers函数会创建协程运行`cmux.New(p.Listener).Serve()`函数。
- 集群通信服务端会调用servePeers函数启动服务。servePeers函数首先调用NewPeerHandler,NewPeerHandler会调用`newPeerHandler(lg, s, s.RaftHandler(), ...)`;RaftHandler()会返回s.r.transport.Handler(),transport.Handler函数最终返回一个注册了pipelineHandler、streamHandler、snapHandler的mux,由它分发到相应的Handler。servePeers函数还会创建协程运行`&http.Server{Handler: ph}.Serve(m.Match(cmux.Any()))`。
这篇文章我们来学习一下transport.NewListenerWithOpts函数,以及pipelineHandler、streamHandler、snapHandler。
NewListenerWithOpts函数
Listener的初始化代码如下所示,transport.NewListenerWithOpts函数代码位于client/pkg/transport/listener.go文件中,作为transport包内export到外部的函数。
// Start each peer listener with a close function that does nothing.
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
// Create the underlying listener for this URL's host and scheme, passing the
// peer TLS info, the socket options and the rafthttp connection read/write
// timeouts as listener options.
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
transport.WithTLSInfo(&cfg.PeerTLSInfo),
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
)
client/pkg/transport/listener.go文件export两个函数给其他代码调用。NewListener函数用于不带选项时使用,从调用看两个函数并没有本质的不同,最终都是会调用client/pkg/transport/listener.go文件中的newListener函数。
// NewListener creates a new listener for addr and scheme. The given TLS info
// is forwarded to newListener as its only option, via WithTLSInfo.
func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
return newListener(addr, scheme, WithTLSInfo(tlsinfo))
}
// NewListenerWithOpts creates a new listener which accepts listener options.
// addr, scheme and opts are passed through to newListener unchanged.
func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
return newListener(addr, scheme, opts...)
}
newListener函数比较复杂,会根据不同的协议和选项调用不同的函数产生不同的Listener:针对unix或unixs协议调用NewUnixListener函数(定义在client/pkg/transport/unix_listener.go文件);通过listen选项(相关代码位于client/pkg/transport/listener_opts.go文件)来设定监听配置;rwTimeoutListener定义在client/pkg/transport/timeout_listener.go文件中;NewTLSListener定义在client/pkg/transport/listener_tls.go中。不过万变不离其宗,最终返回的都是Go标准库net包中的net.Listener。
// newListener builds the net.Listener for addr according to scheme and opts.
//
// For the "unix" and "unixs" schemes the result of NewUnixListener(addr) is
// returned as-is and opts are not consulted. For every other scheme a TCP
// listener is opened:
//
//   - socket options set: a ListenConfig is derived from them and used to
//     listen; the listener is wrapped in a rwTimeoutListener;
//   - only a timeout set: lnOpts.ListenConfig is used to listen and the
//     listener is wrapped in a rwTimeoutListener;
//   - neither set: the plain net.Listen listener is used.
//
// Unless skipTLSInfoCheck is set and no TLS info was passed, the listener is
// finally handed to wrapTLS. Configuration and listen errors are returned
// unchanged together with a nil listener.
func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
	if scheme == "unix" || scheme == "unixs" {
		// unix sockets via unix://laddr
		return NewUnixListener(addr)
	}

	// The listen options are implemented in
	// client/pkg/transport/listener_opts.go.
	lnOpts := newListenOpts(opts...)

	switch {
	case lnOpts.IsSocketOpts():
		// new ListenConfig with socket options.
		config, err := newListenConfig(lnOpts.socketOpts)
		if err != nil {
			return nil, err
		}
		lnOpts.ListenConfig = config
		// Share the timeout-listener construction below.
		fallthrough
	case lnOpts.IsTimeout():
		// Reached through the fallthrough above (socket options) or directly
		// (timeout only). An earlier revision repeated IsSocketOpts() here and
		// carried a second, unreachable IsTimeout() case that used net.Listen;
		// both were dead code and have been dropped.
		// NOTE(review): this assumes IsSocketOpts/IsTimeout are side-effect
		// free predicates — confirm in listener_opts.go.
		ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr)
		if err != nil {
			return nil, err
		}
		lnOpts.Listener = &rwTimeoutListener{
			Listener:     ln,
			readTimeout:  lnOpts.readTimeout,
			writeTimeout: lnOpts.writeTimeout,
		}
	default:
		ln, err := net.Listen("tcp", addr)
		if err != nil {
			return nil, err
		}
		lnOpts.Listener = ln
	}

	// only skip if not passing TLSInfo
	if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() {
		return lnOpts.Listener, nil
	}
	return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
}
// wrapTLS returns l untouched unless scheme is "https" or "unixs". For those
// two schemes it wraps l in a TLS listener: through NewTLSListener when
// tlsinfo is non-nil and has SkipClientSANVerify set, otherwise through
// newTLSListener with the checkSAN callback.
func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
	switch scheme {
	case "https", "unixs":
		// TLS scheme: continue below and wrap the listener.
	default:
		return l, nil
	}

	skipSANCheck := tlsinfo != nil && tlsinfo.SkipClientSANVerify
	if !skipSANCheck {
		return newTLSListener(l, tlsinfo, checkSAN)
	}
	return NewTLSListener(l, tlsinfo)
}
pipelineHandler、streamHandler、snapHandler
servePeers函数首先调用NewPeerHandler,NewPeerHandler会调用newPeerHandler(lg, s, s.RaftHandler(), ...);RaftHandler()会返回s.r.transport.Handler(),transport.Handler函数最终返回一个注册了pipelineHandler、streamHandler、snapHandler的mux,由它分发到相应的Handler。首先我们先看etcdserver.ServerPeerV2接口,其内嵌了ServerPeer,而ServerPeer又内嵌了ServerV2接口,最终可以发现ServerV2接口内嵌了Server接口。EtcdServer(server/etcdserver/server.go)不仅是Server接口的实现,还实现了ServerV2、ServerPeer、ServerPeerV2各层新增的方法(例如下文的RaftHandler、LeaseHandler、HashKVHandler、DowngradeEnabledHandler),因此servePeers函数代码`ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)`中形参etcdserver.ServerPeerV2传入EtcdServer实参是没有任何问题的。
// ServerPeerV2 embeds ServerPeer and adds accessors for two further HTTP
// handlers. It is the parameter type accepted by NewPeerHandler.
type ServerPeerV2 interface {
ServerPeer
// HashKVHandler returns the handler that newPeerHandler mounts at
// etcdserver.PeerHashKVPath.
HashKVHandler() http.Handler
// DowngradeEnabledHandler returns the handler that newPeerHandler mounts at
// etcdserver.DowngradeEnabledPath.
DowngradeEnabledHandler() http.Handler
}
// ServerPeer embeds ServerV2 and adds accessors for the raft and lease HTTP
// handlers.
type ServerPeer interface {
ServerV2
// RaftHandler returns the handler that newPeerHandler mounts at
// rafthttp.RaftPrefix and rafthttp.RaftPrefix+"/".
RaftHandler() http.Handler
// LeaseHandler returns the handler that newPeerHandler mounts at
// leasehttp.LeasePrefix and leasehttp.LeaseInternalPrefix; newPeerHandler
// skips those routes when the handler is nil.
LeaseHandler() http.Handler
}
// ServerV2 embeds Server and adds the V2 request entry point plus two
// accessors.
type ServerV2 interface {
Server
// Leader returns a member ID.
// NOTE(review): presumably the ID of the current leader — confirm against
// the implementation.
Leader() types.ID
// Do takes a V2 request and attempts to fulfill it, returning a Response.
Do(ctx context.Context, r pb.Request) (Response, error)
// ClientCertAuthEnabled returns a boolean flag.
// NOTE(review): presumably whether client certificate authentication is
// turned on — confirm against the implementation.
ClientCertAuthEnabled() bool
}
// Server is the base interface embedded by ServerV2. It groups membership
// operations, version queries and a few cluster accessors.
type Server interface {
// AddMember, RemoveMember, UpdateMember and PromoteMember each take a
// context plus a member (or a member ID) and return a member list and an
// error.
// NOTE(review): presumably the returned slice is the membership after the
// change — confirm against the implementation.
AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
// ClusterVersion and StorageVersion return version pointers; versionHandler
// treats a nil result as "not yet known" and substitutes a placeholder
// string.
ClusterVersion() *semver.Version
StorageVersion() *semver.Version
// Cluster returns the api.Cluster handed to the peer members handlers.
Cluster() api.Cluster
// Alarms returns a list of alarm members.
Alarms() []*pb.AlarmMember
// LeaderChangedNotify returns a receive-only channel.
// NOTE(review): presumably closed or signalled when the leader changes —
// confirm against the implementation.
LeaderChangedNotify() <-chan struct{}
}
NewPeerHandler函数调用newPeerHandler函数注册raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler和versionHandler、peerMembersHandler、peerMemberPromoteHandler到mux中。这里罗列一下这些handler所对应的路径。
| handler | path |
|---|---|
| raftHandler | /raft 或 /raft/ |
| leaseHandler | /leases 或 /leases/internal |
| hashKVHandler | /members/hashkv |
| downgradeEnabledHandler | /downgrade/enabled |
| versionHandler | /version |
| peerMembersHandler | /members |
| peerMemberPromoteHandler | /members/promote/ |
// NewPeerHandler generates an http.Handler to handle etcd peer requests.
// It collects the raft, lease, hash-KV and downgrade-enabled handlers from s,
// in that order, and delegates route registration to newPeerHandler.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
	var (
		raftH      = s.RaftHandler()
		leaseH     = s.LeaseHandler()
		hashKVH    = s.HashKVHandler()
		downgradeH = s.DowngradeEnabledHandler()
	)
	return newPeerHandler(lg, s, raftH, leaseH, hashKVH, downgradeH)
}
// newPeerHandler assembles the ServeMux that serves peer traffic.
//
// A nil logger is replaced by a no-op logger. The raft, peer-members and
// member-promote routes are always registered; the lease, downgrade-enabled
// and hash-KV routes are registered only when their handler is non-nil. The
// version route is registered last, and "/" answers with http.NotFound.
func newPeerHandler(
	lg *zap.Logger,
	s etcdserver.Server,
	raftHandler http.Handler,
	leaseHandler http.Handler,
	hashKVHandler http.Handler,
	downgradeEnabledHandler http.Handler,
) http.Handler {
	if lg == nil {
		lg = zap.NewNop()
	}

	type route struct {
		pattern string
		handler http.Handler
	}

	membersH := newPeerMembersHandler(lg, s.Cluster())
	promoteH := newPeerMemberPromoteHandler(lg, s)

	router := http.NewServeMux()
	router.HandleFunc("/", http.NotFound)

	// Routes that are always present.
	required := []route{
		{rafthttp.RaftPrefix, raftHandler},
		{rafthttp.RaftPrefix + "/", raftHandler},
		{peerMembersPath, membersH},
		{peerMemberPromotePrefix, promoteH},
	}
	for _, rt := range required {
		router.Handle(rt.pattern, rt.handler)
	}

	// Routes that are skipped when the caller supplied no handler.
	optional := []route{
		{leasehttp.LeasePrefix, leaseHandler},
		{leasehttp.LeaseInternalPrefix, leaseHandler},
		{etcdserver.DowngradeEnabledPath, downgradeEnabledHandler},
		{etcdserver.PeerHashKVPath, hashKVHandler},
	}
	for _, rt := range optional {
		if rt.handler == nil {
			continue
		}
		router.Handle(rt.pattern, rt.handler)
	}

	router.HandleFunc(versionPath, versionHandler(s, serveVersion))
	return router
}
RaftHandler()
RaftHandler()函数定义在server/etcdserver/server.go文件中,其会调用transport的Handler函数。
// RaftHandler returns the HTTP handler produced by the Handler method of the
// server's raft transport (s.r.transport).
func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
transport的Handler函数定义在server/etcdserver/api/rafthttp/transport.go文件中,如下代码所示,创建pipelineHandler、streamHandler、snapHandler并将其注册到mux中。
// Handler builds the ServeMux for raft peer traffic. It creates the pipeline,
// stream and snapshot handlers for this transport and registers them under
// RaftPrefix, RaftStreamPrefix+"/" and RaftSnapshotPrefix respectively, then
// adds a probing handler under ProbingPrefix.
func (t *Transport) Handler() http.Handler {
	var (
		pipeline = newPipelineHandler(t, t.Raft, t.ClusterID)
		stream   = newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
		snapshot = newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
	)

	router := http.NewServeMux()
	router.Handle(RaftPrefix, pipeline)
	router.Handle(RaftStreamPrefix+"/", stream)
	router.Handle(RaftSnapshotPrefix, snapshot)
	router.Handle(ProbingPrefix, probing.NewHandler())
	return router
}
这里罗列一下这些handler所对应的路径。
| handler | path |
|---|---|
| raftHandler | /raft 或 /raft/ |
| pipelineHandler | /raft |
| streamHandler | /raft/stream/ |
| snapHandler | /raft/snapshot |
| probing.NewHandler() | /raft/probing |
LeaseHandler()
LeaseHandler()函数定义在server/etcdserver/server.go文件中,其会调用leasehttp的NewHandler函数。
// LeaseHandler returns the lease HTTP handler built by leasehttp.NewHandler
// from the server's lessor and its ApplyWait method, or nil when the server
// has no lessor.
func (s *EtcdServer) LeaseHandler() http.Handler {
	if s.lessor != nil {
		return leasehttp.NewHandler(s.lessor, s.ApplyWait)
	}
	return nil
}
HashKVHandler()
HashKVHandler()函数定义在server/etcdserver/corrupt.go文件中,其会返回hashKVHandler结构体。
// HashKVHandler returns a new hashKVHandler bound to this server and using
// the server's logger.
func (s *EtcdServer) HashKVHandler() http.Handler {
return &hashKVHandler{lg: s.Logger(), server: s}
}
// hashKVHandler is the handler type returned by EtcdServer.HashKVHandler.
type hashKVHandler struct {
// lg is the logger taken from the server when the handler is created.
lg *zap.Logger
// server is the EtcdServer the handler was created for.
server *EtcdServer
}
DowngradeEnabledHandler()
DowngradeEnabledHandler()函数定义在server/etcdserver/server.go文件中,其会返回downgradeEnabledHandler结构体。
// downgradeEnabledHandler is the handler type returned by
// EtcdServer.DowngradeEnabledHandler.
type downgradeEnabledHandler struct {
// lg is the logger taken from the server when the handler is created.
lg *zap.Logger
// cluster is the server's cluster at creation time.
cluster api.Cluster
// server is the EtcdServer the handler was created for.
server *EtcdServer
}
// DowngradeEnabledHandler returns a new downgradeEnabledHandler populated
// with this server, its cluster and its logger.
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
return &downgradeEnabledHandler{
lg: s.Logger(),
cluster: s.cluster,
server: s,
}
}
versionHandler(s, serveVersion)
versionHandler()函数定义在server/etcdserver/api/etcdhttp/version.go文件中,它返回一个http.HandlerFunc闭包;该闭包接收http.ResponseWriter和*http.Request,调用server.ClusterVersion()和server.StorageVersion()获取集群版本和存储版本(为nil时分别使用"not_decided"和"unknown"),然后调用传入的fn(这里是serveVersion)。serveVersion函数将版本信息封装成Versions结构体,设置http响应头信息,序列化后写出。
// versionHandler adapts fn into an http.HandlerFunc. On every request it asks
// server for the cluster and storage versions, renders each as a string and
// passes both to fn together with the response writer and the request. A nil
// cluster version is rendered as "not_decided" and a nil storage version as
// "unknown".
func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		cv, sv := server.ClusterVersion(), server.StorageVersion()

		clusterStr := "not_decided"
		if cv != nil {
			clusterStr = cv.String()
		}

		storageStr := "unknown"
		if sv != nil {
			storageStr = sv.String()
		}

		fn(w, r, clusterStr, storageStr)
	}
}
// serveVersion writes the server, cluster and storage versions as a JSON
// document. It returns without writing a body when allowMethod rejects the
// request for not being a GET. It panics if the versions cannot be marshalled.
func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
	if allowed := allowMethod(w, r, "GET"); !allowed {
		return
	}

	w.Header().Set("Content-Type", "application/json")

	payload, err := json.Marshal(&version.Versions{
		Server:  version.Version,
		Cluster: clusterV,
		Storage: storageV,
	})
	if err != nil {
		panic(fmt.Sprintf("cannot marshal versions to json (%v)", err))
	}
	w.Write(payload)
}
newPeerMembersHandler(lg, s.Cluster())
newPeerMembersHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMembersHandler结构体。
// newPeerMembersHandler returns a peerMembersHandler holding the given logger
// and cluster.
func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
return &peerMembersHandler{
lg: lg,
cluster: cluster,
}
}
// peerMembersHandler is the handler type that newPeerHandler mounts at
// peerMembersPath.
type peerMembersHandler struct {
// lg is the logger supplied at construction.
lg *zap.Logger
// cluster is the cluster supplied at construction.
cluster api.Cluster
}
newPeerMemberPromoteHandler(lg, s)
newPeerMemberPromoteHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMemberPromoteHandler结构体。
// newPeerMemberPromoteHandler returns a peerMemberPromoteHandler holding the
// given logger, the server and the cluster obtained from s.Cluster().
func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
return &peerMemberPromoteHandler{
lg: lg,
cluster: s.Cluster(),
server: s,
}
}
// peerMemberPromoteHandler is the handler type that newPeerHandler mounts at
// peerMemberPromotePrefix.
type peerMemberPromoteHandler struct {
// lg is the logger supplied at construction.
lg *zap.Logger
// cluster is the result of calling Cluster() on the server at construction.
cluster api.Cluster
// server is the etcdserver.Server supplied at construction.
server etcdserver.Server
}
最后
以上就是高挑寒风为你收集整理的ETCD数据库源码分析——集群间网络层服务端接口的全部内容,希望文章能够帮你解决ETCD数据库源码分析——集群间网络层服务端接口所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复