我是靠谱客的博主 喜悦百褶裙,最近开发中收集的这篇文章主要介绍第四章 zk源码解读笔记,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、客户端源码

1、总体流程

1.1、zkClient客户端流程如下图所示

 

1.2、zkCli.sh配置代码如下:

# Use the POSIX interface; symlinks are followed automatically.
# Resolve the directory that contains this script.
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
# Source the environment setup from ../libexec when it exists there,
# otherwise from the script's own directory.
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
  . "$ZOOBINDIR"/../libexec/zkEnv.sh
else
  . "$ZOOBINDIR"/zkEnv.sh
fi
# Launch the command-line client. The trailing backslashes are required so that
# the JVM options, classpath and main class form a single command.
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"

启动客户端 zkCli.sh文件里面的配置,实际运行代码如下:

public static void main(String[] args) throws KeeperException, IOException, InterruptedException {
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}

1.3、Main方法流程:

创建 ZooKeeperMain 对象并调用 run() 方法;在 ZooKeeperMain 的构造方法里面,重点是:

public ZooKeeperMain(String[] args) throws IOException, InterruptedException {
    this.cl.parseOptions(args);
    System.out.println("Connecting to " + this.cl.getOption("server"));
    this.connectToZK(this.cl.getOption("server"));
}

protected void connectToZK(String newHost) throws InterruptedException, IOException {
    if (this.zk != null && this.zk.getState().isAlive()) {
        this.zk.close();
    }
    this.host = newHost;
    boolean readOnly = this.cl.getOption("readonly") != null;
    this.zk = new ZooKeeper(this.host, Integer.parseInt(this.cl.getOption("timeout")),

 new ZooKeeperMain.MyWatcher(), readOnly);
}

/**
 * Starts the two client worker threads: the send thread first, then the event thread.
 */
public void start() {
    this.sendThread.start();
    this.eventThread.start();
}

最终在connectToZK方法里面也就是使用原生的Zk客户端进行连接的。

/**
 * Creates a client handle and starts its connection threads.
 *
 * @param connectString  server list, optionally followed by a chroot path
 * @param sessionTimeout session timeout passed on to {@code ClientCnxn}
 * @param watcher        installed as the default watcher of the watch manager
 * @param canBeReadOnly  passed on to {@code ClientCnxn}
 * @throws IOException declared by the constructor; the visible code passes it through from the calls it makes
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
        throws IOException {
    this.watchManager = new ZooKeeper.ZKWatchManager();
    // Each field starts with a space so that the values do not run together in the log line.
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    this.watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
    // Create the object that manages the connection to the server.
    this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
            this, this.watchManager, getClientCnxnSocket(), canBeReadOnly);
    this.cnxn.start();
}

ClientCnxn.ClientCnxn():

/**
 * Builds the connection state from a connect string of the form
 * {@code host1[:port1],host2[:port2],...[/chroot]}.
 *
 * The constructor splits off an optional chroot path, resolves every host to all of its
 * addresses, derives the connect and read timeouts from the session timeout, shuffles the
 * server list and creates (but does not start) the send and event threads.
 *
 * @param hosts          connect string as described above
 * @param sessionTimeout session timeout from which the other timeouts are derived
 * @param zooKeeper      owning client handle
 * @param watcher        watch manager stored for later use
 * @param sessionId      session id stored as-is
 * @param sessionPasswd  session password stored as-is
 * @throws IOException declared by the constructor; NOTE(review): presumably raised by host resolution, confirm
 */
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher,

long sessionId, byte[] sessionPasswd) throws IOException{
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        // parse out chroot, if any
        int off = hosts.indexOf('/');
        if (off >= 0) {
            String chrootPath = hosts.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            // keep only the host list in front of the chroot path
            hosts = hosts.substring(0,  off);
        } else {
            this.chrootPath = null;
        }
        String hostsList[] = hosts.split(",");
        for (String host : hostsList) {
            // port used when the entry carries no explicit ":port"
            int port = 2181;
            String parts[] = host.split(":");
            if (parts.length > 1) {
                port = Integer.parseInt(parts[1]);
                host = parts[0];
            }
            // a host name may resolve to several addresses; every one becomes a candidate server
            InetAddress addrs[] = InetAddress.getAllByName(host);
            for (InetAddress addr : addrs) {
                serverAddrs.add(new InetSocketAddress(addr, port));
            }
        }
        this.sessionTimeout = sessionTimeout;
        // the session timeout is divided evenly among the configured host entries
        connectTimeout = sessionTimeout / hostsList.length;
        // the read timeout is two thirds of the session timeout
        readTimeout = sessionTimeout * 2 / 3;
        // randomize the order of the server addresses
        Collections.shuffle(serverAddrs);
        sendThread = new SendThread();
        eventThread = new EventThread();
    }

