我是靠谱客的博主 无聊发卡,最近开发中收集的这篇文章主要介绍etcd 2.3.7启动流程分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

etcd总共有两种模式。一种是proxy,一种是作为kvstore,这里主要记录了etcd作为kvstore的启动流程,etcd的启动入口在etcd.go/startEtcd,本章先粗略的描述下启动流程的重要环节,后面在详细描述没有重要环节里面的键步骤。

  1. 首先为各个peer建立net.Listener,用于后续监听各个peer的连接。

    // 存储peers net.Listener的数组
    plns := make([]net.Listener, 0)
    for _, u := range cfg.lpurls {
    if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
    plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
    }
    var l net.Listener
    l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
    if err != nil {
    return nil, err
    }
    urlStr := u.String()
    plog.Info("listening for peers on ", urlStr)
    defer func() {
    if err != nil {
    l.Close()
    plog.Info("stopping listening for peers on ", urlStr)
    }
    }()
    // 把建立好的net.Listener放到数组里面
    plns = append(plns, l)
    }
  2. 为客户端建立net.Listener,用于监听客户端的连接。

    clns := make([]net.Listener, 0)
    for _, u := range cfg.lcurls {
    if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
    plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
    }
    var l net.Listener
    l, err = net.Listen("tcp", u.Host)
    if err != nil {
    return nil, err
    }
    if fdLimit, err := runtimeutil.FDLimit(); err == nil {
    if fdLimit <= reservedInternalFDNum {
    plog.Fatalf("file descriptor limit[%d] of etcd process is too low, and should be set higher than %d to ensure internal usage", fdLimit, reservedInternalFDNum)
    }
    l = transport.LimitListener(l, int(fdLimit-reservedInternalFDNum))
    }
    // Do not wrap around this listener if TLS Info is set.
    // HTTPS server expects TLS Conn created by TLSListener.
    l, err = transport.NewKeepAliveListener(l, u.Scheme, cfg.clientTLSInfo)
    if err != nil {
    return nil, err
    }
    urlStr := u.String()
    plog.Info("listening for client requests on ", urlStr)
    defer func() {
    if err != nil {
    l.Close()
    plog.Info("stopping listening for client requests on ", urlStr)
    }
    }()
    clns = append(clns, l)
    }
  3. 新建一个etcdserver对象,并启动EtcdServer

    func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
    // 生成一个存储etcd目录结构的对象
    st := store.New(StoreClusterPrefix, StoreKeysPrefix)
    var (
    // 管理etcd预写式日志的对象
    w
    *wal.WAL
    // 代表raft算法的中的一个机器节点,主要用于借助raft算法以及peer,完成各类事务的提交
    n
    raft.Node
    s
    *raft.MemoryStorage
    id types.ID
    cl *cluster
    )
    // ...
    // 是否存储预写式日志文件,如果存在的会对snap已经wal进行加载,回复etcdserver重启前的状态
    haveWAL := wal.Exist(cfg.WALDir())
    ss := snap.New(cfg.SnapDir())
    prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
    if err != nil {
    return nil, err
    }
    var remotes []*Member
    switch {
    // ...
    // 这里以有WAL进行记录
    case haveWAL:
    if err := fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
    return nil, fmt.Errorf("cannot write to member directory: %v", err)
    }
    if err := fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
    return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
    }
    if cfg.ShouldDiscover() {
    plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
    }
    var snapshot *raftpb.Snapshot
    var err error
    snapshot, err = ss.Load()
    if err != nil && err != snap.ErrNoSnapshot {
    return nil, err
    }
    if snapshot != nil {
    if err := st.Recovery(snapshot.Data); err != nil {
    plog.Panicf("recovered store from snapshot error: %v", err)
    }
    plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
    }
    cfg.Print()
    // 借助于wal日志以及snapshot把该etcdserverh中存储的数据恢复到重启前的状态,然后启动raftNode的run方法
    if !cfg.ForceNewCluster {
    id, cl, n, s, w = restartNode(cfg, snapshot)
    } else {
    id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
    }
    cl.SetStore(st)
    cl.Recover()
    default:
    return nil, fmt.Errorf("unsupported bootstrap config")
    }
    // ...
    srv := &EtcdServer{
    cfg:
    cfg,
    snapCount: cfg.SnapCount,
    errorc:
    make(chan error, 1),
    store:
    st,
    // server提交写请求主要通过raftNode.Propose方法,后续交互raft算法内部逻辑完成。
    r: raftNode{
    Node:
    n,
    ticker:
    time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
    raftStorage: s,
    storage:
    NewStorage(w, ss),
    },
    id:
    id,
    attributes:
    Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
    cluster:
    cl,
    stats:
    sstats,
    lstats:
    lstats,
    SyncTicker:
    time.Tick(500 * time.Millisecond),
    peerRt:
    prt,
    reqIDGen:
    idutil.NewGenerator(uint16(id), time.Now()),
    forceVersionC: make(chan struct{}),
    msgSnapC:
    make(chan raftpb.Message, maxInFlightMsgSnap),
    }
    tr := &rafthttp.Transport{
    TLSInfo:
    cfg.PeerTLSInfo,
    DialTimeout: cfg.peerDialTimeout(),
    ID:
    id,
    URLs:
    cfg.PeerURLs,
    ClusterID:
    cl.ID(),
    Raft:
    srv,
    Snapshotter: ss,
    ServerStats: sstats,
    LeaderStats: lstats,
    ErrorC:
    srv.errorc,
    V3demo:
    cfg.V3demo,
    }
    if err := tr.Start(); err != nil {
    return nil, err
    }
    // add all remotes into transport
    for _, m := range remotes {
    if m.ID != id {
    tr.AddRemote(m.ID, m.PeerURLs)
    }
    }
    for _, m := range cl.Members() {
    if m.ID != id {
    // 这个AddPeer的过程中主要完成以下关键两步(针对单个peer):
    // (1)启动StreamReader用于连接到peer并随时准备从peer接收消息,启动消息处理goroutine,然后通过EtcdServer.Process把消息交给Server处理
    // (2)启动StreamWriter用于向peer发送消息
    tr.AddPeer(m.ID, m.PeerURLs)
    }
    }
    srv.r.transport = tr
    return srv, nil
    }
    // EtcdServer.Start主要完成两个关键步骤:
    // (1)启动raftNode,作为EtcdServer与raft.node间的数据交互以及事务提交的中介
    // (2)启动自身run,主要raftNode.apply()管道中接收已经被大部分集群节点commit的请求,然后在自身提交
    s.Start()
  4. 启动设置客户端以及peer的请求处理handler并启动相关HTTP监听服务

    ch := &cors.CORSHandler{
    // etcdhttp/client.go
    Handler: etcdhttp.NewClientHandler(s, srvcfg.ReqTimeout()),
    Info:
    cfg.corsInfo,
    }
    // streamHandler监听peer的连接,收到peer的连接conn后通过peer.attachOutgoingConn把conn和peer的streamWriter关联起来,用于后续向该peer发送消息。
    ph := etcdhttp.NewPeerHandler(s)
    for _, l := range plns {
    go func(l net.Listener) {
    plog.Fatal(serveHTTP(l, ph, 5*time.Minute))
    }(l)
    }
    // Start a client server goroutine for each listen address
    for _, l := range clns {
    go func(l net.Listener) {
    plog.Fatal(serveHTTP(l, ch, 0))
    }(l)
    }

    在etcdhttp/client.go文件中NewClientHandler的主要代码如下:


