我是靠谱客的博主 冷傲电脑,这篇文章主要介绍ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端的响应后的处理流程总结,现在分享给大家,希望可以做个参考。

文章目录

  • 2021SC@SDUSC
  • 前言
  • 客户端收到服务端的响应后的处理流程
  • 总结

2021SC@SDUSC

前言

上一章讲到服务端处理完请求,通过NIOServerCnxn的sendBuffer()方法写入客户端的缓冲区中,在此之前一步已经初始化了ConnectResponse响应对象,因此写完后会调用rsp.serialize(bos, “connect”),返回给客户端一个ConnectResponse响应对象。当客户端收到这一响应对象时,又该如何操作,且看下文。

客户端收到服务端的响应后的处理流程

在第一章中,讲述过客户端的sendThread线程发出请求中,处于等待服务端响应的状态。此时服务端发回响应,客户端的sendThread收到响应,此时需要进行读操作,读取缓冲区中的内容。 具体流程如下:

(1) 由于sendThread处于等待状态,此时当CPU调用该线程时,线程执行run()方法,其中调用了clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this),这与之前客户端发出请求一样,只是其中doTransport内的doIO方法调用了不同的操作语句,之前发出请求调用的是写入操作,而现在处理响应则是读取操作。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } // 如果可读 if (sockKey.isReadable()) { // 读取输入缓冲区的响应包的长度 int rc = sock.read(incomingBuffer); if (rc < 0) { throw new EndOfStreamException("Unable to read additional data from server sessionid 0x" + Long.toHexString(sessionId) + ", likely server has closed socket"); } // 如果输入缓冲区的响应包读取满,读取同一个包需要读取两次,一次为长度,一次为内容 if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); // 如果读取的是长度,则给incomingBuffer分配包长度的空间 if (incomingBuffer == lenBuffer) { recvCount.getAndIncrement(); readLength(); } else if (!initialized) {// 如果还未初始化,即会话未建立,则服务端必须返回ConnectResponse // 读取ConnectRequest,将incomingBuffer的内容反序列化成ConnectResponse对象 readConnectResult(); // 继续读后续响应 enableRead(); // 如果还有写请求,确保write事件ok if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) { // Since SASL authentication has completed (if client is configured to do so), // outgoing packets waiting in the outgoingQueue can now be sent. enableWrite(); } // 准备读下一个响应 lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); // 会话建立完毕 initialized = true; } else { sendThread.readResponse(incomingBuffer); lenBuffer.clear(); incomingBuffer = lenBuffer; updateLastHeard(); } } } …… …… }

(2)显然此时给定的场景会话是未被初始化的,则调用readConnectResult(),因此需要分析该方法是如何将incomingBuffer的内容反序列化成ConnectResponse对象的。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void readConnectResult() throws IOException { if (LOG.isTraceEnabled()) { StringBuilder buf = new StringBuilder("0x["); for (byte b : incomingBuffer.array()) { buf.append(Integer.toHexString(b)).append(","); } buf.append("]"); if (LOG.isTraceEnabled()) { LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString()); } } // 将incomingBuffer反序列化成ConnectResponse ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ConnectResponse conRsp = new ConnectResponse(); conRsp.deserialize(bbia, "connect"); // 仅读标志位 boolean isRO = false; try { isRO = bbia.readBool("readOnly"); } catch (IOException e) { // this is ok -- just a packet from an old server which // doesn't contain readOnly field LOG.warn("Connected to an old server; r-o mode will be unavailable"); } // 服务端返回的会话ID this.sessionId = conRsp.getSessionId(); // 后续处理,初始化客户端的一些参数,最后触发WatchedEvent线程 sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO); }

(3) 反序列化后,则需要关心后续处理,此时则需要进入onConnected方法中进行分析。onConnected是ClientCnxn下的具体方法,作用是初始化客户端的一些参数,为了最后触发WatchedEvent线程。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void onConnected( int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { …… …… if (!readOnly && isRO) { LOG.error("Read/write client got connected to read-only server"); } // 初始化客户端的会话相关参数 readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED); seenRwServerBefore |= !isRO; LOG.info( "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}", clientCnxnSocket.getRemoteSocketAddress(), Long.toHexString(sessionId), negotiatedSessionTimeout, (isRO ? " (READ-ONLY mode)" : "")); //触发一个SyncConnected事件,EventThread异步通知注册的watcher来处理 KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null)); }

(4)当 EventThread异步通知注册的watcher来处理时,此时会进入queueEvent中进行处理。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } // EventThread同步会话状态 sessionState = event.getState(); final Set<Watcher> watchers; if (materializedWatchers == null) { // materialize the watchers based on the event // 找出那些需要被通知的watcher,主线程直接调用对应watcher接口 watchers = watcher.materialize(event.getState(), event.getType(), event.getPath()); } else { watchers = new HashSet<Watcher>(); watchers.addAll(materializedWatchers); } WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event); // queue the pair (watch set & event) for later processing // 提交异步队列处理 waitingEvents.add(pair); }

(5) 此时EventThread线程则会执行run(),运行起来从waitingEvents队列中取出事件,从而调用processEvent处理事件。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void run() { try { // 正在运行 isRunning = true; while (true) { // 从等待处理事件队列中取出事件处理 Object event = waitingEvents.take(); // 如果是死亡事件 if (event == eventOfDeath) { // 清除 wasKilled = true; } else {// 否则,处理该事件 processEvent(event); } // 如果是死亡事件 if (wasKilled) { // 同步处理 synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); }

(6) 显然,我们更加关心EventThread调用processEvent是如何处理事件。由于processEvent有几百行代码,这里则挑选出重要的来说明。此外,由于watcher不是本章内容的主要部分,这里不再进行详细阐述,如需了解,可自行前往该博客中查看:https://blog.csdn.net/par_ser?spm=1001.2014.3001.5343

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void processEvent(Object event) { try { if (event instanceof WatcherSetEventPair) { // 每一个wathcer处理事件 WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } …… …… }

总结

到这里客户端与服务端之间的网络通信流程已经分析完毕,其中客户端与服务端之间的会话已经建立成功,进而可以进行后续的业务处理。通过介绍ZooKeeper客户端与服务端之间的网络通信流程,同时详细讲解了客户端与服务端之间是如何交互数据的。虽然给定的场景为建立连接的场景,但是其他网络通信业务流程也是和这流程类似的。

最后

以上就是冷傲电脑最近收集整理的关于ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端的响应后的处理流程总结的全部内容,更多相关ZooKeeper源码分析之完整网络通信流程(三)2021SC@SDUSC前言客户端收到服务端内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(59)

评论列表共有 0 条评论

立即
投稿
返回
顶部