概述
从本篇开始,将会有一个系列对Etcd源码进行分析。
我之前阅读过很多开源软件,对阅读开源软件,有如下基本思路:
1、了解该软件相关背景知识,例如相关博客、官网,要相信自己不是第一个分析该软件的人
2、对该软件进行使用,例如:编译、运行或者基于接口进行开发
3、找到该软件的合适切入点进行源码分析,例如网络相关的软件(ovs、etcd)找到socket监听服务、接收消息、发送消息
对于etcd这种分布式存储系统,我们的切入点就是socket服务。
一、配置文件
1.1、配置文件简介
研究一个软件最好的方式就是先把配置文件搞清楚,这个是根基。我们看一下etcd的配置文件,默认存储位置为/etc/etcd/etcd.conf,该配置文件一共有5个section分别是:
名称 | 作用 |
member | 本节点的配置,包括监听服务端口、心跳时间等 |
cluster | 集群配置,包括集群状态、集群名称以及本节点广播地址 |
proxy | 用于网络自动发现服务 |
security | 安全配置 |
logging | 日志功能组件 |
其中配置文件中比较重要的是member和cluster配置项。
1.2、部分参数详细说明
1.2.1、member
在Etcd中一共监听了两个服务端口分别2379和2380,对应在member中体现的如下两个配置项:
名称 | 举例 | 作用 |
ETCD_LISTEN_CLIENT_URLS | ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:2379" | 2379端口用于客户端访问etcd数据库,例如向etcd中插入数据。 |
ETCD_LISTEN_PEER_URLS
| ETCD_LISTEN_PEER_URLS="http://0.0.0.0:2380" | 2380端口用于集群成员间进行通信,例如集群数据同步、心跳。 |
1.2.2、cluster
初次看到配置文件,都会有一个疑问,为什么在members已经设置了监听服务地址,为什么在cluster还要再次设置一次广播地址呢?原因:etcd主要的通信协议主要是http协议,对于http协议中所周知它是B/S结构,而非C/S结构,只能一端主动给另一端发消息而反过来则不可。所以对于集群来说,双方必须都要知道对方具体监听地址。
名称 | 举例 | 作用 |
ETCD_ADVERTISE_CLIENT_URLS | ETCD_LISTEN_CLIENT_URLS="http://10.10.10.128:2379" | 同上表 |
ETCD_INITIAL_ADVERTISE_PEER_URLS | ETCD_INITIAL_ADVERTISE_PEER_URLS="http://10.10.10.128:2380" | 同上表 |
集群相关配置,在后面介绍Raft集群时再进行详细说明。
二、服务监听
从本小节开始介绍详细代码。我们都知道,建立socket服务端一共有5个基本步骤(C语言):创建socket套接字、bind地址及端口、listen监听服务、accept接收客户端连接、启动新线程为客户端服务。正所谓万变不离其宗,到了etcd中(etcd使用默认golang http模块)也是这些步骤,只不过是被封装了一下(语法糖)。
2.1、总体流程图
从这里开始介绍具体流程以及关键代码,对于数据结构会有专门一篇介绍,如下是从main方法入口函数:
流程图简要说明:
1、方法startEtcdOrProxyV2()中,会根据配置文件启动两种不同模式:默认模式和代理模式。默认模式进入方法startEtcd,代理模式进入方法startProxy。这里介绍默认模式。
2、方法newConfig,从名字上来看就知道,此方法是用于读取配置文件或者生成默认配置,在1.2章节中介绍的配置项就是在此方法中读取。
3、执行完方法serve后,会退回到startEtcd中,然后就阻塞在startEtcd方法中,这样整个etcd启动完毕。
2.2、核心方法embed/etcd.go StartEtcd
此方法内容比较长,分为三部分说明:
/*
* 构建Etcd结构 包含一个server、listener
*/
serving := false
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} //etcd结构体
cfg := &e.cfg
defer func() {//析构函数
if e == nil || err == nil {
return
}
if !serving {
// errored before starting gRPC server for serveCtx.grpcServerC
for _, sctx := range e.sctxs {
close(sctx.grpcServerC)
}
}
e.Close()
e = nil
}()
if e.Peers, err = startPeerListeners(cfg); err != nil {//为peer创建listener,socket三部曲只到了第二个步骤
return
}
if e.sctxs, err = startClientListeners(cfg); err != nil {//为client创建listener,socket三部曲只到了第二个步骤
return
}
for _, sctx := range e.sctxs {
e.Clients = append(e.Clients, sctx.l)
}
上面完成etcd结构初始化以及listener创建。
// 创建EtcdServer
srvcfg := &etcdserver.ServerConfig{
Name:
cfg.Name,
ClientURLs:
cfg.ACUrls, //客户端url监听地址,2379端口
PeerURLs:
cfg.APUrls, //peer url监听地址,2380端口
DataDir:
cfg.Dir,
DedicatedWALDir:
cfg.WalDir,
SnapCount:
cfg.SnapCount,
MaxSnapFiles:
cfg.MaxSnapFiles,
MaxWALFiles:
cfg.MaxWalFiles,
InitialPeerURLsMap:
urlsmap,
InitialClusterToken:
token,
DiscoveryURL:
cfg.Durl,
DiscoveryProxy:
cfg.Dproxy,
NewCluster:
cfg.IsNewCluster(),
/* 是否新的集群 */
ForceNewCluster:
cfg.ForceNewCluster,
PeerTLSInfo:
cfg.PeerTLSInfo,
TickMs:
cfg.TickMs,
ElectionTicks:
cfg.ElectionTicks(),
AutoCompactionRetention: cfg.AutoCompactionRetention,
QuotaBackendBytes:
cfg.QuotaBackendBytes,
StrictReconfigCheck:
cfg.StrictReconfigCheck,
ClientCertAuthEnabled:
cfg.ClientTLSInfo.ClientCertAuth,
AuthToken:
cfg.AuthToken,
}
//创建EtcdServer并且创建raftNode并运行raftNode
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
return
}
// configure peer handlers after rafthttp.Transport started
// 生成http.hander 用于处理peer请求
ph := etcdhttp.NewPeerHandler(e.Server)
for _, p := range e.Peers {
srv := &http.Server{
Handler:
ph,
ReadTimeout: 5 * time.Minute,
ErrorLog:
defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
l := p.Listener //上一段代码创建的listener
p.serve = func() error { return srv.Serve(l) } //回调函数,激活服务,主要是Accept方法
p.close = func(ctx context.Context) error {关闭服务,回调掉函数。即socket关闭时调用此方法
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
return srv.Shutdown(ctx)
}
}
//上面handler用于处理peer发过来的请求以及设置回调函数。
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
//运行EtcdSever 监听服务
e.Server.Start()
if err = e.serve(); err != nil {//激活服务,主要调用第二段代码的回调函数serve
return
}
serving = true
下面重点分析一下listerner以及serve回调函数。
2.3、Listener分析
Listener有两个分别为:peer listener和client listener,两者大同小异,这里拿peer listener做为分析对象。
方法startPeerListeners,中主要核心代码,如下:
for i, u := range cfg.LPUrls {//循环遍历多个peer url
if u.Scheme == "http" {
if !cfg.PeerTLSInfo.Empty() {
plog.Warningf("The scheme of peer url %s is HTTP while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
if cfg.PeerTLSInfo.ClientCertAuth {
plog.Warningf("The scheme of peer url %s is HTTP while client cert auth (--peer-client-cert-auth) is enabled. Ignored client cert auth for this url.", u.String())
}
}
/* 构造peerListener对象 监听2380 作为服务端模式 */
peers[i] = &peerListener{close: func(context.Context) error { return nil }}
//调用接口,创建listener对象,返回来之后,
//socket套接字已经完成listener监听流程
peers[i].Listener, err = rafthttp.NewListener(u, &cfg.PeerTLSInfo)
if err != nil {
return nil, err
}
// once serve, overwrite with 'http.Server.Shutdown'
// close回调方法,用于关闭socket套接字
peers[i].close = func(context.Context) error {
return peers[i].Listener.Close()
}
plog.Info("listening for peers on ", u.String())
}
func newListener(addr string, scheme string) (net.Listener, error) {
if scheme == "unix" || scheme == "unixs" {
// unix sockets via unix://laddr
return NewUnixListener(addr)
}
return net.Listen("tcp", addr) //调用golang内置方法,返回listener对象
}
从startPeerListeners到net.Listen整个流程中,有掺杂tls以及unix socket相关逻辑,添加这些只为了保证各种需求,大体流程并没有变化,这里不在对齐进行详细说明。至此,两个服务均已完成监听步骤,下面就是接收对端请求即Accept过程。
三、服务激活
在上面已经介绍了,服务端socket需要调用Accept方法,我们来看一下serve方法。方法serve大致内容为:将每个服务放到gorouting中,也就是启动一个协程来监听服务。
func (e *Etcd) serve() (err error) {
var ctlscfg *tls.Config
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
if ctlscfg, err = e.cfg.ClientTLSInfo.ServerConfig(); err != nil {
return err
}
}
if e.cfg.CorsInfo.String() != "" {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}
// Start the peer server in a goroutine
// 为Peer启动协程
for _, pl := range e.Peers {
go func(l *peerListener) {
// 集群peer 前期已经创建listener,此处将会调用accept,
// 那么serve()是什么地方定义的?
e.errHandler(l.serve())
}(pl)
}
// Start a client server goroutine for each listen address
// 为Client启动协程
var h http.Handler
if e.Config().EnableV2 {
h = v2http.NewClientHandler(e.Server, e.Server.Cfg.ReqTimeout())
} else {
mux := http.NewServeMux()
etcdhttp.HandleBasic(mux, e.Server)
h = mux
}
h = http.Handler(&cors.CORSHandler{Handler: h, Info: e.cfg.CorsInfo})
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
// client 前期已经创建listener,此处将调用accept
e.errHandler(s.serve(e.Server, ctlscfg, h, e.errHandler))
}(sctx)
}
return nil
}
那么在l.serve()或者s.serve()在什么地方定义的?在方法StartEtcd中生成http.PeerHandler附近,在第二章节已经有介绍。我们来看一下具体内容:
srv := &http.Server{
Handler:
ph, //http handler 用于处理业务逻辑 后面调用handler中ServeHTTP方法
ReadTimeout: 5 * time.Minute,
ErrorLog:
defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
l := p.Listener
p.serve = func() error { return srv.Serve(l) } //激活服务,这地方调用的srv.Serve(l),此方法调用golang内置方法http.Server.
下面简要分析一下http.Server方法:
func (srv *Server) Serve(l net.Listener) error {
defer l.Close()
if fn := testHookServerServe; fn != nil {
fn(srv, l)
}
var tempDelay time.Duration // how long to sleep on accept failure
if err := srv.setupHTTP2_Serve(); err != nil {
return err
}
srv.trackListener(l, true)
defer srv.trackListener(l, false)
baseCtx := context.Background() // base is always background, per Issue 16220
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {//死循环
rw, e := l.Accept()
//阻塞等待客户端程序连接
.... //省略一些代码
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
go c.serve(ctx) //表示启动协程,服务新的连接。
}
}
此处的Accept定义在什么地方呢?Accept为golang内置方法,定义在Go/src/net/tcpsock.go,对于Accept内容不再展开,只要知道它会一直阻塞,直到有新的连接才会返回。
这里着重介绍一下http/server.go中serve方法。在serve方法内部主要内容是一个for循环:
for {
w, err := c.readRequest(ctx) //接收到对端发送的http请求
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive)
}
...
// Expect 100 Continue support
req := w.req
if req.expectsContinue() {
if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
// Wrap the Body reader with one that replies on the connection
req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
}
} else if req.Header.get("Expect") != "" {
w.sendExpectationFailed()
return
}
c.curReq.Store(w)
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
if w.conn.bufr.Buffered() > 0 {
w.conn.r.closeNotifyFromPipelinedRequest()
}
w.conn.r.startBackgroundRead()
}
// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
// But we're not going to implement HTTP pipelining because it
// was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req) //处理http请求
w.cancelCtx()
if c.hijacked() {
return
}
w.finishRequest() //将缓冲区数据发送到对端,完成http此次请求
...
}
调用c.readRequest(ctx)方法等待对端发来的请求,调用serverHandler{c.server}.ServeHTTP(w,w.req)处理客户端请求并且发送到对端。来看一下ServeHTTP实现:
func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
handler := sh.srv.Handler
if handler == nil {
handler = DefaultServeMux
}
if req.RequestURI == "*"&& req.Method == "OPTIONS" {
handler = globalOptionsHandler{}
}
handler.ServeHTTP(rw, req)
}
会发现在内部会调用handler.ServeHTTP方法,那么此处的handler在什么地方定义的?ServeHTTP又在哪里定义的?继续看下面章节
四、接收流程
上一章节介绍调用handler.ServeHTTP,可想而知,这个是golang内置的http框架,框架不知道具体业务需求是什么,所以一般场景下handler都是用户自定义的,用户根据不同业务需求来实现不同的handler。对Etcd中存在很多handler,后面会文章进行详细分析。
现在解答一下,handler在什么地方定义以及ServeHTTP定义在什么地方?这个其实在第三章就已经有提到,在创建完listener会创建一个handler,如etcdhttp.NewPeerHandler(e.Server),处理集群节点内消息。
先来看一下创建的handler里面有什么内容?用peerhandler作为例子说明,在构造方法NewPeerHandler中主要调用newPeerHandler方法:
func newPeerHandler(cluster api.Cluster, raftHandler http.Handler, leaseHandler http.Handler) http.Handler {
mh := &peerMembersHandler{
cluster: cluster,
}
//将url和业务层handler注册到servemux中,也就是每一个url请求都会有其对应的handler进行处理。
mux := http.NewServeMux() //初始化一个Serve Multiplexer结构
mux.HandleFunc("/", http.NotFound)
mux.Handle(rafthttp.RaftPrefix, raftHandler)
/* rafthttp.RaftPrefix == /raft */
mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
mux.Handle(peerMembersPrefix, mh) //处理请求/members handler是mh,即peerMembersHandler
if leaseHandler != nil {
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
/* /leases */
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) /* /leases/internal */
}
mux.HandleFunc(versionPath, versionHandler(cluster, serveVersion))
return mux
}
通过上面的代码可知,应用层业务逻辑需要自己注册url和handler,这样才能保证每个http request都能够被处理。而每个handler都必须要实现对应接口ServeHTTP,例如peerMembersHandler,实现的ServeHTTP接口是用于返回集群成员列表。那么此处只是完成注册,那么在什么地方会调用此处handler?接下来看一下golang内置方法。
流程图中所有方法均在Go/src/net/http/server.go中。通过流程图可知:
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
if r.RequestURI == "*" {
if r.ProtoAtLeast(1, 1) {
w.Header().Set("Connection", "close")
}
w.WriteHeader(StatusBadRequest)
return
}
h, _ := mux.Handler(r)
//注册表查找,用户自定义的handler,并且调用ServeHTTP接口,处理该http request请求。
h.ServeHTTP(w, r)
}
五、发送流程
对于发送流程,这里打算简单介绍一下,后续在介绍raft协议是会详细说明。
通过之前介绍,ServeHTTP接口是用于处理http request请求入口,每一个handler都必须实现此接口。此接口有两个参数,分别为:ResponseWriter,Request。其中ResponseWriter就是用于响应,http请求,这里用peerMembersHandler作为举例(比较简单),代码如下:
/* 获取集群成员列表 */
func (h *peerMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r, "GET") {
return
}
w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
if r.URL.Path != peerMembersPrefix {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
ms := h.cluster.Members() //获取集群成员列表
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(ms); err != nil {//调用json接口,进行格式化并且将数据写到缓冲区中
plog.Warningf("failed to encode members response (%v)", err)
}
}
通过查看Encode代码可知,最后会将会数据写入到ResponseWriter的缓冲区中,即调用Wirter接口,并没有真正发送到对端。那么在什么地方才会真正发出去呢?在golang http源码w.finishRequest(),此处会进行刷新操作,将缓冲区数据发送到对端。在上面已经提交到。
六、总结
至此,介绍Etcd网络通信流程就结束了,下一篇介绍网络模型进阶篇。
最后
以上就是故意小熊猫为你收集整理的Etcd源码分析-网络模型一、配置文件二、服务监听三、服务激活四、接收流程五、发送流程六、总结的全部内容,希望文章能够帮你解决Etcd源码分析-网络模型一、配置文件二、服务监听三、服务激活四、接收流程五、发送流程六、总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复