mux := http.NewServeMux()
mux.HandleFunc("/", http.NotFound)
mux.Handle(healthPath, healthHandler(server))
mux.HandleFunc(versionPath,
versionHandler(server.Cluster(), serveVersion))
// 处理key-value相关操作
mux.Handle(keysPrefix, kh)
mux.Handle(keysPrefix+"/", kh)
// ...其他handler
//处理成员管理相关操作
mux.Handle(membersPrefix, mh)
mux.Handle(membersPrefix+"/", mh)
mux.Handle(deprecatedMachinesPrefix, dmh)
// 路线前缀相关值如下,可以在etcdserver启动后通过http://ip:port/XXXPrefix 直接请求对应的handler
const (
authPrefix
= "/v2/auth"
keysPrefix
= "/v2/keys"
deprecatedMachinesPrefix = "/v2/machines"
membersPrefix
= "/v2/members"
statsPrefix
= "/v2/stats"
varsPath
= "/debug/vars"
metricsPath
= "/metrics"
healthPath
= "/health"
versionPath
= "/version"
configPath
= "/config"
pprofPrefix
= "/debug/pprof"
)

NewPeerHandler最终对应文件rafthttp/transport.go中的代码如下:

func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
// 这里我们主要关注streamHandler
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}

在文件rafthttp/http.go中streamHandler用于处理http的请求的关键代码如下:

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ...
//获取请求来源peerid
fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
// ...
// 通过peerid拿到对应的peer对象
p := h.peerGetter.Get(from)
// ...
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
c := newCloseNotifier()
conn := &outgoingConn{
t:
t,
Writer:
w,
Flusher: w.(http.Flusher),
Closer:
c,
}
// 把conn和peer的StreamWriter对象关联起来
p.attachOutgoingConn(conn)
<-c.closeNotify()
}

最后

以上就是无聊发卡为你收集整理的etcd 2.3.7启动流程分析的全部内容,希望文章能够帮你解决etcd 2.3.7启动流程分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部