我是靠谱客的博主 痴情帽子,这篇文章主要介绍Netty深入理解ServerBootstrap 与 Bootstrap ,现在分享给大家,希望可以做个参考。

 

  从Java1.4开始, Java引入了non-blocking IO,简称NIO。NIO与传统socket最大的不同就是引入了Channel和多路复用selector的概念。传统的socket是基于stream的,它是单向的,有InputStream表示read和OutputStream表示写。而Channel是双工的,既支持读也支持写,channel的读/写都是面向Buffer。 NIO中引入的多路复用Selector机制(如果是linux系统,则应用的epoll事件通知机制)可使一个线程同时监听多个Channel上发生的事件。 虽然Java NIO相比于以往确实是一个大的突破,但是如果要真正上手进行开发,且想要开发出好的一个服务端网络程序,那么你得要花费一点功夫了,毕竟Java NIO只是提供了一大堆的API而已,对于一般的软件开发人员来说只能呵呵了。因此,社区中就涌现了很多基于Java NIO的网络应用框架,其中以Apache的Mina,以及Netty最为出名,从本篇开始我们将深入的分析一下Netty的内部实现细节 。

  本系列是基于Netty4.1.18这个版本。

  在分析源码之前,我们还是先看看Netty官方的样例代码,了解一下Netty一般是如何进行服务端及客户端开发的。

  Netty服务端示例:

 

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup)  // (3) .channel(NioServerSocketChannel.class) // (4) .handler(new LoggingHandler()) // (5) .childHandler(new ChannelInitializer<SocketChannel>() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (7) .childOption(ChannelOption.SO_KEEPALIVE, true); // (8) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (9) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }

复制代码

上面这段代码展示了服务端的一个基本步骤:

复制代码
1
2
3
4
5
6
7
8
9
10
(1)、 初始化用于Acceptor的主"线程池"以及用于I/O工作的从"线程池"; (2)、 初始化ServerBootstrap实例, 此实例是netty服务端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析; (3)、 通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池"; (4)、 指定通道channel的类型,由于是服务端,故而是NioServerSocketChannel; (5)、 设置ServerSocketChannel的处理器(此处不详述,后面的系列会进行深入分析) (6)、 设置子通道也就是SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析) (7)、 配置ServerSocketChannel的选项 (8)、 配置子通道也就是SocketChannel的选项 (9)、 绑定并侦听某个端口 接着,我们再看看客户端是如何开发的:

Netty客户端示例:

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1) try { Bootstrap b = new Bootstrap(); // (2) b.group(workerGroup); // (3) b.channel(NioSocketChannel.class); // (4) b.option(ChannelOption.SO_KEEPALIVE, true); // (5) b.handler(new ChannelInitializer<SocketChannel>() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (7) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }

复制代码

 客户端的开发步骤和服务端都差不多:

复制代码
1
2
3
4
5
6
7
8
9
(1)、 初始化用于连接及I/O工作的"线程池"; (2)、 初始化Bootstrap实例, 此实例是netty客户端应用开发的入口,也是本篇介绍的重点, 下面我们会深入分析; (3)、 通过Bootstrap的group方法,设置(1)中初始化的"线程池"; (4)、 指定通道channel的类型,由于是客户端,故而是NioSocketChannel; (5)、 设置SocketChannel的选项(此处不详述,后面的系列会进行深入分析); (6)、 设置SocketChannel的处理器, 其内部是实际业务开发的"主战场"(此处不详述,后面的系列会进行深入分析); (7)、 连接指定的服务地址; 通过对上面服务端及客户端代码分析,Bootstrap是Netty应用开发的入口,如果想要理解Netty内部的实现细节,那么有必要先了解一下Bootstrap内部的实现机制。 首先我们先看一下ServerBootstrap及Bootstrap的类继承结构图:

通过类图我们知道AbstractBootstrap类是ServerBootstrap及Bootstrap的基类,我们先看一下AbstractBootstrap类的主要代码:

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { volatile EventLoopGroup group; private volatile ChannelFactory<? extends C> channelFactory; private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>(); private volatile ChannelHandler handler; public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self(); } private B self() { return (B) this; } public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } @Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } public <T> B option(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return self(); } public <T> B attr(AttributeKey<T> key, T value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { synchronized (attrs) { attrs.remove(key); } } else { synchronized (attrs) { attrs.put(key, value); } } return self(); } public B validate() { if (group == null) { throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return self(); } public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } abstract void init(Channel channel) throws Exception; private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return self(); }public abstract AbstractBootstrapConfig<B, C> config(); }