main方法流程图如下所示:

 

第三步第四步执行如下: 

/**
 * Interactive shell loop (abridged excerpt).
 *
 * When no command was given on the command line, the method loads the JLine console and
 * the znode completor reflectively, then reads lines from the console and executes each
 * one until the console returns {@code null}.
 *
 * NOTE: the {@code catch} clauses of the {@code try} and the branch for a command supplied
 * on the command line are not part of this excerpt.
 */
@SuppressWarnings("unchecked")
void run() throws KeeperException, IOException, InterruptedException {
    if (cl.getCommand() == null) {
        System.out.println("Welcome to ZooKeeper!");
        boolean jlinemissing = false;
        // only use jline if it's in the classpath
        try {
                Class consoleC = Class.forName("jline.ConsoleReader");
                Class completorC =Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
                System.out.println("JLine support is enabled");
                Object console = consoleC.getConstructor().newInstance();
                Object completor =completorC.getConstructor(ZooKeeper.class).newInstance(zk);
                // register the completor on the console via reflection
                Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
                addCompletor.invoke(console, completor);
                String line;
                Method readLine = consoleC.getMethod("readLine", String.class);
              // read commands in a loop and execute each one
                while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
                    executeLine(line);
                }
            }
    }

    public void executeLine(String line)throws InterruptedException, IOException, KeeperException {
      if (!line.equals("")) {
        cl.parseCommand(line);
        addToHistory(commandCount,line);
        processCmd(cl); // 第5步:执行命令
        commandCount++;
      }
    }

第6步:处理命令

protected boolean processZKCmd(MyCommandOptions co) throws KeeperException
  IOException, InterruptedException{
        Stat stat = new Stat();
        String[] args = co.getArgArray();
        String cmd = co.getCommand();
        if (args.length < 1) {
            usage();
            return false;
        }
        if (!commandMap.containsKey(cmd)) {
            usage();
            return false;
        }
        boolean watch = args.length > 2;
        String path = null;
        List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
        LOG.debug("Processing " + cmd);
        if (cmd.equals("quit")) {
            System.out.println("Quitting...");
            zk.close();
            System.exit(0);
        } else if (cmd.equals("history")) {
         .........

         // 创建节点命令
        if (cmd.equals("create") && args.length >= 3) {
            int first = 0;
            CreateMode flags = CreateMode.PERSISTENT; // 持久节点
            if ((args[1].equals("-e") && args[2].equals("-s"))
                    || (args[1]).equals("-s") && (args[2].equals("-e"))) {
                first+=2;
                flags = CreateMode.EPHEMERAL_SEQUENTIAL; 临时顺序节点
            } else if (args[1].equals("-e")) {
                first++;
                flags = CreateMode.EPHEMERAL; // 临时节点
            } else if (args[1].equals("-s")) {
                first++;
                flags = CreateMode.PERSISTENT_SEQUENTIAL; // 持久顺序节点
            }
            if (args.length == first + 4) {
                acl = parseACLs(args[first+3]);
            }
            path = args[first + 1];

            // 第7步:解析create命令
            String newPath = zk.create(path, args[first+2].getBytes(), acl,flags);
            System.err.println("Created " + newPath);
        } else if (cmd.equals("delete") && args.length >= 2) {
            path = args[1];
            zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
        }

// Steps 8 and 9: handle the arguments of the create command.
/**
 * Creates a node and blocks until the server has replied.
 *
 * @param path       client-side path of the node
 * @param data       payload stored in the node
 * @param acl        ACL list; an empty (non-null) list is rejected
 * @param createMode persistence / sequencing mode
 * @return the created path as reported by the server, with the chroot prefix removed
 * @throws KeeperException      when the ACL list is empty or the reply carries an error code
 * @throws InterruptedException declared by the method; the visible code passes it through
 */
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
        throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath, createMode.isSequential());
    final String serverPath = prependChroot(clientPath);

    // Step 10: assemble the request header and the request/response records.
    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.create);
    final CreateRequest createRequest = new CreateRequest();
    final CreateResponse createResponse = new CreateResponse();
    createRequest.setData(data);
    createRequest.setFlags(createMode.toFlag());
    createRequest.setPath(serverPath);
    if (acl != null && acl.size() == 0) {
        throw new KeeperException.InvalidACLException();
    }
    createRequest.setAcl(acl);

    // Step 11: submit the request.
    final ReplyHeader reply = cnxn.submitRequest(header, createRequest, createResponse, null);
    if (reply.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(reply.getErr()), clientPath);
    }
    if (cnxn.chrootPath != null) {
        // Strip the chroot prefix so that the caller sees a client-side path.
        return createResponse.getPath().substring(cnxn.chrootPath.length());
    }
    return createResponse.getPath();
}

