概述
Spring 的解决方案
Spring 的解决方案是把原来的 WebSocketSession 封了一层,即 org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator
,下面我们就看看大厂是怎么做的。
类成员
首先来看一下这个类中都有哪些属性
/**
这两个限制条件默认值参见 - org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
// private int sendTimeLimit = 10 * 1000;
// private int sendBufferSizeLimit = 512 * 1024;
**/
// 发送消息最大时长(milliseconds)
private final int sendTimeLimit;
// 缓存消息队列占用最大空间大小(bytes)
private final int bufferSizeLimit;
// 缓存消息队列
private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<WebSocketMessage<?>>();
// 缓存消息队列占用空间(bytes)
private final AtomicInteger bufferSize = new AtomicInteger();
// 记录当前发送消息的开始时间
private volatile long sendStartTime;
// 任务是否已超过(空间/时间)限制
private volatile boolean limitExceeded;
// session 已关闭
private volatile boolean closeInProgress;
// flush message buffer 锁
private final Lock flushLock = new ReentrantLock();
// close session 锁
private final Lock closeLock = new ReentrantLock();
稍微写了一下注释,我的表达能力有限,可能让人有些一头雾水,下面就结合具体代码来解释一下这些变量都是干嘛的,怎么用它们实现 sendMessage 方法的线程安全和正确的业务逻辑的。
sendMessage 方法
这是之前的问题所在处,来看下这个方法的具体实现。
public void sendMessage(WebSocketMessage<?> message) throws IOException {
if (shouldNotSend()) {
return;
}
this.buffer.add(message);
this.bufferSize.addAndGet(message.getPayloadLength());
do {
if (!tryFlushMessageBuffer()) {
if (logger.isTraceEnabled()) {
String text = String.format("Another send already in progress: " +
"session id '%s':, "in-progress" send time %d (ms), buffer size %d bytes",
getId(), getTimeSinceSendStarted(), this.bufferSize.get());
logger.trace(text);
}
checkSessionLimits();
break;
}
}
while (!this.buffer.isEmpty() && !shouldNotSend());
}
private boolean shouldNotSend() {
return (this.limitExceeded || this.closeInProgress);
}
首先检查了当前 session 的状态,有没有超限和关闭。
如果没有,继续往下走,将 message 丢进 buffer 里,并累加更新 bufferSize。
下面进到循环中,判断条件是 buffer 为空即缓存的消息都发送完了或者超限了。 循环体是去 tryFlushMessageBuffer, 如果成功继续循环,如果失败则执行 checkSessionLimits() 方法,退出循环。
接下来看看 tryFlushMessageBuffer 里干了什么。
private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock()) {
try {
while (true) {
WebSocketMessage<?> message = this.buffer.poll();
if (message == null || shouldNotSend()) {
break;
}
this.bufferSize.addAndGet(message.getPayloadLength() * -1);
this.sendStartTime = System.currentTimeMillis();
getDelegate().sendMessage(message);
this.sendStartTime = 0;
}
}
finally {
this.sendStartTime = 0;
flushLock.unlock();
}
return true;
}
return false;
}
首先去尝试获取 flushLock,没获取到就返回 false, 返回 sendMessage 方法里的 if 代码块,即打印 trace 日志和 checkSessionLimits, 整个 sendMessage 方法就执行完了。
那。。。消息咋整?
没关系,已经丢到 buffer 里了,拿到锁的线程会替它发送的,咱们接着往下看。
这里是一个 while 循环,只要 buffer 里还有 Message 且当前 session 没有超限,就会一直执行。
然后更新 bufferSize 的值,减去当前发送消息的大小,更新 sendStartTime 为当前时间,供 checkSessionLimits 方法做超时判断,说到这里,顺便把 checkSessionLimits 方法的代码也贴一下吧。
private void checkSessionLimits() throws IOException {
if (!shouldNotSend() && this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > this.sendTimeLimit) {
String format = "Message send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), this.sendTimeLimit);
setLimitExceeded(reason);
}
else if (this.bufferSize.get() > this.bufferSizeLimit) {
String format = "The send buffer size %d bytes for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, this.bufferSize.get(), getId(), this.bufferSizeLimit);
setLimitExceeded(reason);
}
}
finally {
this.closeLock.unlock();
}
}
}
那。。。要是进入 finally 代码块,在unlock 之前 buffer 里添了东西咋整?
没关系,退出 tryFlushMessageBuffer 回到 sendMessage 里头,还有一层循环在等着它,会再次去检查 buffer 和 是否超限,符合条件就再进入循环,尝试拿锁,拿到了就继续发,没拿到说明已经有线程进去发消息了,那您就甭操心了。
看完代码,悄悄把叉腰的手放了下来,改为立正姿势, ConcurrentWebSocketSessionDecorator
一共不到 200 行代码,还实现了拥塞控制,不愧是大厂,我们还是要学习一个啊。。。
最后
以上就是跳跃烤鸡为你收集整理的tomcat websocket 并发问题解决(三)的全部内容,希望文章能够帮你解决tomcat websocket 并发问题解决(三)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复