概述
Netty服务端代码
先上Server端的代码
/**
 * Starts the demo Netty 3 server: builds a {@code ServerBootstrap} backed by two cached
 * thread pools, installs a pipeline that frames inbound bytes on the "#@#" delimiter,
 * decodes them to strings and hands them to {@code ServerMessageHandler}, then binds
 * to port 7777.
 */
public static void main(String[] args) throws Exception {
    // Boss pool: per the author, listens on the port and hands accepted socket channels to workers.
    ExecutorService boss = Executors.newCachedThreadPool();
    // Worker pool: per the author, performs the actual reads and writes.
    ExecutorService worker = Executors.newCachedThreadPool();

    // The bootstrap (service class) wired to the NIO server-socket channel factory.
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

    // getPipeline() builds a fresh pipeline each time it is called.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            // Alternative framing decoders the author experimented with:
            //   pipeline.addLast("fixedLength", new FixedLengthFrameDecoder(12));
            //   pipeline.addLast("lineBased", new LineBasedFrameDecoder(1024));
            pipeline.addLast(
                    "delimiterBased",
                    new DelimiterBasedFrameDecoder(1024, ChannelBuffers.copiedBuffer("#@#".getBytes())));
            pipeline.addLast("1", new StringDecoder());
            pipeline.addLast("2", new ServerMessageHandler());
            return pipeline;
        }
    });

    // Bind the listening port.
    bootstrap.bind(new InetSocketAddress(7777));
    System.out.println("服务端启动...");
}
work 线程池干了什么
进入 NioServerSocketChannelFactory 的构造方法
/**
 * Creates a factory from a boss executor and a worker executor. The worker count is
 * taken from {@code getMaxThreads(workerExecutor)} and passed to the three-argument
 * constructor.
 */
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));
}
/**
 * Creates a factory with an explicit worker count; the boss count is fixed at 1 when
 * delegating to the four-argument constructor.
 */
public NioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor,
int workerCount) {
this(bossExecutor, 1, workerExecutor, workerCount);
}
/**
 * Creates a factory with explicit boss and worker counts. The worker executor and
 * count are wrapped in a new {@code NioWorkerPool} before delegating further.
 */
public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, Executor workerExecutor,
int workerCount) {
this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}
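这里的 workerCount 由 getMaxThreads 方法得到:该方法获取线程池的最大线程数,并与“CPU 核心数 × 2”比较,返回两者中较小的值。本例使用的是 Executors.newCachedThreadPool()(最大线程数为 Integer.MAX_VALUE),所以 workerCount 就等于 CPU 核心数 × 2。Boss 线程数(bossCount)则固定传入 1。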
我们先看 NioWorkerPool 的构造方法
/**
 * Creates a worker pool without a custom thread-name determiner (passes {@code null}
 * to the three-argument constructor).
 */
public NioWorkerPool(Executor workerExecutor, int workerCount) {
this(workerExecutor, workerCount, null);
}
/**
 * Creates a worker pool. The superclass is told NOT to auto-initialize
 * ({@code autoInit == false}) so that {@code determiner} is assigned before
 * {@code init()} is called explicitly here.
 */
public NioWorkerPool(Executor workerExecutor, int workerCount, ThreadNameDeterminer determiner) {
super(workerExecutor, workerCount, false);
this.determiner = determiner;
init();
}
其他没什么看头,主要看 super(...) 调用的父类构造方法,以及随后调用的 init 方法
/**
 * Validates the arguments, stores the executor and allocates the (still empty) worker
 * array; optionally initializes the workers right away.
 *
 * @param workerExecutor executor the workers will run on; must not be {@code null}
 * @param workerCount    number of worker slots to allocate; must be positive
 * @param autoInit       when {@code true}, {@code init()} is invoked from this constructor
 * @throws NullPointerException     if {@code workerExecutor} is {@code null}
 * @throws IllegalArgumentException if {@code workerCount} is zero or negative
 */
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
    if (workerExecutor == null) {
        throw new NullPointerException("workerExecutor");
    }
    if (workerCount <= 0) {
        throw new IllegalArgumentException(
                "workerCount (" + workerCount + ") must be a positive integer.");
    }

    this.workerExecutor = workerExecutor;
    workers = new AbstractNioWorker[workerCount];

    if (autoInit) {
        init();
    }
}
/**
 * Fills every slot of {@code workers} with a worker created by
 * {@code newWorker(workerExecutor)}, then calls {@code waitForWorkerThreads()}.
 *
 * @throws IllegalStateException if this pool has already been initialized
 */
protected void init() {
    // The atomic flag flips false -> true exactly once; any later call is rejected.
    final boolean firstCall = initialized.compareAndSet(false, true);
    if (!firstCall) {
        throw new IllegalStateException("initialized already");
    }

    int slot = 0;
    while (slot < workers.length) {
        workers[slot] = newWorker(workerExecutor);
        slot++;
    }

    waitForWorkerThreads();
}
workers.length 就是 worker 的数量,也就是上面说的 CPU 核心数乘以 2(i 只是循环下标)。workers 数组在父类构造方法里已经创建好了,元素类型是 AbstractNioWorker,长度等于 workerCount;for 循环的作用是逐个调用 newWorker(workerExecutor),把数组的每个位置填上一个 worker。用 Ctrl+T 可以发现实际调用的是子类 NioWorkerPool 的 newWorker 方法。
一、开启Selector
我们一直往下调,来到一个比较重要的方法
/**
 * Stores the executor and immediately opens the selector via
 * {@code openSelector(determiner)}.
 */
AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
this.executor = executor;
// Open the selector.
openSelector(determiner);
}
/**
 * Opens the selector and submits this selector's runnable to the executor through
 * {@code DeadLockProofWorker.start}. If submitting fails, the freshly opened selector
 * is closed and the field reset to {@code null} before the failure propagates.
 *
 * @throws ChannelException if the selector cannot be opened
 */
private void openSelector(ThreadNameDeterminer determiner) {
try {
// This is where the selector is actually opened.
selector = SelectorUtil.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
// Per the article author, determiner is null on the code path traced here.
DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
这个id就是第几个线程
我们先看newThreadRenamingRunnable方法,这个方法调用的是子类AbstractNioWorker的方法
/**
 * Wraps this worker in a {@code ThreadRenamingRunnable} whose proposed thread name is
 * {@code "New I/O worker #" + id}.
 */
@Override
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
    // Marker 【1】: the runnable that renames the executing thread is constructed here.
    final String proposedName = "New I/O worker #" + id;
    return new ThreadRenamingRunnable(this, proposedName, determiner);
}
此方法中 new 了一个 ThreadRenamingRunnable 对象,并将 this、线程名 "New I/O worker #" + id 以及 determiner 三个参数传入(构造方法主要是赋值)。这里的 this 就是当前的 AbstractNioWorker 对象。
我们再进入 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));方法
/**
 * Runs {@code runnable} on {@code parent}, recording {@code parent} in {@code PARENT}
 * for the duration of the run and removing it afterwards.
 *
 * @param parent   executor that will execute the runnable; must not be {@code null}
 * @param runnable work to execute; must not be {@code null}
 * @throws NullPointerException if either argument is {@code null}
 */
public static void start(final Executor parent, final Runnable runnable) {
    if (parent == null) {
        throw new NullPointerException("parent");
    }
    if (runnable == null) {
        throw new NullPointerException("runnable");
    }

    final Runnable tracked = new Runnable() {
        public void run() {
            // Per the article author, PARENT is a ThreadLocal; it marks the executing
            // thread with the executor (the worker pool) it belongs to.
            PARENT.set(parent);
            try {
                runnable.run();
            } finally {
                PARENT.remove();
            }
        }
    };
    parent.execute(tracked);
}
第一个参数是 worker 线程池;第二个参数实际上是 ThreadRenamingRunnable 类的对象,这个类实现了 Runnable 接口,所以 runnable.run() 调用的就是 ThreadRenamingRunnable 的 run 方法。前面我们还没有看 ThreadRenamingRunnable 的构造方法,放到下面一起看。
// Class-wide determiner, initialized to ThreadNameDeterminer.PROPOSED.
private static volatile ThreadNameDeterminer threadNameDeterminer =
ThreadNameDeterminer.PROPOSED;
/**
 * Creates a wrapper that runs {@code runnable} under a (possibly) different thread name.
 *
 * @param runnable           the work to delegate to; must not be {@code null}
 * @param proposedThreadName the name suggested for the executing thread; must not be {@code null}
 * @param determiner         per-instance name determiner; stored as given (may be {@code null})
 * @throws NullPointerException if {@code runnable} or {@code proposedThreadName} is {@code null}
 */
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName, ThreadNameDeterminer determiner) {
    if (runnable == null) {
        throw new NullPointerException("runnable");
    }
    if (proposedThreadName == null) {
        throw new NullPointerException("proposedThreadName");
    }

    this.proposedThreadName = proposedThreadName;
    this.determiner = determiner;
    this.runnable = runnable;
}
/**
 * Renames the current thread to the name computed by {@code getNewThreadName}, runs
 * the wrapped runnable, and restores the previous thread name afterwards (only if the
 * rename actually happened).
 */
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName();
final String newThreadName = getNewThreadName(oldThreadName);
// Change the thread name before starting the actual runnable.
boolean renamed = false;
if (!oldThreadName.equals(newThreadName)) {
try {
currentThread.setName(newThreadName);
renamed = true;
} catch (SecurityException e) {
// Renaming is best-effort: a security restriction is logged and the runnable still runs.
logger.debug(
"Failed to rename a thread " +
"due to security restriction.", e);
}
}
// Run the actual runnable and revert the name back when it ends.
try {
runnable.run();
} finally {
if (renamed) {
// Revert the name back if the current thread was renamed.
// We do not check the exception here because we know it works.
currentThread.setName(oldThreadName);
}
}
}
"New I/O worker #" + id 对应的是 proposedThreadName 变量。还要注意此类的静态变量 threadNameDeterminer 默认是 ThreadNameDeterminer.PROPOSED,正因为如此,getNewThreadName(oldThreadName) 的返回结果就是 proposedThreadName,即 "New I/O worker #" + id。
// Determiner that ignores the current thread name and always returns the proposed one.
ThreadNameDeterminer PROPOSED = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return proposedThreadName;
}
};
再看上面的run 方法, 这个方法里面也有一个runnable.run()方法。
这个runnable 是调用构造函数的时候传进来赋值的,那么我们往前推,何时构造的呢?
注意看【1】处的代码,ThreadRenamingRunnable 就是在那里 new 出来的。构造方法的第一个参数 runnable 传入的是 this,也就是 AbstractNioWorker 对象;AbstractNioWorker 的 run 方法里调用了 super.run(),即父类的 run 方法。父类的这个 run 方法里面是最重要的逻辑。
/**
 * The selector loop. After signalling start-up it repeatedly: resets {@code wakenUp},
 * selects, tracks how often select returned early with nothing selected (rebuilding
 * the selector after 1024 consecutive occurrences when the epoll workaround is
 * enabled), runs the queued tasks, and then either processes the selected keys or,
 * on shutdown, closes all keys and the selector and exits. Any {@code Throwable}
 * raised inside one iteration is logged and followed by a one-second sleep.
 */
public void run() {
thread = Thread.currentThread();
// Signal that this selector thread is running. The article author traced the matching
// await to AbstractNioBossPool.waitForBossThreads (which uses a timeout); presumably
// the worker pool's waitForWorkerThreads() is the counterpart — confirm.
startupLatch.countDown();
int selectReturnsImmediately = 0;
Selector selector = this.selector;
if (selector == null) {
return;
}
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
// Loop until shutdown breaks out below.
for (;;) {
// Reset the flag before selecting. Code that needs to wake the selector (e.g.
// registerTask) flips it back to true via compareAndSet before calling wakeup().
// The author notes that setInterestOps also changes it — not verified here.
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = select(selector);
if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may have been unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof DatagramChannel && !ch.isOpen() ||
ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() &&
// Only cancel if the connection is not pending
// See https://github.com/netty/netty/issues/2931
!((SocketChannel) ch).isConnectionPending()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
if (Thread.interrupted() && !shutdown) {
// Thread was interrupted but NioSelector was not shutdown.
// As this is most likely a bug in the handler of the user or it's client
// library we will log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because the I/O thread " +
"has been interrupted. Use shutdown() to shut the NioSelector down.");
}
selectReturnsImmediately = 0;
} else {
// Returned before the minSelectTimeout elapsed with nothing selected.
// This may be because of a bug in JDK NIO Selector provider, so increment the counter
// which we will use later to see if it's really the bug in JDK.
selectReturnsImmediately ++;
}
}
} else {
selectReturnsImmediately = 0;
}
} else {
selectReturnsImmediately = 0;
}
if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
if (selectReturnsImmediately == 1024) {
// The selector returned immediately 1024 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
// Wake the selector so the next select() does not block.
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
// Run the queued tasks (presumably those enqueued through registerTask — confirm).
processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
if (shutdown) {
this.selector = null;
// process one time again
processTaskQueue();
for (SelectionKey k: selector.keys()) {
close(k);
}
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
}
shutdownLatch.countDown();
break;
} else {
// Not shutting down: handle the selected keys. Per the article author this is
// where the actual channel reads and writes happen.
process(selector);
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
最重要的是那个队列和自己的读写业务,时间太晚,先写个大概发一版。后面还会写protocolBuffer协议,以及自己定义包头规则来传输等,并试着看源码。
先说说怎么将事件注册到队列中的呢?
我是用 Ctrl+Shift+G 一直往前反推找到的。不过正推可能更容易看懂,所以下面就按正推的顺序来讲。
// Client side: this call enters ClientBootstrap's connect(SocketAddress) method.
bootstrap.connect(new InetSocketAddress("127.0.0.1", 7777)).sync();
/**
 * Connects to {@code remoteAddress}, using the {@code "localAddress"} bootstrap option
 * as the local address to bind to.
 *
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
SocketAddress localAddress = (SocketAddress) getOption("localAddress");
// Delegate to the overload that takes both the remote and the local address.
return connect(remoteAddress, localAddress);
}
/**
 * Builds a pipeline, creates a channel from it, applies the bootstrap options, binds
 * to {@code localAddress} when one is given, and finally initiates the connect.
 *
 * @return the future of the connect attempt
 * @throws NullPointerException     if {@code remoteAddress} is {@code null}
 * @throws ChannelPipelineException if the pipeline factory fails
 */
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelPipeline pipeline;
try {
// Build a new pipeline through the pipeline factory held by Bootstrap.
// That factory is the one installed in the application's main method.
pipeline = getPipelineFactory().getPipeline();
} catch (Exception e) {
throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
}
// Set the options.
Channel ch = getFactory().newChannel(pipeline);
boolean success = false;
try {
ch.getConfig().setOptions(getOptions());
success = true;
} finally {
// Close the new channel if applying the options threw.
if (!success) {
ch.close();
}
}
// Bind.
if (localAddress != null) {
ch.bind(localAddress);
}
// Connect.
return ch.connect(remoteAddress);
}
/**
 * Asks {@code channel} to connect to {@code remoteAddress} by sending a
 * {@code CONNECTED} state event downstream through the channel's pipeline.
 *
 * @return the future attached to the downstream event
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
ChannelFuture future = future(channel, true);
// The event carries the CONNECTED state with the remote address as its value.
channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
channel, future, ChannelState.CONNECTED, remoteAddress));
return future;
}
/**
 * Sends {@code e} downstream. If a downstream-capable context exists, the event goes
 * to it; otherwise the event is handed straight to the sink, and any failure there is
 * reported through {@code notifyHandlerException}.
 */
public void sendDownstream(ChannelEvent e) {
    final DefaultChannelHandlerContext downstreamCtx = getActualDownstreamContext(this.tail);
    if (downstreamCtx != null) {
        sendDownstream(downstreamCtx, e);
        return;
    }

    // No handler left to take the event: deliver it to the sink.
    try {
        getSink().eventSunk(this, e);
    } catch (Throwable t) {
        notifyHandlerException(e, t);
    }
}
//进入eventSunk方法因为状态是connected 所以走下面的方法
case CONNECTED:
if (value != null) {
connect(channel, future, (SocketAddress) value);
} else {
channel.worker.close(channel, future);
}
break;
/**
 * Starts connecting {@code channel} to {@code remoteAddress}. If the underlying
 * channel's connect call returns {@code true}, the channel is registered with its
 * worker; otherwise the pending connect is registered with a boss. Any failure marks
 * {@code cf} as failed, fires an exception event and closes the channel.
 */
private void connect(
final NioClientSocketChannel channel, final ChannelFuture cf,
SocketAddress remoteAddress) {
channel.requestedRemoteAddress = remoteAddress;
try {
if (channel.channel.connect(remoteAddress)) {
// connect(...) returned true: register this channel with its worker.
channel.worker.register(channel, cf);
} else {
// If the channel is closed before cf completes, fail cf with ClosedChannelException.
channel.getCloseFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f)
throws Exception {
if (!cf.isDone()) {
cf.setFailure(new ClosedChannelException());
}
}
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
// NOTE(review): presumably the boss completes the pending connect — confirm.
nextBoss().register(channel, cf);
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
// Re-create the exception so its message includes the remote address,
// keeping the original stack trace.
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
cf.setFailure(t);
fireExceptionCaught(channel, t);
channel.worker.close(channel, succeededFuture(channel));
}
}
/**
 * Registers {@code channel}: builds a registration task (a {@code Runnable}, not a
 * thread) for the channel and future, and hands it to {@code registerTask}.
 */
public void register(Channel channel, ChannelFuture future) {
    registerTask(createRegisterTask(channel, future));
}
/**
 * Enqueues {@code task} and wakes the selector so the task gets picked up. If the
 * selector is already gone, the task is taken back out of the queue and the call is
 * rejected.
 *
 * @throws RejectedExecutionException if the selector is {@code null} and the task was
 *         still in the queue
 */
protected final void registerTask(Runnable task) {
// Put the task (a Runnable, not a thread) on the task queue.
taskQueue.add(task);
Selector selector = this.selector;
if (selector != null) {
// Only call wakeup() when this caller is the one that flips wakenUp from false to true.
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
} else {
if (taskQueue.remove(task)) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
}
}
这个注册任务(Runnable,并不是线程)内部做了什么呢?
/**
 * Creates the registration task for {@code channel}. The channel is treated as a
 * server-side channel whenever it is not a {@code NioClientSocketChannel}.
 */
@Override
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
    final boolean isClient = channel instanceof NioClientSocketChannel;
    final NioSocketChannel socketChannel = (NioSocketChannel) channel;
    // RegisterTask is an inner class of this worker.
    return new RegisterTask(socketChannel, future, !isClient);
}
/**
 * Task that registers a socket channel with this worker's selector, completes the
 * supplied future and fires the bound/connected events.
 */
private final class RegisterTask implements Runnable {
private final NioSocketChannel channel;
private final ChannelFuture future;
// true when the channel is not a NioClientSocketChannel (see createRegisterTask).
private final boolean server;
RegisterTask(
NioSocketChannel channel, ChannelFuture future, boolean server) {
this.channel = channel;
this.future = future;
this.server = server;
}
public void run() {
SocketAddress localAddress = channel.getLocalAddress();
SocketAddress remoteAddress = channel.getRemoteAddress();
// Without both addresses the channel is treated as closed: fail the future and close.
if (localAddress == null || remoteAddress == null) {
if (future != null) {
future.setFailure(new ClosedChannelException());
}
close(channel, succeededFuture(channel));
return;
}
try {
if (server) {
// Switch the underlying channel to non-blocking mode before registering it.
channel.channel.configureBlocking(false);
}
// Per the article author, getInternalInterestOps() evaluates to OP_READ here —
// not verifiable from this excerpt.
channel.channel.register(
selector, channel.getInternalInterestOps(), channel);
if (future != null) {
channel.setConnected();
future.setSuccess();
}
if (server || !((NioClientSocketChannel) channel).boundManually) {
fireChannelBound(channel, localAddress);
}
fireChannelConnected(channel, remoteAddress);
} catch (IOException e) {
if (future != null) {
future.setFailure(e);
}
close(channel, succeededFuture(channel));
// A ClosedChannelException is not rethrown; any other IOException is wrapped and propagated.
if (!(e instanceof ClosedChannelException)) {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
}
}
}
最后
以上就是勤恳蜡烛为你收集整理的Netty之NioServerSocketChannelFactory的全部内容,希望文章能够帮你解决Netty之NioServerSocketChannelFactory所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复