复制代码

 

现在我们以示例代码为出发点,来详细分析一下引导类内部实现细节:

1、 首先看看服务端的b.group(bossGroup, workerGroup):

    调用ServerBootstrap的group方法,设置react模式的主线程池 以及 IO 操作线程池,ServerBootstrap中的group代码如下:

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }

复制代码

在group方法中,会继续调用父类的group方法,而通过类继承图我们知道,super.group(parentGroup)其实调用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代码如下:

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self(); }

复制代码

通过以上分析,我们知道了AbstractBootstrap中定义了主线程池group的引用,而子线程池childGroup的引用是定义在ServerBootstrap中。

当我们查看客户端Bootstrap的group方法时,我们发现,其是直接调用的父类AbstractBoostrap的group方法。

2、示例代码中的 channel()方法

无论是服务端还是客户端,channel调用的都是基类的channel方法,其实现细节如下:

复制代码
1
2
3
4
5
6
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }

 

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }

复制代码

我们发现,其实channel方法内部,只是初始化了一个用于生产指定channel类型的工厂实例。

3、option / handler / attr 方法

     option: 设置通道的选项参数, 对于服务端而言就是ServerSocketChannel, 客户端而言就是SocketChannel;

  handler: 设置主通道的处理器, 对于服务端而言就是ServerSocketChannel,也就是用来处理Acceptor的操作;

      对于客户端的SocketChannel,主要是用来处理 业务操作;

    attr: 设置通道的属性;

 option / handler / attr方法都定义在AbstractBootstrap中, 所以服务端和客户端的引导类方法调用都是调用的父类的对应方法。

4、childHandler / childOption / childAttr 方法(只有服务端ServerBootstrap才有child类型的方法)

   对于服务端而言,有两种通道需要处理, 一种是ServerSocketChannel:用于处理用户连接的accept操作, 另一种是SocketChannel,表示对应客户端连接。而对于客户端,一般都只有一种channel,也就是SocketChannel。

   因此以child开头的方法,都定义在ServerBootstrap中,表示处理或配置服务端接收到的对应客户端连接的SocketChannel通道。

  childHandler / childOption / childAttr 在ServerBootstrap中的对应代码如下:

复制代码

复制代码
1
2
3
4
5
6
7
public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }

复制代码

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) { if (childOption == null) { throw new NullPointerException("childOption"); } if (value == null) { synchronized (childOptions) { childOptions.remove(childOption); } } else { synchronized (childOptions) { childOptions.put(childOption, value); } } return this; }

复制代码

复制代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) { if (childKey == null) { throw new NullPointerException("childKey"); } if (value == null) { childAttrs.remove(childKey); } else { childAttrs.put(childKey, value); } return this; }

复制代码

至此,引导类的属性配置都设置完毕了。

本篇总结:

1、服务端由两种线程池,用于Acceptor的React主线程和用于I/O操作的React从线程池; 客户端只有用于连接及IO操作的React的主线程池;

2、ServerBootstrap中定义了服务端React的"从线程池"对应的相关配置,都是以child开头的属性。 而用于"主线程池"channel的属性都定义在AbstractBootstrap中;

 

本篇只是简单介绍了一下引导类的配置属性, 下一篇我将详细介绍服务端引导类的Bind过程分析。

 

最后

以上就是痴情帽子最近收集整理的关于Netty深入理解ServerBootstrap 与 Bootstrap 的全部内容,更多相关Netty深入理解ServerBootstrap内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部