概述
基于Mina的服务程序代码:
1 package test.mina.time.server;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.util.Date;
6
7 import org.apache.commons.logging.Log;
8 import org.apache.commons.logging.LogFactory;
9 import org.apache.mina.common.IoAcceptor;
10 import org.apache.mina.common.IoHandlerAdapter;
11 import org.apache.mina.common.IoSession;
12 import org.apache.mina.filter.codec.ProtocolCodecFilter;
13 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
14 import org.apache.mina.filter.logging.LoggingFilter;
15 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
16
17 public class TimeServer {
18
19 static Log log = LogFactory.getLog( TimeServer.class );
20
21 public static void main( final String[] args ) {
22 final IoAcceptor acceptor = new NioSocketAcceptor();
23
24 acceptor.setHandler( new IoHandlerAdapter() {
25
26 @Override
27 public void messageSent( final IoSession session,
28 final Object message )
29 throws Exception {
30 session.close();
31 }
32
33 @Override
34 public void sessionOpened( final IoSession session )
35 throws Exception {
36 session.write( new Date() );
37 }
38
39 } );
40
41 acceptor.getFilterChain().addLast( "codec",
42 new ProtocolCodecFilter( new TextLineCodecFactory() ) );
43 acceptor.getFilterChain().addLast( "logging", new LoggingFilter() );
44
45 try {
46 acceptor.bind( new InetSocketAddress( 8150 ) );
47 }
48 catch( final IOException e ) {
49 log.error( "Bind error: ", e );
50 }
51 }
52
53 }
建立监听器
22
final IoAcceptor acceptor = new NioSocketAcceptor();
在传统服务端编程中,对于一个 TCP 服务器,我们需要先建立一个监听套接字。在 Mina 中,我们创建的并不是一个监听套接字,而是一个监听套接字工厂,或者称之为“监听器( acceptor )”。该概念映射到 Mina API 中,就是 IoAcceptor 接口及其各个实现类。
传统的 BSD Socket API 中的监听套接字以及其在 Java 中的对等物 java.net.ServerSocket 都是套接字工厂,其任务是在某个本地地址上进行监听,并在有客户端连接到来时产生一个与客户端进行通信的套接字。而 Mina 的 IoAcceptor 作为监听套接字的工厂可以接受一个包含多个本地地址的集合, IoAcceptor 会自行针对这个集合中的每个本地地址分别创建监听套接字并进行监听,并且在监听器销毁时进行适当的资源清理。这样便省去了我们建立自行维护多个监听套接字的麻烦。
监听套接字由 IoAcceptor 来接管,那么服务端接受客户端连接后产生的套接字又由谁接管呢?在 Mina 的术语中,一个 TCP 连接被称作一个“会话( session )”,对应的 Mina API 是 IoSession 接口。每当服务端接受一个客户端连接,便会创建出一个新的 IoSession 对象,通过该对象就可以对新建立的 TCP 连接进行各种操作。
在这个示例中,我们选用基于 Java NIO 的监听器实现 NioSocketAcceptor 。之后,每当服务器接受一个客户端连接, IoAccetpor 都会产生一个代表客户端和服务器 TCP 连接的 IoSession 对象。
设置事件回调
24
acceptor.setHandler( new IoHandlerAdapter() {
25
26
@Override
27
public void messageSent( final IoSession session,
28
final Object message )
29
throws Exception {
30
session.close();
31
}
32
33
@Override
34
public void sessionOpened( final IoSession session )
35
throws Exception {
36
session.write( new Date() );
37
}
38
39
} );
然后,我们要让这个 IoAcceptor 创建出的 IoSession 对象知道在各种事件发生时应该如何进行处理。这实际上就是文章开头所描述的反应式模型的应用层接口。只是反应式模型中的事件分支判断部分被 Mina 封装了起来,只暴露出包含了各种事件回调的 IoHandler 接口。可供处理的事件回调包括:
- sessionCreated(IoSession)
IoSession 对象被创建时的回调,一般用于进行会话初始化操作。注意,与 sessionOpened(IoSession) 回调不同, IoSession 对象的创建并不意味着对应的底层 TCP 连接的建立,而仅仅代表它的字面意思:一个 IoSession 对象被创建出来了。
- sessionOpened(IoSession)
IoSession 对象被打开时的回调。在 TCP 中,该事件是在 TCP 连接建立时触发的,一般可用于发起连接建立后的握手、认证等操作。
- sessionClosed(IoSession)
IoSession 对象被关闭时的回调。在 TCP 中,该事件是在 TCP 连接断开时触发的。一般可用于会话资源的清理等操作。
- sessionIdle(IoSession, IdleStatus)
IoSession 对象超时时的回调。当一个 IoSession 对象在指定的超时时长内没有读写事件发生,就会触发该事件,一般可用于通知服务器断开长时间闲置的连接等处理。具体的超时设置可由 IoService.setWriteIdleTime(int) 、 IoService.setReadIdleTime(int) 和 IoService.setBothIdleTime(int) 设置。
- messageReceived(IoSession, Object)
当接收到 IoSession 对端发送的数据时的回调。
- messageSent(IoSession, Object)
当发送给 IoSession 对端的数据发送成功时的回调。
- exceptionCaught(IoSession, Throwable)
当会话过程中出现异常时的回调。通常用于错误处理。
然而,并非每个应用都对所有这些事件感兴趣,要实现所有这些方法未免繁琐,因此 Mina 提供了抽象类 IoHandlerAdapter ,它实现了各个事件的默认处理——也就是不处理。因此,通常我们只需要继承 IoHandlerAdapter 并覆盖需要处理的事件回调就可以了。在 Timer Server 示例中,回想一下我们设计的功能——当有客户端连接建立时便向客户端以文本方式发送当前时间,并关闭连接。为了实现这个功能,我们需要实现两个事件回调:首先,在 sessionOpened(IoSession) 事件回调中向对端发送当前日期,其次,在 messageSent(IoSession, Object) 事件回调中关闭连接。以上便是第 24 至 39 行所创建的匿名类完成的事情。
需要注意的是 sessionOpened 方法中的这一行:
36
session.write( new Date() );
这里的 IoSession.write(Object) 方法便是一个异步方法。对该方法的调用并不会阻塞,而是向 Mina 投递了一个异步写操作,并返回一个可用于对已投递异步写操作进行控制的 WriteFuture 对象。例如,通过调用 WriteFuture 的 await 方法或 awaitUninterruptibly() ,我们就可以同步等待该异步操作的完成。
配置过滤器链
41
acceptor.getFilterChain().addLast( "codec",
42
new ProtocolCodecFilter( new TextLineCodecFactory() ) );
43
acceptor.getFilterChain().addLast( "logging", new LoggingFilter() );
接下来,是对过滤器链的配置。过滤器链可以被当作一条两端分别连接服务器和客户端的管道,管道中首尾相接地装上零个或多个过滤器。每个过滤器都可对通过的数据进行任意的操作,包括增加、删除、更新、类型转换等。先装上的过滤器更靠近远程端点(客户端),后装上的更靠近本地端点(服务器)。 41 至 43 行先后向 IoAcceptor 的过滤器链中添加了两个过滤器,分别名为“ codec ”和“ logging ”。后者很好理解,其作用就是对 IoSession 对象上发生的各种事件进行日志记录。而前者就要多费一些口舌来解释了。
协议编解码器
我们知道, TCP 本身只是一个可靠字节流协议, TCP 层面上的二进制数据流不具备任何的边界和结构,只是纯粹的字节流。而在应用层面上,我们在不同通讯节点间处理和交换的——也就是应用构建人员直接关心的——是应用域对象(application domain object)。这就产生了矛盾:应用构建人员需要具有特定类型的域对象来适应具体问题域的需求,而在 TCP 层面,我们手里只有一股股死板的二进制数据流。为了解决这种矛盾,为底层的二进制流赋予个性,向上层应用提供鲜活的域对象,我们就需要将二者互相转换。于是就引入了一对自古以来就繁琐乏味的工作:打包和拆包。
打包,就是将域对象转换为二进制数据包,各个数据包首尾相接,形成二进制数据流;拆包,就是从无包边界的二进制数据流中将数据包一个个拆分出来并转换成相应的域对象。由于 TCP 没有包边界,相对于打包而言,拆包的工作尤其乏味和易错。为了对这些操作进行适度的封装以便重用, Mina 提供了一个有力的工具——协议编解码器( protocol codec )。简而言之,协议编解码器的职责,就是打包和拆包。针对一种类型的域对象,我们需要编写一个编码器( encoder )和一个解码器( decoder ),分别用于打包和拆包。将这对编码器和解码器通过一个 ProtocolCodecFactory 包装起来,就组成了一个协议编解码器。最后,再用一个 ProtocolCodecFilter 将这个协议编解码器包装成一个过滤器,就可以将之插入过滤器链中,来实现二进制 TCP 数据流与应用域对象的自动转换了。
解释完了原理,我们再回到 Time Server 的示例中来。我们的 Time Server 很简单,但是麻雀虽小五脏俱全,这里也同样存在着打包和拆包的问题。首先我们来确定一下域对象。仔细观察一下 24 至 39 行中构造的 IoHandlerAdapter 匿名子类,我们就可以发现,在整个客户端服务器会话过程中,除了 TCP 建立和断开过程中的握手消息以外,唯一的数据 I/O 就是在 sessionOpened(IoSession) 事件回调中由服务器向客户端发送的 Date 对象。但是,为了利用 Mina 本身提供的 TextLineCodecFactory ,我们并不采用 Date 作为域对象类型,而采用 String ,借助于 Date.toString() 方法,这个选择并不会导致什么问题。 TextLineCodecFactory 提供了一套面向字符串文本行的协议编解码器。它将每个传入编码器的字符串作为单独的一行文本打包进 TCP 流,并通过解码器将 TCP 流中的文本以行尾单位转换为 String 对象。这样,就方便地实现了 Time Server 的打包和拆包。
至此,我们已经完成了 Time Server 的大部分编码工作:我们通过 IoAcceptor 创建了监听套接字,为后续将要产生的 IoSession 对象设置了相应的事件回调处理,还配置了过滤器链,并在过滤器链中嵌入了 TextLineCodecFactory 协议编解码器。图 1 描述出了 Time Server 的结构与数据流向:
图 1. Apache Mina 2.0.x Time Server |
绑定监听套接字
45
try {
46
acceptor.bind( new InetSocketAddress( 8150 ) );
47
}
48
catch( final IOException e ) {
49
log.error( "Bind error: ", e );
50
}
好了,万事俱备只欠东风。最后,我们只需要打开监听端口,就万事大吉了。上文中我们提到过可以为 IoAcceptor 设置多个监听地址,但这里我们只需要监听通配地址 0.0.0.0 上的 8150 端口就可以了,因此直接在 IoAcceptor.bind(SocketAddress) 中指定该监听地址即可。
IoAcceptor.bind(SocketAddress) 并不仅仅是传统 BSD Socket API 中的 socket / bind / listen / accept 经典操作序列中的 bind ,而是集四者于一身,以达到简化编程的目的。
Run!
编译后,一个热腾腾的 Time Server 就新鲜出炉了!让我们来跑跑看。首先配置一下日志策略,将日志输出指向标准输出, log4j.xml 内容如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
3 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
4
<appender name="stdout" class="org.apache.log4j.ConsoleAppender">
5
<layout class="org.apache.log4j.PatternLayout">
6
<param name="ConversionPattern" value="%p - %c{1} - %m%n" />
7
</layout>
8
</appender>
9
<root>
10
<level value="info" />
11
<appender-ref ref="stdout" />
12
</root>
13 </log4j:configuration>
现在运行服务器,再打开终端,用 telnet 连接服务器:
$ telnet localhost 8150
一切正常的话,将获得类似如下的输出:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Tue Jan 15 15:50:38 CST 2008
Connection closed by foreign host.
标注出的这行,就是服务器发送过来的当前服务器时间。同时,服务器端将在标准输出上有类似如下的日志输出:
INFO - LoggingFilter - [/127.0.0.1:3080] CREATED
INFO - LoggingFilter - [/127.0.0.1:3080] OPENED
INFO - LoggingFilter - [/127.0.0.1:3080] SENT: Tue Jan 15 15:50:38 CST 2008
INFO - LoggingFilter - [/127.0.0.1:3080] CLOSED
可以看到标注出的与客户端输出对应的日志输出行。
我们还可以做一个小小的改动来详细地看一下协议编解码器的工作过程:将两个过滤器的添加顺序对掉一下。对掉之前,日志过滤器在协议编解码过滤器之上(见图 1 ),因此,在日志中输出的是 Date 对象,更具体的说,是 Date.toString() 的结果。对掉之后,日志过滤器位于协议编解码过滤器之下,我们便可以看到由编码器编码后的日期字符串的字节序列(对应的时间字符串是“ Tue Jan 15 16:34:36 CST 2008 ”):
INFO - LoggingFilter - [/192.168.80.180:60144] CREATED
INFO - LoggingFilter - [/192.168.80.180:60144] OPENED
INFO - LoggingFilter - [/192.168.80.180:60144] SENT: HeapBuffer[pos=0 lim=29 cap=32: 54 75 65 20 4A 61 6E 20 31 35 20 31 36 3A 33 34...]
INFO - LoggingFilter - [/192.168.80.180:60144] SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
INFO - LoggingFilter - [/192.168.80.180:60144] CLOSED
小结
区区 53 行代码,我们便用 Mina 实现了一个全功能的并发网络时间服务器。值得注意的是,这并不是一个如同 UNP 中的第一个 Echo Server 示例那样的以同步方式串行处理客户端请求的迭代式服务器,而是一个基于 Java NIO 多路复用机制的高性能异步并发服务器。
Time Server 在并发策略上采用的是默认的单线程策略。我们可以通过在过滤器链中插入一个 ExecutorFilter 来启用线程池来完成 IoHandler 中定义的事件回调操作。当在事件处理过程中存在文件 I/O 或数据库操作等耗时较长的同步阻塞操作时,采用多线程的并发策略可以获取更高的并发度。在 Mina 1.1.x 中,除了 ExecutorFilter 的方式,每个 IoService (各种 IoAcceptor 和以后将要介绍的 IoConnector 都是 IoService 的一种)具备一个 ThreadModel 域,可以使用特定的线程模型来制定并发策略。然而这种方式增加了编程的复杂度,因此在 2.0.x 中被去除了。
最后
以上就是能干灰狼为你收集整理的Mina2.0.x 服务器端的全部内容,希望文章能够帮你解决Mina2.0.x 服务器端所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复