概述
一、客户端源码
1、总体流程
1.1、zkClient客户端流程如下图所示
1.2、zkCli.sh配置代码如下:
# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"
if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
. "$ZOOBINDIR"/../libexec/zkEnv.sh
else
. "$ZOOBINDIR"/zkEnv.sh
fi
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}"
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS
org.apache.zookeeper.ZooKeeperMain "$@"
启动客户端 zkCli.sh文件里面的配置,实际运行代码如下:
public static void main(String[] args) throws KeeperException, IOException, InterruptedException {
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
1.3、Main方法流程:
new ZooKeeperMain 对象,调用run()方法,在ZookeeperMain的构造方法里面,重点是:
public ZooKeeperMain(String[] args) throws IOException, InterruptedException {
this.cl.parseOptions(args);
System.out.println("Connecting to " + this.cl.getOption("server"));
this.connectToZK(this.cl.getOption("server"));
}
protected void connectToZK(String newHost) throws InterruptedException, IOException {
if (this.zk != null && this.zk.getState().isAlive()) {
this.zk.close();
}
this.host = newHost;
boolean readOnly = this.cl.getOption("readonly") != null;
this.zk = new ZooKeeper(this.host, Integer.parseInt(this.cl.getOption("timeout")),
new ZooKeeperMain.MyWatcher(), readOnly);
}
public void start() {
this.sendThread.start();
this.eventThread.start();
}
最终在connectToZK方法里面也就是使用原生的Zk客户端进行连接的。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) 、 throws IOException {
this.watchManager = new ZooKeeper.ZKWatchManager();
LOG.info("Initiating client connection, connectString=" + connectString + "sessionTimeout="+ sessionTimeout + " watcher=" + watcher);
this.watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout,
this, this.watchManager, getClientCnxnSocket(), canBeReadOnly); //获得和服务端连接的对象
this.cnxn.start();
}
ClientCnxn.ClientCnxn():
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher,
long sessionId, byte[] sessionPasswd) throws IOException{
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
// parse out chroot, if any
int off = hosts.indexOf('/');
if (off >= 0) {
String chrootPath = hosts.substring(off);
// ignore "/" chroot spec, same as null
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
hosts = hosts.substring(0, off);
} else {
this.chrootPath = null;
}
String hostsList[] = hosts.split(",");
for (String host : hostsList) {
int port = 2181;
String parts[] = host.split(":");
if (parts.length > 1) {
port = Integer.parseInt(parts[1]);
host = parts[0];
}
InetAddress addrs[] = InetAddress.getAllByName(host);
for (InetAddress addr : addrs) {
serverAddrs.add(new InetSocketAddress(addr, port));
}
}
this.sessionTimeout = sessionTimeout;
connectTimeout = sessionTimeout / hostsList.length;
readTimeout = sessionTimeout * 2 / 3;
Collections.shuffle(serverAddrs);
sendThread = new SendThread();
eventThread = new EventThread();
}
mian方法流程图如下所示:
第三步第四步执行如下:
@SuppressWarnings("unchecked")
void run() throws KeeperException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
// only use jline if it's in the classpath
try {
Class consoleC = Class.forName("jline.ConsoleReader");
Class completorC =Class.forName("org.apache.zookeeper.JLineZNodeCompletor");
System.out.println("JLine support is enabled");
Object console = consoleC.getConstructor().newInstance();
Object completor =completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompletor", Class.forName("jline.Completor"));
addCompletor.invoke(console, completor);
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
// 循环读取命令并执行
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
}
}
public void executeLine(String line)throws InterruptedException, IOException, KeeperException {
if (!line.equals("")) {
cl.parseCommand(line);
addToHistory(commandCount,line);
processCmd(cl); // 第5步:执行命令
commandCount++;
}
}
第6步:处理命令
protected boolean processZKCmd(MyCommandOptions co) throws KeeperException
IOException, InterruptedException{
Stat stat = new Stat();
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
return false;
}
if (!commandMap.containsKey(cmd)) {
usage();
return false;
}
boolean watch = args.length > 2;
String path = null;
List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
System.out.println("Quitting...");
zk.close();
System.exit(0);
} else if (cmd.equals("history")) {
.........
// 创建节点命令
if (cmd.equals("create") && args.length >= 3) {
int first = 0;
CreateMode flags = CreateMode.PERSISTENT; // 持久节点
if ((args[1].equals("-e") && args[2].equals("-s"))
|| (args[1]).equals("-s") && (args[2].equals("-e"))) {
first+=2;
flags = CreateMode.EPHEMERAL_SEQUENTIAL; 临时顺序节点
} else if (args[1].equals("-e")) {
first++;
flags = CreateMode.EPHEMERAL; // 临时节点
} else if (args[1].equals("-s")) {
first++;
flags = CreateMode.PERSISTENT_SEQUENTIAL; // 持久顺序节点
}
if (args.length == first + 4) {
acl = parseACLs(args[first+3]);
}
path = args[first + 1];
// 第7步:解析create命令
String newPath = zk.create(path, args[first+2].getBytes(), acl,flags);
System.err.println("Created " + newPath);
} else if (cmd.equals("delete") && args.length >= 2) {
path = args[1];
zk.delete(path, watch ? Integer.parseInt(args[2]) : -1);
}
// 第8,9步:解析create命令参数
// 第8,9步:解析create命令参数
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
// 第10步:
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
// 第11步:提交命令
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
将request打包成packet:
public ReplyHeader submitRequest(RequestHeader h, Record request,Record response,
WatchRegistration watchRegistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
// 第13步:同步等待结果
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,Record response, AsyncCallback cb,
String clientPath,String serverPath, Object ctx, WatchRegistration watchRegistration){
Packet packet = null;
synchronized (outgoingQueue) {
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
h.setXid(getXid());
}
packet = new Packet(h, r, request, response, null, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!zooKeeper.state.isAlive()) {
conLossPacket(packet);
} else {
//第12步 将request包装成packet并加入outgoingQueue队列中
outgoingQueue.add(packet);
}
}
synchronized (sendThread) {
selector.wakeup();
}
return packet;
}
2、开启SendThread线程
SendThread流程图如下:
核心run方法代码org.apache.zookeeper.ClientCnxn.SendThread#run如下:
public void run() {
this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
this.clientCnxnSocket.updateNow();
this.clientCnxnSocket.updateLastSendAndHeard();
long lastPingRwServer = Time.currentElapsedTime();
int MAX_SEND_PING_INTERVAL = true;
InetSocketAddress serverAddress = null;
while(ClientCnxn.this.state.isAlive()) {
try {
if (!this.clientCnxnSocket.isConnected()) {
if (!this.isFirstConnect) {
try {
Thread.sleep((long)this.r.nextInt(1000));
} catch (InterruptedException var10) {
ClientCnxn.LOG.warn("Unexpected exception", var10);
}
}
if (ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive()) {
break;
}
if (this.rwServerAddress != null) {
serverAddress = this.rwServerAddress;
this.rwServerAddress = null;
} else {
serverAddress = ClientCnxn.this.hostProvider.next(1000L);
}
第2步:开始连接
this.startConnect(serverAddress);
this.clientCnxnSocket.updateLastSendAndHeard();
}
int to;
if (ClientCnxn.this.state.isConnected()) {
if (ClientCnxn.this.zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (ClientCnxn.this.zooKeeperSaslClient.getSaslState() == SaslState.INITIAL) {
try {
ClientCnxn.this.zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException var9) {
ClientCnxn.LOG.error("SASL authentication with Zookeeper Quorum member failed: " + var9);
ClientCnxn.this.state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = ClientCnxn.this.zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
ClientCnxn.this.state = States.AUTH_FAILED;
sendAuthEvent = true;
} else if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
if (sendAuthEvent) {
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None,
authState, (String)null));
}
}
to = ClientCnxn.this.readTimeout - this.clientCnxnSocket.getIdleRecv();
} else {
to = ClientCnxn.this.connectTimeout - this.clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo = "Client session timed out, have not heard from server in "
+ this.clientCnxnSocket.getIdleRecv() + "ms for sessionid 0x"
+ Long.toHexString(ClientCnxn.this.sessionId);
ClientCnxn.LOG.warn(warnInfo);
throw new ClientCnxn.SessionTimeoutException(warnInfo);
}
if (ClientCnxn.this.state.isConnected()) {
int timeToNextPing = ClientCnxn.this.readTimeout / 2 - this.clientCnxnSocket.getIdleSend()
- (this.clientCnxnSocket.getIdleSend() > 1000 ? 1000 : 0);
if (timeToNextPing > 0 && this.clientCnxnSocket.getIdleSend() <= 10000) {
if (timeToNextPing < to) {
to = timeToNextPing;
}
} else {
this.sendPing();
this.clientCnxnSocket.updateLastSend();
}
}
if (ClientCnxn.this.state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int)(now - lastPingRwServer);
if (idlePingRwServer >= this.pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
this.pingRwTimeout = Math.min(2 * this.pingRwTimeout, 60000);
this.pingRwServer();
}
to = Math.min(to, this.pingRwTimeout - idlePingRwServer);
}
7、待发送队列、已发送队列
this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.
this.outgoingQueue, ClientCnxn.this);
}
}
this.cleanup();
this.clientCnxnSocket.close();
if (ClientCnxn.this.state.isAlive()) {
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None,
KeeperState.Disconnected, (String)null));
}
ZooTrace.logTraceMessage(ClientCnxn.LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(ClientCnxn.this.getSessionId()));
}
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
state = States.CONNECTING;
setName(getName().replaceAll("\(.*\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =new ZooKeeperSaslClient(principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server
without "+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr); //3、使用NIO方式连接
}
@Override
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr); // 4、注册连接
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection(); // 4、注册连接
}
}
void primeConnection() throws IOException {
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
if (!disableAutoWatchReset) {......}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,OpCode.auth), null,
new AuthPacket(0, id.scheme,id.data), null, null));
}
5、将ConnectRequest 分装到packet中并加入到队列
outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
}
6、连接到zk后继续sendThread.run()方法
7、待发送队列、已发送队列(sendThread.run())
this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.this.outgoingQueue, ClientCnxn.this);
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) { selected = selector.selectedKeys(); }
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn); //8、IO方法:包括发送和接受数据
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
3、开启EventThread
流程图如下:
EventThread内部类代码如下:zookeeper-3.4.12.jar!orgapachezookeeperClientCnxn.class
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue();
private volatile KeeperState sessionState;
private volatile boolean wasKilled;
private volatile boolean isRunning;
EventThread() {
super(ClientCnxn.makeThreadName("-EventThread"));
this.sessionState = KeeperState.Disconnected;
this.wasKilled = false;
this.isRunning = false;
this.setDaemon(true);
}
org.apache.zookeeper.ClientCnxn.EventThread.run方法:
public void run() {
try {
this.isRunning = true;
while(true) {
Object event = this.waitingEvents.take();
if (event == ClientCnxn.this.eventOfDeath) {
this.wasKilled = true;
} else {
this.processEvent(event);
}
if (this.wasKilled) {
synchronized(this.waitingEvents) {
if (this.waitingEvents.isEmpty()) {
this.isRunning = false;
break;
}
}
}
}
} catch (InterruptedException var5) {
ClientCnxn.LOG.error("Event thread exiting due to interruption", var5);
}
ClientCnxn.LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(ClientCnxn.this.getSessionId()));
}
4、总结:
public void start() {
this.sendThread.start();
this.eventThread.start();
}
二、服务端源码(单机)
1、总体流程
服务端代码处理流程与客户端类似,具体处理流程如下图所示:
2、具体处理流程
服务入口位置:QuorumPeerMain.main(String[] args)
zookeeper-3.4.12.jar!orgapachezookeeperserverquorumQuorumPeerMain.class
Step 1:
QuorumPeerMain.main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
protected void initializeAndRun(String[] args) throws ConfigException, IOException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.servers.size() > 0) {
this.runFromConfig(config);
} else {
ZooKeeperServerMain.main(args);
}
}
step2:
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
main.initializeAndRun(args);
this.runFromConfig(config);
Step3:准备ZooKeeperServer配置信息
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
ZooKeeperServer zkServer = new ZooKeeperServer();
CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(config.dataDir));
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
this.cnxnFactory = ServerCnxnFactory.createFactory();
this.cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
this.cnxnFactory.startup(zkServer);
shutdownLatch.await();
this.shutdown();
this.cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException var8) {
LOG.warn("Server interrupted", var8);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
Step5:创建NIOServerCnxnFactory并调用configure方法
public static ServerCnxnFactory createFactory() throws IOException {
String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory");
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception var3) {
IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName);
ioe.initCause(var3);
throw ioe;
}
}
Step 5.1、创建ZooKeeperThread并且打开ServerSocketChannel
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
this.configureSaslLogin();
this.thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
this.thread.setDaemon(true);
this.maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
this.ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
this.ss.socket().bind(addr);
this.ss.configureBlocking(false);
this.ss.register(this.selector, 16);
}
Step 6、回到step3的runFromConfig(ServerConfig config)方法,启动ZooKeeperServer线程
this.cnxnFactory.startup(zkServer);
public void startup(ZooKeeperServer zks) throws IOException, InterruptedException {
this.start();
this.setZooKeeperServer(zks);
step7:加载快照日志文件到内存
zks.startdata();
zks.startup();
}
Step8:启动ZooKeeperServer初始化请求执行器
public synchronized void startup() {
if (this.sessionTracker == null) {
this.createSessionTracker();
}
this.startSessionTracker();
this.setupRequestProcessors(); // 初始化请求执行器
this.registerJMX();
this.setState(ZooKeeperServer.State.RUNNING);
this.notifyAll();
}
Step9:开启处理线程
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
((SyncRequestProcessor)syncProcessor).start(); // 开启同步处理线程
this.firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)this.firstProcessor).start(); // 开启request处理线程
}
Step10:从submittedRequests获取请求
PrepRequestProcessor.run() {
while(true) {
try {
Request request = (Request)this.submittedRequests.take();
long traceMask = 2L;
if (request.type == 11) {
traceMask = 8L;
}
if (Request.requestOfDeath != request) {
this.pRequest(request);
continue;
}
}
}
}
Step11:根据request类型响应处理
protected void pRequest(Request request) throws RequestProcessorException {
request.hdr = null;
request.txn = null;
try {
switch(request.type) {
case -11:
case -10:
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,(Record)null, true);
break;
case 1:
CreateRequest createRequest = new CreateRequest();
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,createRequest, true);
break;
case 2:
DeleteRequest deleteRequest = new DeleteRequest();
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,deleteRequest, true);
break;
.....
case 101:
this.zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
break;
case 5:
SetDataRequest setDataRequest = new SetDataRequest();
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,setDataRequest, true);
break;
case 7:
SetACLRequest setAclRequest = new SetACLRequest();
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,setAclRequest, true);
break;
case 13:
CheckVersionRequest checkRequest = new CheckVersionRequest();
this.pRequest2Txn(request.type, this.zks.getNextZxid(), request,checkRequest, true);
break;
case 14:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch (IOException var21) {
request.hdr = new TxnHeader(request.sessionId, request.cxid, this.zks.getNextZxid(),Time.currentWallTime(), 14);
throw var21;
}
List<Txn> txns = new ArrayList();
long zxid = this.zks.getNextZxid();
KeeperException ke = null;
HashMap<String, ChangeRecord> pendingChanges = this.getPendingChanges(multiRequest);
int index = 0;
for(Iterator var14 = multiRequest.iterator(); var14.hasNext(); ++index) {
Op op = (Op)var14.next();
Record subrequest = op.toRequestRecord();
if (ke != null) {
request.hdr.setType(-1);
request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
} else {
try {
this.pRequest2Txn(op.getType(), zxid, request, subrequest,false);
} catch (KeeperException var20) {
......
}
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
request.txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
txns.add(new Txn(request.hdr.getType(), bb.array()));
}
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,Time.currentWallTime(), request.type);
request.txn = new MultiTxn(txns);
break;
default:
LOG.warn("unknown type " + request.type);
}
} catch (KeeperException var22) {
......
}
request.zxid = this.zks.getZxid();
// 最后nextProcessor把请求添加到SyncRequestProcessor.queuedRequests.add(request);
this.nextProcessor.processRequest(request);
}
然后根据不同请求,调用对应的PrepRequestProcessor.pRequest2Txn()方法中的addChangeRecord()方法把记录放到outstandingChanges队列中。
this.addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path2Delete, (StatPersisted)null, 0, (List)null));
void addChangeRecord(ChangeRecord c) {
synchronized(this.zks.outstandingChanges) {
this.zks.outstandingChanges.add(c);
this.zks.outstandingChangesForPath.put(c.path, c);
}
}
SyncRequestProcessor.processRequest(Request request)方法:
this.nextProcessor.processRequest(request);
public void processRequest(Request request) {
this.queuedRequests.add(request);
}
Step12:调用SyncRequestProcessor.run方法
Step13:从queuedRequests队列中获取请求
si = (Request)this.queuedRequests.take();
Step14:将请求写入日志和快照文件
if (si != null) {
if (this.zks.getZKDatabase().append(si)) {
++logCount;
if (logCount > snapCount / 2 + randRoll) {
setRandRoll(this.r.nextInt(snapCount / 2));
// 请求信息写入日志文件
this.zks.getZKDatabase().rollLog();
if (this.snapInProcess != null && this.snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
this.snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
// 请求信息写入快照文件
SyncRequestProcessor.this.zks.takeSnapshot();
} catch (Exception var2) {
SyncRequestProcessor.LOG.warn("Unexpected exception", var2);
}
}
};
this.snapInProcess.start();
}
logCount = 0;
}
}
Step15:调用FinalRequestProcessor.processRequest(Request request)处理请求
Step16:调用ZooKeeperServer.processTxn()处理请求
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
int opCode = hdr.getType();
long sessionId = hdr.getClientId();
ProcessTxnResult rc = this.getZKDatabase().processTxn(hdr, txn);
if (opCode == -10) {
if (txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn)txn;
this.sessionTracker.addSession(sessionId, cst.getTimeOut());
} else {
LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
}
} else if (opCode == -11) {
this.sessionTracker.removeSession(sessionId);
}
return rc;
}
Step17:dataTree.processTxn(hdr, txn)处理请求
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return this.dataTree.processTxn(hdr, txn);
}
Step18:根据不同的请求类型处理,更新内存并发布相应的watch事件
case 1:
CreateTxn createTxn = (CreateTxn)txn;
rc.path = createTxn.getPath();
this.createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral()
? header.getClientId() : 0L, createTxn.getParentCVersion(), header.getZxid(), header.getTime());
break;
case 2:
DeleteTxn deleteTxn = (DeleteTxn)txn;
rc.path = deleteTxn.getPath();
this.deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case 5:
SetDataTxn setDataTxn = (SetDataTxn)txn;
rc.path = setDataTxn.getPath();
rc.stat = this.setData(setDataTxn.getPath(), setDataTxn.getData(), setDataTxn.getVersion(),
header.getZxid(), header.getTime());
break;
case 7:
SetACLTxn setACLTxn = (SetACLTxn)txn;
rc.path = setACLTxn.getPath();
rc.stat = this.setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
break;
以create处理发布watch事件为例:
public String createNode(String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion,
long zxid, long time) throws NoNodeException, NodeExistsException {
int lastSlash = path.lastIndexOf(47);
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
StatPersisted stat = new StatPersisted();
stat.setCtime(time);
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
DataNode parent = (DataNode)this.nodes.get(parentName);
if (parent == null) {
throw new NoNodeException();
} else {
synchronized(parent) {
Set<String> children = parent.getChildren();
if (children.contains(childName)) {
throw new NodeExistsException();
}
if (parentCVersion == -1) {
parentCVersion = parent.stat.getCversion();
++parentCVersion;
}
parent.stat.setCversion(parentCVersion);
parent.stat.setPzxid(zxid);
Long longval = this.aclCache.convertAcls(acl);
DataNode child = new DataNode(parent, data, longval, stat);
parent.addChild(childName);
this.nodes.put(path, child);
if (ephemeralOwner != 0L) {
HashSet<String> list = (HashSet)this.ephemerals.get(ephemeralOwner);
if (list == null) {
list = new HashSet();
this.ephemerals.put(ephemeralOwner, list);
}
synchronized(list) {
list.add(path);
}
}
}
if (parentName.startsWith("/zookeeper/quota")) {
if ("zookeeper_limits".equals(childName)) {
this.pTrie.addPath(parentName.substring("/zookeeper/quota".length()));
}
if ("zookeeper_stats".equals(childName)) {
this.updateQuotaForPath(parentName.substring("/zookeeper/quota".length()));
}
}
String lastPrefix;
if ((lastPrefix = this.getMaxPrefixWithQuota(path)) != null) {
this.updateCount(lastPrefix, 1);
this.updateBytes(lastPrefix, data == null ? 0L : (long)data.length);
}
// 发布watch事件
this.dataWatches.triggerWatch(path, EventType.NodeCreated);
this.childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, EventType.NodeChildrenChanged);
return path;
}
}
Step19:继续FinalRequestProcessor.processRequest(Request request)方法创建响应的response
switch(request.type) {
case -11:
lastOp = "CLOS";
closeSession = true;
err = Code.get(rc.err);
break;
case -10:
this.zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
cnxn.updateStatsForResponse((long)request.cxid, request.zxid, lastOp, request.createTime,
Time.currentElapsedTime());
this.zks.finishSessionInit(request.cnxn, true);
return;
case 1:
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
......
((MultiResponse)rsp).add((OpResult)subResult);
public class MultiResponse implements Record, Iterable<OpResult> {
private List<OpResult> results = new ArrayList();
Step21:调用ServerCnxn.updateStatsForResponse修改状态,并调用sendResponse方法把最终结果返回给对应的客户端。
protected synchronized void updateStatsForResponse(long cxid, long zxid, String op, long start, long end) {
if (cxid >= 0L) {
this.lastCxid = cxid;
}
this.lastZxid = zxid;
this.lastOp = op;
this.lastResponseTime = end;
long elapsed = end - start;
this.lastLatency = elapsed;
if (elapsed < this.minLatency) {
this.minLatency = elapsed;
}
if (elapsed > this.maxLatency) {
this.maxLatency = elapsed;
}
++this.count;
this.totalLatency += elapsed;
}
sendResponse方法实现是在NIOServerCnxn.sendResponse(ReplyHeader h, Record r, String tag)中实现的
public synchronized void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException var12) {
LOG.error("Error serializing response");
}
byte[] b = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
this.sendBuffer(bb);
if (h.getXid() > 0) {
synchronized(this) {
--this.outstandingRequests;
}
synchronized(this.factory) {
if (this.zkServer.getInProcess() < this.outstandingLimit || this.outstandingRequests < 1) {
this.sk.selector().wakeup();
this.enableRecv();
}
}
}
} catch (Exception var14) {
LOG.warn("Unexpected exception. Destruction averted.", var14);
}
}
最后
以上就是喜悦百褶裙为你收集整理的第四章 zk源码解读笔记的全部内容,希望文章能够帮你解决第四章 zk源码解读笔记所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复