request打包成packet:

/**
 * Queues a request and blocks the calling thread until the packet is marked finished.
 *
 * @return the reply header that was attached to the queued packet
 * @throws InterruptedException if the wait on the packet is interrupted
 */
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
        WatchRegistration watchRegistration) throws InterruptedException {
    final ReplyHeader reply = new ReplyHeader();
    final Packet queued =
            queuePacket(h, reply, request, response, null, null, null, null, watchRegistration);
    // Step 13: wait synchronously for the result.
    synchronized (queued) {
        while (!queued.finished) {
            queued.wait();
        }
    }
    return reply;
}
/**
 * Wraps a request in a {@code Packet}, appends it to the outgoing queue (or fails it
 * immediately when the client is no longer alive) and wakes up the selector.
 *
 * @return the packet that was created for the request
 */
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb,
        String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
    final Packet queued;
    synchronized (outgoingQueue) {
        // Ping and auth requests do not get a fresh xid assigned here.
        final boolean needsXid = !(h.getType() == OpCode.ping || h.getType() == OpCode.auth);
        if (needsXid) {
            h.setXid(getXid());
        }
        queued = new Packet(h, r, request, response, null, watchRegistration);
        queued.cb = cb;
        queued.ctx = ctx;
        queued.clientPath = clientPath;
        queued.serverPath = serverPath;
        if (zooKeeper.state.isAlive()) {
            // Step 12: the request, wrapped as a packet, joins the outgoing queue.
            outgoingQueue.add(queued);
        } else {
            conLossPacket(queued);
        }
    }
    synchronized (sendThread) {
        selector.wakeup();
    }
    return queued;
}

2、开启SendThread线程

SendThread流程图如下:

核心run方法代码org.apache.zookeeper.ClientCnxn.SendThread#run如下:

