概述
etcd总共有两种模式。一种是proxy,一种是作为kvstore,这里主要记录了etcd作为kvstore的启动流程,etcd的启动入口在etcd.go/startEtcd,本章先粗略的描述下启动流程的重要环节,后面在详细描述没有重要环节里面的键步骤。
首先为各个peer建立net.Listener,用于后续监听各个peer的连接。
// 存储peers net.Listener的数组 plns := make([]net.Listener, 0) for _, u := range cfg.lpurls { if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } var l net.Listener l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } urlStr := u.String() plog.Info("listening for peers on ", urlStr) defer func() { if err != nil { l.Close() plog.Info("stopping listening for peers on ", urlStr) } }() // 把建立好的net.Listener放到数组里面 plns = append(plns, l) }
为客户端建立net.Listener,用于监听客户端的连接。
clns := make([]net.Listener, 0) for _, u := range cfg.lcurls { if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() { plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String()) } var l net.Listener l, err = net.Listen("tcp", u.Host) if err != nil { return nil, err } if fdLimit, err := runtimeutil.FDLimit(); err == nil { if fdLimit <= reservedInternalFDNum { plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum) } l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum)) } // Do not wrap around this listener if TLS Info is set. // HTTPS server expects TLS Conn created by TLSListener. l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo) if err != nil { return nil, err } urlStr := u.String() plog.Info("listening for client requests on ", urlStr) defer func() { if err != nil { l.Close() plog.Info("stopping listening for client requests on ", urlStr) } }() clns = append(clns, l) }
新建一个etcdserver对象,并启动EtcdServer
func NewServer(cfg *ServerConfig) (*EtcdServer, error) { // 生成一个存储etcd目录结构的对象 st := store.New(StoreClusterPrefix, StoreKeysPrefix) var ( // 管理etcd预写式日志的对象 w *wal.WAL // 代表raft算法的中的一个机器节点,主要用于借助raft算法以及peer,完成各类事务的提交 n raft.Node s *raft.MemoryStorage id types.ID cl *cluster ) // ... // 是否存储预写式日志文件,如果存在的会对snap已经wal进行加载,回复etcdserver重启前的状态 haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } var remotes []*Member switch { // ... // 这里以有WAL进行记录 case haveWAL: if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil { return nil, fmt.Errorf("cannot write to member directory: %v", err) } if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil { return nil, fmt.Errorf("cannot write to WAL directory: %v", err) } if cfg.ShouldDiscover() { plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir()) } var snapshot *raftpb.Snapshot var err error snapshot, err = ss.Load() if err != nil && err != snap.ErrNoSnapshot { return nil, err } if snapshot != nil { if err := st.Recovery(snapshot.Data); err != nil { plog.Panicf("recovered store from snapshot error: %v", err) } plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index) } cfg.Print() // 借助于wal日志以及snapshot把该etcdserverh中存储的数据恢复到重启前的状态,然后启动raftNode的run方法 if !cfg.ForceNewCluster { id, cl, n, s, w = restartNode(cfg, snapshot) } else { id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot) } cl.SetStore(st) cl.Recover() default: return nil, fmt.Errorf("unsupported bootstrap config") } // ... srv := &EtcdServer{ cfg: cfg, snapCount: cfg.SnapCount, errorc: make(chan error, 1), store: st, // server提交写请求主要通过raftNode.Propose方法,后续交互raft算法内部逻辑完成。 r: raftNode{ Node: n, ticker: time.Tick(time.Duration(cfg.TickMs) * time.Millisecond), raftStorage: s, storage: NewStorage(w, ss), }, id: id, attributes: Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: cl, stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), peerRt: prt, reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), forceVersionC: make(chan struct{}), msgSnapC: make(chan raftpb.Message, maxInFlightMsgSnap), } tr := &rafthttp.Transport{ TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.peerDialTimeout(), ID: id, URLs: cfg.PeerURLs, ClusterID: cl.ID(), Raft: srv, Snapshotter: ss, ServerStats: sstats, LeaderStats: lstats, ErrorC: srv.errorc, V3demo: cfg.V3demo, } if err := tr.Start(); err != nil { return nil, err } // add all remotes into transport for _, m := range remotes { if m.ID != id { tr.AddRemote(m.ID, m.PeerURLs) } } for _, m := range cl.Members() { if m.ID != id { // 这个AddPeer的过程中主要完成以下关键两步(针对单个peer): // (1)启动StreamReader用于连接到peer并随时准备从peer接收消息,启动消息处理goroutine,然后通过EtcdServer.Process把消息交给Server处理 // (2)启动StreamWriter用于向peer发送消息 tr.AddPeer(m.ID, m.PeerURLs) } } srv.r.transport = tr return srv, nil } // EtcdServer.Start主要完成两个关键步骤: // (1)启动raftNode,作为EtcdServer与raft.node间的数据交互以及事务提交的中介 // (2)启动自身run,主要raftNode.apply()管道中接收已经被大部分集群节点commit的请求,然后在自身提交 s.Start()
启动设置客户端以及peer的请求处理handler并启动相关HTTP监听服务
ch := &cors.CORSHandler{ // etcdhttp/client.go Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()), Info: cfg.corsInfo, } // streamHandler监听peer的连接,收到peer的连接conn后通过peer.attachOutgoingConn把conn和peer的streamWriter关联起来,用于后续向该peer发送消息。 ph := etcdhttp.NewPeerHandler(s) for _, l := range plns { go func(l net.Listener) { plog.Fatal(serveHTTP(l, ph, 5*time.Minute)) }(l) } // Start a client server goroutine for each listen address for _, l := range clns { go func(l net.Listener) { plog.Fatal(serveHTTP(l, ch, 0)) }(l) }
在etcdhttp/client.go文件中NewClientHandler的主要代码如下:
mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(healthPath, healthHandler(server))
mux.HandleFunc(versionPath,
versionHandler(server.Cluster(), serveVersion))
// 处理key-value相关操作
mux.Handle(keysPrefix, kh)
mux.Handle(keysPrefix+"/", kh)
// ...其他handler
//处理成员管理相关操作
mux.Handle(membersPrefix, mh)
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)
// 路线前缀相关值如下,可以在etcdserver启动后通过http://ip:port/XXXPrefix 直接请求对应的handler
const (
authPrefix
= "/v2/auth"
keysPrefix
= "/v2/keys"
deprecatedMachinesPrefix = "/v2/machines"
membersPrefix
= "/v2/members"
statsPrefix
= "/v2/stats"
varsPath
= "/debug/vars"
metricsPath
= "/metrics"
healthPath
= "/health"
versionPath
= "/version"
configPath
= "/config"
pprofPrefix
= "/debug/pprof"
)
NewPeerHandler最终对应文件rafthttp/transport.go中的代码如下:
func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
// 这里我们主要关注streamHandler
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}
在文件rafthttp/http.go中streamHandler用于处理http的请求的关键代码如下:
func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ...
//获取请求来源peerid
fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
// ...
// 通过peerid拿到对应的peer对象
p := h.peerGetter.Get(from)
// ...
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
c := newCloseNotifier()
conn := &outgoingConn{
t:
t,
Writer:
w,
Flusher: w.(http.Flusher),
Closer:
c,
}
// 把conn和peer的StreamWriter对象关联起来
p.attachOutgoingConn(conn)
<-c.closeNotify()
}
最后
以上就是无聊发卡为你收集整理的etcd 2.3.7启动流程分析的全部内容,希望文章能够帮你解决etcd 2.3.7启动流程分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复