概述
rocketmq的通信框架是一款高性能的通信框架-netty。今天我们学习下netty再rocketmq的应用
在rocketmq启动的过程中,初始化了nettyRemotingClient
初始化过程做了以下几件事儿
1、用操作系统核心数求作为线程数,启动一个线程池publicExecutor。该线程池的线程名称前缀为:NettyClientPublicExecutor_。该线程池有两个用处:
第一个:在远程返回结果ResponseFuture之后,将后续的callback逻辑放置到改线程池当中。
第二个:就是我们在提交一个远程请求的时候会把我们的请求包装成一个task,然后用publicExecutor线程池执行。
2、启动扫描线程池,线程数为:核心线程4,最大线程10,线程最大存活时间60秒(不包括核心线程),最大阻塞队列32个。线程名称前缀为:NettyClientScan_thread_,用法见start里面的2.11
3、初始化一个I/0线程,名称前缀:NettyClientSelector
4、根据https支持,添加ssl上下文sslContext
rocketmq启动过程会调用.NettyRemotingClient#start方法。
1、根据配置文件中的配置com.rocketmq.remoting.client.worker.size启动netty客户端非IO线程数。这些线程的名称前缀是NettyClientWorkerThread_
2、配置NioSocketChannel参数
2.1、TCP_NODELAY=true (发生小包时不再等待之前的包有没有ack,)
2.2、SO_KEEPALIVE=false (不对对端服务器监听心跳,rocketmq使用自己的心跳机制,不适用netty的心跳检测)。
2.3、连接超时时间CONNECT_TIMEOUT_MILLIS为,配置文件当中com.rocketmq.remoting.client.connect.timeout的值,如果没有配置,默认是3秒。
2.4、针对https协议,在pipleline的第一个位置添加SslHandler
2.5、pipleline添加编码handler:NettyEncoder
2.6、pipleline添加解码Handler:NettyDecoder
2.7、pipleline添加netty的心跳检测handler:IdleStateHandler。这里没有对读写空闲时间做检测,只对连接最大空闲时间做检测。可以通过com.rocketmq.remoting.client.channel.maxIdleTimeSeconds配置,默认是120秒。
2.8、NettyConnectManageHandler 跟踪连接状态。
2.9、业务处理handler:NettyClientHandler
2.10、根据netty client端配置修改tcp的配置参数:
SO_SNDBUF(发送缓冲区) (com.rocketmq.remoting.socket.sndbuf.size,默认65535)
SO_RCVBUF (接受缓冲区)(com.rocketmq.remoting.socket.rcvbuf.size,默认65535)
writeBufferLowWaterMark 写缓冲区低水位
WRITE_BUFFER_WATER_MARK 写缓冲区高水位
ALLOCATOR 为PooledByteBufAllocator
2.11、启动周期任务scanResponseTable,每1秒执行一次。对缓冲在本地未获得远程结果的ResponseFuture
未在超时时间内返回的执行定期删除操作。每一个ResponseFuture在初始化的饿时候都设置了该ResponseFuture的超时时间。该超时时间为30秒-该请求从consumer到发送之前的时间。如果在发送之前就超过30秒则不会发送该请求。
2.12、启动周期任务scanAvailableNameSrv,默认每3秒执行一次,时间可以通过com.rocketmq.remoting.client.connect.timeout配置。查询所有namesrv并保持连接状态。
这样netty的所有tcp请求都是用netty框架进行通信,这里面无疑编解码处理和业务处理handler是重点。我们分别讨论下。
NettyEncoder
rocketmq报文有两部分组成:
1、customHeader
2、body
一、写header
1、获取ByteBuf的写指针位置
2、跳过8byte
3、按照序列化工具不同用不同的方式处理
3.1、如果序列化工具为ROCKETMQ,
3.1.1、将customHeader反射转换成为HashMap类型的extFields字段
3.1.2、按照如图所示从2开始的位置写
图1
3.2、如果序列化工具为Json
3.2.1、将customHeader转换成map放置到extFields
3.2.2、使用json工具序列化RemotingCommand对象
图2
NettyDecoder
有编码的方式可以看出netty在编码的时候是按照报文的总长度+报文的形式写到对端。那么NettyDecoder正好使用了LengthFieldBasedFrameDecoder将其解码出来。
LengthFieldBasedFrameDecoder解码类型会按照报文长度将第一个位置后面的数据报文返回给NettyDecoder解密。
具体逻辑如下:
1、获取header长度(是将序列化工具标识取出的真是长度)
2、根据1获取的长度初始化一个byte数组,将相同长度的buteByffer内容写入byte数组当中。将header内容解码。
2.1、如果是json
2.1.1、将byte数组数据用json反序列化成RemotingCommand
2.1.2、将序列化类型放置到serializeTypeCurrentRPC字段里面。
2.2、如果是rocketmq
2.2.1、按照图1,将byte数组包装成RemotingCommand对象。
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
过程跟编码顺序完全一致。
我们也可以学习下byte转map的方式,将来自己写通信协议都可以拿来借鉴。
public static HashMap<String, String> mapDeserialize(byte[] bytes) { if (bytes == null || bytes.length <= 0) return null; HashMap<String, String> map = new HashMap<String, String>(); ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); short keySize; byte[] keyContent; int valSize; byte[] valContent; while (byteBuffer.hasRemaining()) { keySize = byteBuffer.getShort(); keyContent = new byte[keySize]; byteBuffer.get(keyContent); valSize = byteBuffer.getInt(); valContent = new byte[valSize]; byteBuffer.get(valContent); map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8)); } return map; }
2.3、通过len-headLen-4获取body的长度,将其直接放入body字段。
NettyClientHandler
NettyClientHandler是rocketmq consumer端获取远程broker时候处理返回结果的业务入口handler。该Handler具体流程如下:
1、根据返回的报文中opaque字段获取请求时候缓存的ResponseFuture。如果远程长时间没有返回客户端,rocketmq有定时清理任务会定期清理过期的ResponseFuture,就会导致返回的结果找不到ResponseFuture的情况,这种情况下就会在warn日志中看到receive response, but not matched any request。
2、将RemotingCommand返回结果设置到ResponseFuture的responseCommand字段
3、将该ResponseFuture从responseTable中清除。
4、如果ResponseFuture设置了invokeCallback,则将回调invokeCallback.operationComplete
的方法放置到callback线程池中执行,也就是之前说的publicExecutor。
public void executeInvokeCallback() { if (invokeCallback != null) { if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) { invokeCallback.operationComplete(this); } } }
注意这里面也考虑了并发,如果有两次请同时调用回调函数,只能有一个会成功,另一个会被忽略,通过AtomicBoolean实现
private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);
这里面的回调函数具体逻辑可以看《rocketmq client端源码分析(2)-consumer实现》。
好了,总结下今天内容
netty作为高性能通信框架,被rocketmq所采用。netty启动的时候tcp参数可以动态配置,具体版本不同,需要检查文档。同时rocketmq在启动netty的时候还启动了很多针对业务的监听器,还配置了监听信道空闲的IdleStateHandler。rocketmq的通信方面导致的问题都需要我们仔细思考各个参数是否配置合理,如何参数调优提高性能。
另外,rocketmq使用了自己实现的rocketmq序列化工具和json序列化工具。默认是json序列化工具。
参考文献:Netty学习(五)—IdleStateHandler心跳机制_zhenyutu的博客-CSDN博客_idlestatehandler
最后
以上就是怕孤单未来为你收集整理的rocketmq consumer -通信篇的全部内容,希望文章能够帮你解决rocketmq consumer -通信篇所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复