public void run() {
    this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
    this.clientCnxnSocket.updateNow();
    this.clientCnxnSocket.updateLastSendAndHeard();
    long lastPingRwServer = Time.currentElapsedTime();
    int MAX_SEND_PING_INTERVAL = true;
    InetSocketAddress serverAddress = null;
    while(ClientCnxn.this.state.isAlive()) {
        try {
            if (!this.clientCnxnSocket.isConnected()) {
                if (!this.isFirstConnect) {
                    try {
                        Thread.sleep((long)this.r.nextInt(1000));
                    } catch (InterruptedException var10) {
                        ClientCnxn.LOG.warn("Unexpected exception", var10);
                    }
                }
                if (ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive()) {
                    break;
                }
                if (this.rwServerAddress != null) {
                    serverAddress = this.rwServerAddress;
                    this.rwServerAddress = null;
                } else {
                    serverAddress = ClientCnxn.this.hostProvider.next(1000L);
                }

    第2步:开始连接
                this.startConnect(serverAddress);
                this.clientCnxnSocket.updateLastSendAndHeard();
            }
            int to;
            if (ClientCnxn.this.state.isConnected()) {
                if (ClientCnxn.this.zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (ClientCnxn.this.zooKeeperSaslClient.getSaslState() == SaslState.INITIAL) {
                        try {
                            ClientCnxn.this.zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException var9) {
                          ClientCnxn.LOG.error("SASL authentication with Zookeeper Quorum member failed: " + var9);
                            ClientCnxn.this.state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = ClientCnxn.this.zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            ClientCnxn.this.state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else if (authState == KeeperState.SaslAuthenticated) {
                            sendAuthEvent = true;
                        }
                    }
                    if (sendAuthEvent) {
                        ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None,

authState, (String)null));
                    }
                }
                to = ClientCnxn.this.readTimeout - this.clientCnxnSocket.getIdleRecv();
            } else {
                to = ClientCnxn.this.connectTimeout - this.clientCnxnSocket.getIdleRecv();
            }
            if (to <= 0) {
                String warnInfo = "Client session timed out, have not heard from server in "

  + this.clientCnxnSocket.getIdleRecv() + "ms for sessionid 0x"

+ Long.toHexString(ClientCnxn.this.sessionId);
                ClientCnxn.LOG.warn(warnInfo);
                throw new ClientCnxn.SessionTimeoutException(warnInfo);
            }
            if (ClientCnxn.this.state.isConnected()) {
                int timeToNextPing = ClientCnxn.this.readTimeout / 2 - this.clientCnxnSocket.getIdleSend()

- (this.clientCnxnSocket.getIdleSend() > 1000 ? 1000 : 0);
                if (timeToNextPing > 0 && this.clientCnxnSocket.getIdleSend() <= 10000) {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                } else {
                    this.sendPing();
                    this.clientCnxnSocket.updateLastSend();
                }
            }
            if (ClientCnxn.this.state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int)(now - lastPingRwServer);
                if (idlePingRwServer >= this.pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    this.pingRwTimeout = Math.min(2 * this.pingRwTimeout, 60000);
                    this.pingRwServer();
                }
                to = Math.min(to, this.pingRwTimeout - idlePingRwServer);
            }

 7、待发送队列、已发送队列
            this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.

this.outgoingQueue, ClientCnxn.this);
        }     

}
    this.cleanup();
    this.clientCnxnSocket.close();
    if (ClientCnxn.this.state.isAlive()) {
        ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None,

KeeperState.Disconnected, (String)null));
    }
    ZooTrace.logTraceMessage(ClientCnxn.LOG, ZooTrace.getTextTraceLevel(),

"SendThread exited loop for session: 0x" + Long.toHexString(ClientCnxn.this.getSessionId()));
}
private void startConnect(InetSocketAddress addr) throws IOException {
    saslLoginFailed = false;
    state = States.CONNECTING;
    setName(getName().replaceAll("\(.*\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            String principalUserName = System.getProperty(ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =new ZooKeeperSaslClient(principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server

without "+ "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
              Watcher.Event.EventType.None,
              Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);
    clientCnxnSocket.connect(addr); //3、使用NIO方式连接
}
/**
 * Creates a socket channel, registers it and starts connecting to {@code addr}, then
 * resets the read state so that the next read starts with the length buffer.
 *
 * NOTE: the {@code catch} block of the {@code try} is not part of this excerpt.
 */
@Override
void connect(InetSocketAddress addr) throws IOException {
    SocketChannel sock = createSock();
    try {
       registerAndConnect(sock, addr); // Step 4: register and connect
    } 
    initialized = false;
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();  // 4、注册连接
    }
}

void primeConnection() throws IOException {
    isFirstConnect = false;
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
    synchronized (outgoingQueue) {
        if (!disableAutoWatchReset) {......}
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,OpCode.auth), null,

new AuthPacket(0, id.scheme,id.data), null, null));
        }

 5、将ConnectRequest 分装到packet中并加入到队列
        outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
    }
    clientCnxnSocket.enableReadWriteOnly();
}

6、连接到zk后继续sendThread.run()方法

7、待发送队列、已发送队列(sendThread.run())

this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.this.outgoingQueue, ClientCnxn.this);

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) { selected = selector.selectedKeys(); }
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn); //8、IO方法:包括发送和接受数据
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
      if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}

3、开启EventThread

流程图如下:

 

EventThread内部类代码如下:zookeeper-3.4.12.jar!/org/apache/zookeeper/ClientCnxn.class

