我是靠谱客的博主 冷傲电脑,最近开发中收集的这篇文章主要介绍ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端的响应后的处理流程总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 2021SC@SDUSC
  • 前言
  • 客户端收到服务端的响应后的处理流程
  • 总结

2021SC@SDUSC

前言

上一章讲到服务端处理完请求,通过NIOServerCnxn的sendBuffer()方法写入客户端的缓冲区中,在此之前一步已经初始化了ConnectResponse响应对象,因此写完后会调用rsp.serialize(bos, “connect”),返回给客户端一个ConnectResponse响应对象。当客户端收到这一响应对象时,又该如何操作,且看下文。

客户端收到服务端的响应后的处理流程

在第一章中,讲述过客户端的sendThread线程发出请求中,处于等待服务端响应的状态。此时服务端发回响应,客户端的sendThread收到响应,此时需要进行读操作,读取缓冲区中的内容。 具体流程如下:

(1) 由于sendThread处于等待状态,此时当CPU调用该线程时,线程执行run()方法,其中调用了clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this),这与之前客户端发出请求一样,只是其中doTransport内的doIO方法调用了不同的操作语句,之前发出请求调用的是写入操作,而现在处理响应则是读取操作。

    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        // 如果可读
        if (sockKey.isReadable()) {
            // 读取输入缓冲区的响应包的长度
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            // 如果输入缓冲区的响应包读取满,读取同一个包需要读取两次,一次为长度,一次为内容
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                // 如果读取的是长度,则给incomingBuffer分配包长度的空间
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                } else if (!initialized) {// 如果还未初始化,即会话未建立,则服务端必须返回ConnectResponse
                    // 读取ConnectRequest,将incomingBuffer的内容反序列化成ConnectResponse对象
                    readConnectResult();
                    // 继续读后续响应
                    enableRead();
                    // 如果还有写请求,确保write事件ok
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    // 准备读下一个响应
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    // 会话建立完毕
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        ……
        ……
 }

(2)显然此时给定的场景会话是未被初始化的,则调用readConnectResult(),因此需要分析该方法是如何将incomingBuffer的内容反序列化成ConnectResponse对象的。

    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b)).append(",");
            }
            buf.append("]");
            if (LOG.isTraceEnabled()) {
                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
            }
        }
        // 将incomingBuffer反序列化成ConnectResponse
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // 仅读标志位
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }
        // 服务端返回的会话ID
        this.sessionId = conRsp.getSessionId();
        // 后续处理,初始化客户端的一些参数,最后触发WatchedEvent线程
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }

(3) 反序列化后,则需要关心后续处理,此时则需要进入onConnected方法中进行分析。onConnected是ClientCnxn下的具体方法,作用是初始化客户端的一些参数,为了最后触发WatchedEvent线程。

        void onConnected(
            int _negotiatedSessionTimeout,
            long _sessionId,
            byte[] _sessionPasswd,
            boolean isRO) throws IOException {
            ……
            ……
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            // 初始化客户端的会话相关参数
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
            seenRwServerBefore |= !isRO;
            LOG.info(
                "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
                clientCnxnSocket.getRemoteSocketAddress(),
                Long.toHexString(sessionId),
                negotiatedSessionTimeout,
                (isRO ? " (READ-ONLY mode)" : ""));
            //触发一个SyncConnected事件,EventThread异步通知注册的watcher来处理
            KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
        }

(4)当 EventThread异步通知注册的watcher来处理时,此时会进入queueEvent中进行处理。

        private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
            if (event.getType() == EventType.None && sessionState == event.getState()) {
                return;
            }
            // EventThread同步会话状态
            sessionState = event.getState();
            final Set<Watcher> watchers;
            if (materializedWatchers == null) {
                // materialize the watchers based on the event
                // 找出那些需要被通知的watcher,主线程直接调用对应watcher接口
                watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
            } else {
                watchers = new HashSet<Watcher>();
                watchers.addAll(materializedWatchers);
            }
            WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
            // queue the pair (watch set & event) for later processing
            // 提交异步队列处理
            waitingEvents.add(pair);
        }

(5) 此时EventThread线程则会执行run(),运行起来从waitingEvents队列中取出事件,从而调用processEvent处理事件。

        public void run() {
            try {
                // 正在运行
                isRunning = true;
                while (true) {
                    // 从等待处理事件队列中取出事件处理
                    Object event = waitingEvents.take();
                    // 如果是死亡事件
                    if (event == eventOfDeath) {
                        // 清除
                        wasKilled = true;
                    } else {// 否则,处理该事件
                        processEvent(event);
                    }
                    // 如果是死亡事件
                    if (wasKilled) {
                        // 同步处理
                        synchronized (waitingEvents) {
                            if (waitingEvents.isEmpty()) {
                                isRunning = false;
                                break;
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                LOG.error("Event thread exiting due to interruption", e);
            }

(6) 显然,我们更加关心EventThread调用processEvent是如何处理事件。由于processEvent有几百行代码,这里则挑选出重要的来说明。此外,由于watcher不是本章内容的主要部分,这里不再进行详细阐述,如需了解,可自行前往该博客中查看:https://blog.csdn.net/par_ser?spm=1001.2014.3001.5343

private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // 每一个wathcer处理事件
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher ", t);
                        }
                    }
                    ……
                    ……
        }

总结

到这里客户端与服务端之间的网络通信流程已经分析完毕,其中客户端与服务端之间的会话已经建立成功,进而可以进行后续的业务处理。通过介绍ZooKeeper客户端与服务端之间的网络通信流程,同时详细讲解了客户端与服务端之间是如何交互数据的。虽然给定的场景为建立连接的场景,但是其他网络通信业务流程也是和这流程类似的。

最后

以上就是冷傲电脑为你收集整理的ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端的响应后的处理流程总结的全部内容,希望文章能够帮你解决ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端的响应后的处理流程总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部