我是靠谱客的博主 慈祥蛋挞,这篇文章主要介绍Apache Mina 入门 (二)—— 异步通信机制,现在分享给大家,希望可以做个参考。

通过前面的Apache mina 入门(一)— 基础知识
我们可以了解到 mina是个异步通信框架,一般使用场景是服务端开发,长连接、异步通信使用mina是及其方便的。不多说,看例子。

本次mina 使用的例子是使用maven构建的,过程中需要用到的jar包如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
<!-- mina --> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-integration-beans</artifactId> <version>2.0.16</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.16</version> </dependency>

导入jar包,pom文件会报错,具体错误如下:

复制代码
1
2
Missing artifact org.apache.mina:mina-core:bundle:2.0.16 pom.xml

原因是因为缺少maven-bundle-plugin导入即可

复制代码
1
2
3
4
5
6
7
8
9
<build> <plugins> <plugin> <groupId>org.apache.felix</groupId> <artifactId>maven-bundle-plugin</artifactId> <extensions>true</extensions> </plugin> </plugins> </build>

本次mina机制的消息通讯接口规范主要为:
包头2个字节,包长2个字节,协议类型2个字节,数据包标识码8个字节,报文正文内容,校验码4个字节,包尾2个字节
mina 与spring 结合后,使用更加方便。
spring配置文件如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" default-lazy-init="false"> <bean class="org.springframework.beans.factory.config.CustomEditorConfigurer"> <property name="customEditors"> <map> <entry key="java.net.SocketAddress" value="org.apache.mina.integration.beans.InetSocketAddressEditor"></entry> </map> </property> </bean> <bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" init-method="bind" destroy-method="unbind"> <!--端口号 --> <property name="defaultLocalAddress" value=":8888"></property> <!--绑定自己实现的handler --> <property name="handler" ref="serverHandler"></property> <!--声明过滤器的集合 --> <property name="filterChainBuilder" ref="filterChainBuilder"></property> <property name="reuseAddress" value="true" /> </bean> <bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder"> <property name="filters"> <map> <!--mina自带的线程池filter --> <entry key="executor" value-ref="executorFilter"></entry> <entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" /> <!--自己实现的编解码器filter --> <entry key="codecFilter" value-ref="codecFilter" /> <!--日志的filter --> <entry key="loggingFilter" value-ref="loggingFilter" /> <!--心跳filter --> <entry key="keepAliveFilter" value-ref="keepAliveFilter" /> </map> </property> </bean> <!-- executorFilter多线程处理 --> <bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" /> <bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter"> <constructor-arg value="remoteAddress" /> </bean> <!--日志 --> <bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" /> <!--编解码 --> <bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter"> <constructor-arg> <!--构造函数的参数传入自己实现的对象 --> <bean class="com.onion.mina.server.NSMinaCodeFactory"></bean> </constructor-arg> </bean> <!--心跳检测filter --> <bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter"> <!--构造函数的第一个参数传入自己实现的工厂 --> <constructor-arg> <bean class="com.onion.mina.server.NSMinaKeepAliveMessageFactory"></bean> </constructor-arg> <!--第二个参数需要的是IdleStatus对象,value值设置为读写空闲 --> <constructor-arg type="org.apache.mina.core.session.IdleStatus" value="BOTH_IDLE"> </constructor-arg> <!--心跳频率,不设置则默认5 --> <property name="requestInterval" value="1500" /> <!--心跳超时时间,不设置则默认30s --> <property name="requestTimeout" value="30" /> <!--默认false,比如在心跳频率为5s时,实际上每5s会触发一次KeepAliveFilter中的session_idle事件, 该事件中开始发送心跳包。当此参数设置为false时,对于session_idle事件不再传递给其他filter,如果设置为true, 则会传递给其他filter,例如handler中的session_idle事件,此时也会被触发--> <property name="forwardEvent" value="true" /> </bean> <!--自己实现的handler--> <bean id="serverHandler" class="com.onion.mina.server.NSMinaHandler" /> </beans>

mina 核心包括 IOhandler处理器,编码工厂(包含编码器,解码器)等核心。

服务端代码如下:

编码工厂 (NSMinaCodeFactory)如下所示:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class NSMinaCodeFactory implements ProtocolCodecFactory { private final NSProtocalEncoder encoder; private final NSProtocalDecoder decoder; public NSMinaCodeFactory() { this(Charset.forName("utf-8")); } public NSMinaCodeFactory(Charset charset) { encoder = new NSProtocalEncoder(); decoder = new NSProtocalDecoder(); } public ProtocolDecoder getDecoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return decoder; } public ProtocolEncoder getEncoder(IoSession arg0) throws Exception { // TODO Auto-generated method stub return encoder; } }

编码器——负责将需要发送给客户端的数据进行编码,然后发送给客户端

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NSProtocalEncoder extends ProtocolEncoderAdapter { private static final Logger logger = Logger.getLogger(NSProtocalEncoder.class); @SuppressWarnings("unused") private final Charset charset = Charset.forName("GBK"); /** * 在此处实现包的编码工作,并把它写入输出流中 */ public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { // TODO Auto-generated method stub if(message instanceof BaseMessageForClient){ BaseMessageForClient clientmessage = (BaseMessageForClient)message; byte[] packhead_arr = clientmessage.getPackHead().getBytes(charset);//包头2个字节 byte[] length_arr = ByteTools.intToByteArray(clientmessage.getLength()+19, 2);//包长 byte[] funcid_arr = ByteTools.intToByteArray(clientmessage.getFuncid(), 1);//协议类型 byte[] packetIdCode_arr = ByteTools.longToByteArray(clientmessage.getPacketIdCode(), 8);//数据包标识码 byte[] content_arr = clientmessage.getContent().getBytes(charset);//内容 byte[] checkcode_arr = ByteTools.longToByteArray(clientmessage.getCheckCode(), 4);//校验码 byte[] packtail_arr = clientmessage.getPackTail().getBytes();//包尾 IoBuffer buffer = IoBuffer.allocate(packhead_arr.length + length_arr.length + funcid_arr.length + packetIdCode_arr.length+ content_arr.length + checkcode_arr.length + packtail_arr.length); buffer.setAutoExpand(true); buffer.put(packhead_arr); buffer.put(length_arr); buffer.put(funcid_arr); buffer.put(packetIdCode_arr); buffer.put(content_arr); buffer.put(checkcode_arr); buffer.put(packtail_arr); buffer.flip(); out.write(buffer); out.flush(); buffer.free(); }else{ String value = (String)message; logger.warn("encode message:" + message); IoBuffer buffer = IoBuffer.allocate(value.getBytes().length); buffer.setAutoExpand(true); if(value != null){ buffer.put(value.trim().getBytes()); } buffer.flip(); out.write(buffer); out.flush(); buffer.free(); } } }

解码器——负责将客户端发送过来的数据进行解码变换为对象,传输给IoHandler处理器进行处理。本解码器包含了断包问题解决。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
public class NSProtocalDecoder implements ProtocolDecoder { private static final Logger logger = Logger.getLogger(NSProtocalDecoder.class); private final AttributeKey context = new AttributeKey(getClass(), "context"); private final Charset charset = Charset.forName("GBK"); private final String PACK_HEAD = "$$"; //包头 private final String PACK_TAIL = "rn"; //包尾 // 请求报文的最大长度 100k private int maxPackLength = 102400; public int getMaxPackLength() { return maxPackLength; } public void setMaxPackLength(int maxPackLength) { if (maxPackLength <= 0) { throw new IllegalArgumentException("请求报文最大长度:" + maxPackLength); } this.maxPackLength = maxPackLength; } private Context getContext(IoSession session) { Context ctx; ctx = (Context) session.getAttribute(context); if (ctx == null) { ctx = new Context(); session.setAttribute(context, ctx); } return ctx; } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Long start = System.currentTimeMillis(); // 报文前缀长度 包头2个字节,包长2个字节,协议类型2个字节,数据包标识码8个字节,校验码4个字节,包尾2个字节 final int packHeadLength = 19; // 先获取上次的处理上下文,其中可能有未处理完的数据 Context ctx = getContext(session); // 先把当前buffer中的数据追加到Context的buffer当中 ctx.append(in); // 把position指向0位置,把limit指向原来的position位置 IoBuffer buf = ctx.getBuffer(); buf.flip(); // 然后按数据包的协议进行读取 while (buf.remaining() >= packHeadLength) { logger.debug("test 长度1:" + buf.remaining()); buf.mark(); // 读取包头 2个字节 String packhead = new String(new byte[]{buf.get(),buf.get()}); logger.debug("包头:" + packhead); if(PACK_HEAD.equals(packhead)){ //读取包的长度 2个字节 报文的长度,不包含包头和包尾 byte[] length_byte = new byte[]{buf.get(),buf.get()}; byte[] length_byte_arr = new byte[]{0,0,0,0}; length_byte_arr[2] = length_byte[0]; length_byte_arr[3] = length_byte[1]; int length = ByteTools.byteArrayToInt(length_byte_arr); logger.debug("长度:" + length); logger.debug("test 长度1:" + buf.remaining()); // 检查读取是否正常,不正常的话清空buffer if (length < 0 || length > maxPackLength) { logger.debug("报文长度[" + length + "] 超过最大长度:" + maxPackLength + "或者小于0,清空buffer"); buf.clear(); break; //packHeadLength - 2 :减去包尾的长度, //length - 2 <= buf.remaining() :代表length-本身长度占用的两个字节-包头长度 }else if(length >= packHeadLength && length - 4 <= buf.remaining()){ //读取协议类型2个字节 byte[] funcid_byte = new byte[]{buf.get()}; byte[] funcid_byte_arr = new byte[]{0,0,0,0}; //funcid_byte_arr[2] = funcid_byte[0]; funcid_byte_arr[3] = funcid_byte[0]; int funcid = ByteTools.byteArrayToInt(funcid_byte_arr); logger.warn("协议类型:" + funcid); //读取数据包标识码8个字节 byte[] packetIdCode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get()}; long packetIdCode = ByteTools.byteArrayToLong(packetIdCode_byte); logger.debug("数据包标识码:" + packetIdCode); //读取报文正文内容 int oldLimit = buf.limit(); logger.debug("limit:" + (buf.position() + length)); //当前读取的位置 + 总长度 - 前面读取的字节长度 - 校验码 buf.limit(buf.position() + length - 19); String content = buf.getString(ctx.getDecoder()); buf.limit(oldLimit); logger.debug("报文正文内容:" + content); CRC32 crc = new CRC32(); crc.update(content.getBytes("GBK")); //读取校验码 4个字节 byte[] checkcode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get()}; byte[] checkcode_byte_arr = new byte[]{0,0,0,0,0,0,0,0}; checkcode_byte_arr[4] = checkcode_byte[0]; checkcode_byte_arr[5] = checkcode_byte[1]; checkcode_byte_arr[6] = checkcode_byte[2]; checkcode_byte_arr[7] = checkcode_byte[3]; long checkcode = ByteTools.byteArrayToLong(checkcode_byte_arr); logger.debug("校验码:" + checkcode); //验证校验码 if(checkcode != crc.getValue()){ // 如果消息包不完整,将指针重新移动消息头的起始位置 buf.reset(); break; } //读取包尾 2个字节 String packtail = new String(new byte[]{buf.get(),buf.get()}); logger.debug("包尾:" + packtail); if(!PACK_TAIL.equals(packtail)){ // 如果消息包不完整,将指针重新移动消息头的起始位置 buf.reset(); break; } BaseMessageForServer message = new BaseMessageForServer(); message.setLength(length); message.setCheckCode(checkcode); message.setFuncid(funcid); message.setPacketIdCode(packetIdCode); message.setContent(content); out.write(message); }else{ // 如果消息包不完整,将指针重新移动消息头的起始位置 buf.reset(); break; } }else{ // 如果消息包不完整,将指针重新移动消息头的起始位置 buf.reset(); break; } } if (buf.hasRemaining()) { // 将数据移到buffer的最前面 IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true); temp.put(buf); temp.flip(); buf.clear(); buf.put(temp); } else {// 如果数据已经处理完毕,进行清空 buf.clear(); } } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } public void dispose(IoSession session) throws Exception { // TODO Auto-generated method stub } // 记录上下文,因为数据触发没有规模,很可能只收到数据包的一半 // 所以,需要上下文拼起来才能完整的处理 private class Context { private final CharsetDecoder decoder; private IoBuffer buf; private int matchCount = 0; private int overflowPosition = 0; private Context() { decoder = charset.newDecoder(); buf = IoBuffer.allocate(3000).setAutoExpand(true); } public CharsetDecoder getDecoder() { return decoder; } public IoBuffer getBuffer() { return buf; } @SuppressWarnings("unused") public int getOverflowPosition() { return overflowPosition; } @SuppressWarnings("unused") public int getMatchCount() { return matchCount; } @SuppressWarnings("unused") public void setMatchCount(int matchCount) { this.matchCount = matchCount; } @SuppressWarnings("unused") public void reset() { overflowPosition = 0; matchCount = 0; decoder.reset(); } public void append(IoBuffer in) { getBuffer().put(in); } } }

NSMinaHandler——处理器,处理业务数据。继承IoHandlerAdapter 接口,主要重写messageReceived 方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class NSMinaHandler extends IoHandlerAdapter { private final Logger logger = Logger.getLogger(NSMinaHandler.class); public static ConcurrentHashMap<Long, IoSession> sessionHashMap = new ConcurrentHashMap<Long, IoSession>(); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.closeOnFlush(); logger.error("session occured exception, so close it." + cause.getMessage()); } @Override public void messageReceived(IoSession session, Object message) throws Exception { BaseMessageForServer basemessage = (BaseMessageForServer) message; logger.debug("客户端" + ((InetSocketAddress) session.getRemoteAddress()).getAddress() .getHostAddress() + "连接成功!"); session.setAttribute("type", message); String remoteAddress = ((InetSocketAddress) session.getRemoteAddress()) .getAddress().getHostAddress(); session.setAttribute("ip", remoteAddress); // 组装消息内容,返回给客户端 BaseMessageForClient messageForClient = new BaseMessageForClient(); messageForClient.setFuncid(2); if (basemessage.getContent().indexOf("hello") > 0) { // 内容 messageForClient.setContent("hello,我收到您的消息了! "); } else { // 内容 messageForClient.setContent("恭喜,您已经入门! "); } // 校验码生成 CRC32 crc32 = new CRC32(); crc32.update(messageForClient.getContent().getBytes()); // crc校验码 messageForClient.setCheckCode(crc32.getValue()); // 长度 messageForClient .setLength(messageForClient.getContent().getBytes().length); // 数据包标识码 messageForClient.setPacketIdCode(basemessage.getPacketIdCode()); session.write(messageForClient); } @Override public void messageSent(IoSession session, Object message) throws Exception { logger.debug("messageSent:" + message); } @Override public void sessionCreated(IoSession session) throws Exception { logger.debug("remote client [" + session.getRemoteAddress().toString() + "] connected."); Long time = System.currentTimeMillis(); session.setAttribute("id", time); sessionHashMap.put(time, session); } @Override public void sessionClosed(IoSession session) throws Exception { logger.debug("sessionClosed"); session.closeOnFlush(); sessionHashMap.remove(session.getAttribute("id")); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.debug("session idle, so disconnecting......"); session.closeOnFlush(); logger.warn("disconnected"); } @Override public void sessionOpened(IoSession session) throws Exception { logger.debug("sessionOpened."); } }

还有个就是心跳工厂:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class NSMinaKeepAliveMessageFactory implements KeepAliveMessageFactory { private final Logger logger = Logger .getLogger(NSMinaKeepAliveMessageFactory.class); private BaseMessageForServer basemessage; /** 心跳包内容 */ private static long packetIdCode = 0; /** * 判断是否心跳请求包 是的话返回true */ public boolean isRequest(IoSession session, Object message) { // TODO Auto-generated method stub if (message instanceof BaseMessageForServer) { basemessage = (BaseMessageForServer) message; // 心跳包方法协议类型 if (basemessage.getFuncid() == 3) { // 为3,代表是一个心跳包, packetIdCode = basemessage.getPacketIdCode(); return true; } else { return false; } } else { return false; } } /** * 由于被动型心跳机制,没有请求当然也就不关注反馈 因此直接返回false */ public boolean isResponse(IoSession session, Object message) { // TODO Auto-generated method stub return false; } /** * 被动型心跳机制无请求 因此直接返回nul */ public Object getRequest(IoSession session) { // TODO Auto-generated method stub return null; } /** * 根据心跳请求request 反回一个心跳反馈消息 */ public Object getResponse(IoSession session, Object request) { // 组装消息内容,返回给客户端 BaseMessageForClient messageForClient = new BaseMessageForClient(); messageForClient.setFuncid(4); // 内容 messageForClient.setContent("2222"); // 校验码生成 CRC32 crc32 = new CRC32(); crc32.update(messageForClient.getContent().getBytes()); // crc校验码 messageForClient.setCheckCode(crc32.getValue()); // 长度 messageForClient .setLength(messageForClient.getContent().getBytes().length); // 数据包标识码 messageForClient.setPacketIdCode(packetIdCode); return messageForClient; } }

到此服务端代码结束。其实服务端与客户端都存在相似之处。编码,解码器都是一样的。客户端启动程序如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ClientTest { public static void main(String[] args) { NioSocketConnector connector = new NioSocketConnector(); //添加过滤器 connector.getFilterChain().addLast("logger", new LoggingFilter()); //设置编码,解码过滤器 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory())); //connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));//设置编码过滤器 connector.setHandler(new ClientHandler());//设置事件处理器 ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1",8888)); //建立连接 cf.awaitUninterruptibly(); //等待连接创建完成 BaseMessageForServer message = new BaseMessageForServer(); String content = "hello world!"; CRC32 crc = new CRC32(); try { crc.update(content.getBytes("GBK")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } message.setFuncid(5); message.setPacketIdCode(10000); message.setContent(content); message.setCheckCode(crc.getValue()); message.setLength(content.getBytes().length); cf.getSession().write(message); } }

到此mina基本的已完成。
源码下载地址:http://download.csdn.net/download/u012151597/10165045

mina 入门实例

最后

以上就是慈祥蛋挞最近收集整理的关于Apache Mina 入门 (二)—— 异步通信机制的全部内容,更多相关Apache内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部