class EventThread extends ZooKeeperThread {
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();
    private volatile KeeperState sessionState;
    private volatile boolean wasKilled;
    private volatile boolean isRunning;
    EventThread() {
        super(ClientCnxn.makeThreadName("-EventThread"));
        this.sessionState = KeeperState.Disconnected;
        this.wasKilled = false;
        this.isRunning = false;
        this.setDaemon(true);
    }

org.apache.zookeeper.ClientCnxn.EventThread.run方法:

public void run() {
    try {
        this.isRunning = true;
        while(true) {
            Object event = this.waitingEvents.take();
            if (event == ClientCnxn.this.eventOfDeath) {
                this.wasKilled = true;
            } else {
                this.processEvent(event);
            }
            if (this.wasKilled) {
                synchronized(this.waitingEvents) {
                    if (this.waitingEvents.isEmpty()) {
                        this.isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException var5) {
        ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
    }
    ClientCnxn.LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(ClientCnxn.this.getSessionId()));
}

4、总结:

 

/**
 * Starts the two client worker threads: the send thread first, then the event thread.
 */
public void start() {
    this.sendThread.start();
    this.eventThread.start();
}

二、服务端源码(单机)

1、总体流程

服务端代码处理流程与客户端类似,具体处理流程如下图所示:

 

 

2、具体处理流程


服务入口位置:QuorumPeerMain.main(String[] args)

zookeeper-3.4.12.jar!/org/apache/zookeeper/server/quorum/QuorumPeerMain.class

Step 1:

// Abbreviated outline of the server entry point (not complete Java: the modifiers and
// the closing brace are left out). It creates a QuorumPeerMain and delegates to
// initializeAndRun with the command-line arguments.
QuorumPeerMain.main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    main.initializeAndRun(args);

protected void initializeAndRun(String[] args) throws ConfigException, IOException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.servers.size() > 0) {
        this.runFromConfig(config);
    } else {
        ZooKeeperServerMain.main(args);
    }
}

step2:

// Abbreviated outline of ZooKeeperServerMain.main (the closing brace is left out):
// it creates a ZooKeeperServerMain and delegates to initializeAndRun.
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    main.initializeAndRun(args);

// NOTE(review): presumably the call made from within initializeAndRun; the code in
// between is not shown in this excerpt.
this.runFromConfig(config);

 

Step3:准备ZooKeeperServer配置信息

public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        ZooKeeperServer zkServer = new ZooKeeperServer();
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        this.cnxnFactory = ServerCnxnFactory.createFactory();
        this.cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
        this.cnxnFactory.startup(zkServer);
        shutdownLatch.await();
        this.shutdown();
        this.cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException var8) {
        LOG.warn("Server interrupted", var8);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}

Step5:创建NIOServerCnxnFactory并调用configure方法

public static ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception var3) {
        IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName);
        ioe.initCause(var3);
        throw ioe;
    }
}

Step 5.1、创建ZooKeeperThread并且打开ServerSocketChannel

public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    this.configureSaslLogin();
    this.thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    this.thread.setDaemon(true);
    this.maxClientCnxns = maxcc;
    this.ss = ServerSocketChannel.open();
    this.ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    this.ss.socket().bind(addr);
    this.ss.configureBlocking(false);
    this.ss.register(this.selector, 16);
}

Step 6、回到step3的runFromConfig(ServerConfig config)方法,启动ZooKeeperServer线程

this.cnxnFactory.startup(zkServer);

public void startup(ZooKeeperServer zks) throws IOException, InterruptedException {
    this.start();
    this.setZooKeeperServer(zks);

 step7:加载快照日志文件到内存
    zks.startdata();
    zks.startup();
}

Step8:启动ZooKeeperServer初始化请求执行器

public synchronized void startup() {
    if (this.sessionTracker == null) {
        this.createSessionTracker();
    }
    this.startSessionTracker();
    this.setupRequestProcessors(); // 初始化请求执行器
    this.registerJMX();
    this.setState(ZooKeeperServer.State.RUNNING);
    this.notifyAll();
}

Step9:开启处理线程

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start(); //  开启同步处理线程
    this.firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)this.firstProcessor).start(); // 开启request处理线程
}

Step10:从submittedRequests获取请求

// Abbreviated outline of PrepRequestProcessor.run (not complete Java: the catch block
// and the exit path for the "request of death" marker are left out).
PrepRequestProcessor.run() {
    while(true) {
        try {
            // blocks until a request has been submitted
            Request request = (Request)this.submittedRequests.take();
            // trace mask; request type 11 gets a different mask (the tracing call itself is not shown)
            long traceMask = 2L;
            if (request.type == 11) {
                traceMask = 8L;
            }
            // every request except the "request of death" marker is pre-processed
            if (Request.requestOfDeath != request) {
                this.pRequest(request);
                continue;
            }
        } 
    }
}

Step11:根据request类型响应处理

/**
 * Pre-processes one request (abridged excerpt): depending on the numeric request type it
 * builds the matching request record and converts it into a transaction via
 * {@code pRequest2Txn}, then stamps the request with the current zxid and passes it to the
 * next processor.
 *
 * NOTE: the numeric case labels are decompiled op codes; see ZooDefs.OpCode for the names.
 * Several cases and the exception handling are elided in this excerpt.
 *
 * @param request the request to pre-process; its {@code hdr} and {@code txn} are reset first
 */
protected void pRequest(Request request) throws RequestProcessorException {
    request.hdr = null;
    request.txn = null;
    try {
        switch(request.type) {
        // session close (-11) / session create (-10): no request record is needed
        case -11:
        case -10:
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,(Record)null, true);
            break;
        // create
        case 1:
            CreateRequest createRequest = new CreateRequest();
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,createRequest, true);
            break;
        // delete
        case 2:
            DeleteRequest deleteRequest = new DeleteRequest();
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,deleteRequest, true);
            break;
            .....
       // NOTE(review): 101 is presumably OpCode.setWatches - only the session is checked here
       case 101:
            this.zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
            break;
        // setData
        case 5:
            SetDataRequest setDataRequest = new SetDataRequest();
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,setDataRequest, true);
            break;
        // setACL
        case 7:
            SetACLRequest setAclRequest = new SetACLRequest();
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,setAclRequest, true);
            break;
        // version check
        case 13:
            CheckVersionRequest checkRequest = new CheckVersionRequest();
            this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,checkRequest, true);
            break;
        // multi: a batch of operations that all share one zxid
        case 14:
            MultiTransactionRecord multiRequest = new MultiTransactionRecord();
            try {
                ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
            } catch (IOException var21) {

              request.hdr = new TxnHeader(request.sessionId, request.cxid, this.zks.getNextZxid(),Time.currentWallTime(), 14);
                throw var21;
            }
            List<Txn> txns = new ArrayList();
            long zxid = this.zks.getNextZxid();
            KeeperException ke = null;
            HashMap<String, ChangeRecord> pendingChanges =    this.getPendingChanges(multiRequest);
            int index = 0;
            for(Iterator var14 = multiRequest.iterator(); var14.hasNext(); ++index) {
                Op op = (Op)var14.next();
                Record subrequest = op.toRequestRecord();
                // once ke is set, every remaining operation is recorded as an error txn
                if (ke != null) {
                    request.hdr.setType(-1);
                    request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                } else {
                    try {
                        this.pRequest2Txn(op.getType(), zxid, request, subrequest,false);
                    } catch (KeeperException var20) {
                      ......
                    }
                }
                // serialize the per-operation txn and collect it for the MultiTxn
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                request.txn.serialize(boa, "request");
                ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
                txns.add(new Txn(request.hdr.getType(), bb.array()));
            }
               request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,Time.currentWallTime(), request.type);
            request.txn = new MultiTxn(txns);
            break;
        default:
            LOG.warn("unknown type " + request.type);
        }
    } catch (KeeperException var22) {
       ......
    } 
    request.zxid = this.zks.getZxid();

// Finally the next processor receives the request, i.e. SyncRequestProcessor.queuedRequests.add(request).
    this.nextProcessor.processRequest(request);
}

然后根据不同请求,调用对应的PrepRequestProcessor.pRequest2Txn()方法中的addChangeRecord()方法把记录放到outstandingChanges队列中。

this.addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path2Delete, (StatPersisted)null, 0, (List)null));

void addChangeRecord(ChangeRecord c) {
    synchronized(this.zks.outstandingChanges) {
        this.zks.outstandingChanges.add(c);
        this.zks.outstandingChangesForPath.put(c.path, c);
    }
}

SyncRequestProcessor.processRequest(Request request)方法:

this.nextProcessor.processRequest(request);

/**
 * Hands the request to this processor by adding it to {@code queuedRequests}.
 *
 * @param request the request to enqueue
 */
public void processRequest(Request request) {
    this.queuedRequests.add(request);
}

Step12:调用SyncRequestProcessor.run方法

Step13:从queuedRequests队列中获取请求

si = (Request)this.queuedRequests.take();

Step14:将请求写入日志和快照文件

// Fragment of SyncRequestProcessor.run (the enclosing loop and the closing brace of the
// outer "if" are not part of this excerpt).
if (si != null) {
    // append the request to the transaction log; only appended requests count
    if (this.zks.getZKDatabase().append(si)) {
        ++logCount;
        // once enough requests have been logged, roll the log and take a snapshot
        if (logCount > snapCount / 2 + randRoll) {
            setRandRoll(this.r.nextInt(snapCount / 2));
            // roll over to a new transaction log file
            this.zks.getZKDatabase().rollLog();
            // skip this snapshot if the previous snapshot thread is still running
            if (this.snapInProcess != null && this.snapInProcess.isAlive()) {
                LOG.warn("Too busy to snap, skipping");
            } else {
                this.snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                    public void run() {
                        try {
                          // write the snapshot file
                            SyncRequestProcessor.this.zks.takeSnapshot();
                        } catch (Exception var2) {
                            SyncRequestProcessor.LOG.warn("Unexpected exception", var2);
                        }
                    }
                };
                this.snapInProcess.start();
            }
            logCount = 0;
        }
    }

Step15:调用FinalRequestProcessor.processRequest(Request request)处理请求

Step16:调用ZooKeeperServer.processTxn()处理请求

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    int opCode = hdr.getType();
    long sessionId = hdr.getClientId();
    ProcessTxnResult rc = this.getZKDatabase().processTxn(hdr, txn);
    if (opCode == -10) {
        if (txn instanceof CreateSessionTxn) {
            CreateSessionTxn cst = (CreateSessionTxn)txn;
            this.sessionTracker.addSession(sessionId, cst.getTimeOut());
        } else {
            LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
        }
    } else if (opCode == -11) {
        this.sessionTracker.removeSession(sessionId);
    }
    return rc;
}

Step17:dataTree.processTxn(hdr, txn)处理请求

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    return this.dataTree.processTxn(hdr, txn);
}

Step18:根据不同的请求类型处理,更新内存并发布相应的watch事件

// Excerpt of the per-type switch; each case records the affected path in rc before applying the change.
// Type 1 carries a CreateTxn: create the node.
case 1:
    CreateTxn createTxn = (CreateTxn)txn;
    rc.path = createTxn.getPath();
    // The fourth argument (ephemeral owner) is the client id for ephemeral nodes, otherwise 0L.
    this.createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral()

  ? header.getClientId() : 0L, createTxn.getParentCVersion(), header.getZxid(), header.getTime());
    break;
// Type 2 carries a DeleteTxn: delete the node.
case 2:
    DeleteTxn deleteTxn = (DeleteTxn)txn;
    rc.path = deleteTxn.getPath();
    this.deleteNode(deleteTxn.getPath(), header.getZxid());
    break;
// Type 5 carries a SetDataTxn: replace the node data and keep the returned stat.
case 5:
    SetDataTxn setDataTxn = (SetDataTxn)txn;
    rc.path = setDataTxn.getPath();
    rc.stat = this.setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),

header.getZxid(), header.getTime());
    break;
// Type 7 carries a SetACLTxn: replace the node ACL and keep the returned stat.
case 7:
    SetACLTxn setACLTxn = (SetACLTxn)txn;
    rc.path = setACLTxn.getPath();
    rc.stat = this.setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
    break;

以create处理发布watch事件为例:

public String createNode(String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion,

long zxid, long time) throws NoNodeException, NodeExistsException {
    int lastSlash = path.lastIndexOf(47);
    String parentName = path.substring(0, lastSlash);
    String childName = path.substring(lastSlash + 1);
    StatPersisted stat = new StatPersisted();
    stat.setCtime(time);
    stat.setMtime(time);
    stat.setCzxid(zxid);
    stat.setMzxid(zxid);
    stat.setPzxid(zxid);
    stat.setVersion(0);
    stat.setAversion(0);
    stat.setEphemeralOwner(ephemeralOwner);
    DataNode parent = (DataNode)this.nodes.get(parentName);
    if (parent == null) {
        throw new NoNodeException();
    } else {
        synchronized(parent) {
            Set<String> children = parent.getChildren();
            if (children.contains(childName)) {
                throw new NodeExistsException();
            }
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                ++parentCVersion;
            }
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = this.aclCache.convertAcls(acl);
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            this.nodes.put(path, child);
            if (ephemeralOwner != 0L) {
                HashSet<String> list = (HashSet)this.ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet();
                    this.ephemerals.put(ephemeralOwner, list);
                }
                synchronized(list) {
                    list.add(path);
                }
            }
        }
        if (parentName.startsWith("/zookeeper/quota")) {
            if ("zookeeper_limits".equals(childName)) {
                this.pTrie.addPath(parentName.substring("/zookeeper/quota".length()));
            }
            if ("zookeeper_stats".equals(childName)) {
                this.updateQuotaForPath(parentName.substring("/zookeeper/quota".length()));
            }
        }
        String lastPrefix;
        if ((lastPrefix = this.getMaxPrefixWithQuota(path)) != null) {
            this.updateCount(lastPrefix, 1);
            this.updateBytes(lastPrefix, data == null ? 0L : (long)data.length);
        }
        // 发布watch事件
        this.dataWatches.triggerWatch(path, EventType.NodeCreated);
        this.childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged);
        return path;
    }
}

Step19:继续FinalRequestProcessor.processRequest(Request request)方法,创建相应的response

// Excerpt of the response-building switch; the remaining cases and the closing brace are omitted.
switch(request.type) {
// Type -11: tagged "CLOS"; marks the session for closing and carries over the error code from rc.
case -11:
    lastOp = "CLOS";
    closeSession = true;
    err = Code.get(rc.err);
    break;
// Type -10: tagged "SESS"; stats are updated and session init is finished here,
// then the method returns without building a response in this switch.
case -10:
    this.zks.serverStats().updateLatency(request.createTime);
    lastOp = "SESS";
    cnxn.updateStatsForResponse((long)request.cxid, request.zxid, lastOp, request.createTime,

 Time.currentElapsedTime());
    this.zks.finishSessionInit(request.cnxn, true);
    return;
// Type 1: tagged "CREA"; the response wraps the path produced by the transaction.
case 1:
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path);
    err = Code.get(rc.err);
    break;

    ......
((MultiResponse)rsp).add((OpResult)subResult);
public class MultiResponse implements Record, Iterable<OpResult> {
    private List<OpResult> results = new ArrayList();

Step21:调用ServerCnxn.updateStatsForResponse修改状态,并调用sendResponse方法把最终结果返回给对应的客户端。

protected synchronized void updateStatsForResponse(long cxid, long zxid, String op, long start, long end) {
    if (cxid >= 0L) {
        this.lastCxid = cxid;
    }
    this.lastZxid = zxid;
    this.lastOp = op;
    this.lastResponseTime = end;
    long elapsed = end - start;
    this.lastLatency = elapsed;
    if (elapsed < this.minLatency) {
        this.minLatency = elapsed;
    }
    if (elapsed > this.maxLatency) {
        this.maxLatency = elapsed;
    }
    ++this.count;
    this.totalLatency += elapsed;
}

sendResponse方法是在NIOServerCnxn.sendResponse(ReplyHeader h, Record r, String tag)中实现的

public synchronized void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException var12) {
            LOG.error("Error serializing response");
        }
        byte[] b = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4).rewind();
        this.sendBuffer(bb);
        if (h.getXid() > 0) {
            synchronized(this) {
                --this.outstandingRequests;
            }
            synchronized(this.factory) {
                if (this.zkServer.getInProcess() < this.outstandingLimit || this.outstandingRequests < 1) {
                    this.sk.selector().wakeup();
                    this.enableRecv();
                }
            }
        }
    } catch (Exception var14) {
        LOG.warn("Unexpected exception. Destruction averted.", var14);
    }
}

 

 

 

最后

以上就是喜悦百褶裙为你收集整理的第四章 zk源码解读笔记的全部内容,希望文章能够帮你解决第四章 zk源码解读笔记所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(55)

评论列表共有 0 条评论

立即
投稿
返回
顶部