概述
文章目录
- 一、群聊系统
- 二、单聊系统
- 三、心跳检测
- 四、
【这就是个 demo,实际IM比这复杂N倍 】
一、群聊系统
实现的核心是在 服务端的handler
这里缓存一份Channel
:
只有这样,服务端才能将一个客户端的消息转给其他客户端。
这里使用了 ChannelGroup
这类,看它的doc 说明:
A thread-safe Set that contains open Channels and provides various bulk operations on them. Using
ChannelGroup, you can categorize Channels into a meaningful group (e.g. on a per-service or per-state
basis.) A closed Channel is automatically removed from the collection, so that you don't need to worry
about the life cycle of the added Channel. A Channel can belong to more than one ChannelGroup.
提取要点:
ChannelGroup
缓存了channel,方便对channel 批量操作- 可以将channel 根据 “服务” OR “状态” 分组
- 一个channel 能够属于不同的组
- channel关闭的时候,
ChannelGroup
会自动删除它
// 核心 【服务端】handler 的实现
public class GroupchatServerHandler extends SimpleChannelInboundHandler<String> {
// 单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
SocketAddress socketAddress = ctx.channel().remoteAddress();
channelGroup.forEach(u -> {
if (u != ctx.channel()) {
u.writeAndFlush("[客户端]" + socketAddress + " 说:" + msg);
} else {
u.writeAndFlush("[自己] 说了:" + msg);
}
});
}
/**
* Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channelGroup.add(ctx.channel());
channelGroup.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + " 进入聊天室");
}
/**
* Gets called after the ChannelHandler was added to the actual context and it's ready to handle events
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
channelGroup.writeAndFlush("[客户端]" + ctx.channel().remoteAddress() + " 离开了聊天室");
//NO NEED TO call remove() explictly
//channelGroup.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
二、单聊系统
单聊系统,消息也需要经过服务端的转发;因此服务端需要缓存用户状态,这样服务端才可以writeAndFlush
给特定的用户。
三、心跳检测
实现一个心跳检测机制:
- 服务器3秒没有读,提示读空闲
- 服务器5秒没有写,提示写空闲
- 服务器 7秒没有读OR写,提示读写空闲
// 这放出来 核心的 ChannelInitializer
代码。我们可以根据 读OR 写空闲情况,加上特定的业务逻辑,比如 读写空闲 1H+ 就关闭掉连接
private ChannelInitializer<SocketChannel> initialChannel() {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
/**
* readerIdleTime: 多久没读就发一个心跳包
* writerIdleTime:多久没写就发一个心跳包
* allIdleTime:多久没有读写就发一个心跳包
*
* IdleStateEvent 触发之后,就会传递给管道中的下一个 channelHandler.userHandlerTrigger
* 去处理
*/
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 这里加入一个自定义的 心跳handler
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state = event.state();
switch (state) {
...
} }
}
}
});
}
};
}
不知道你也没有类似疑惑,为啥TCP已经心跳了,还要有一个自己实现一个心跳呢?
先来做个粗浅的认识:
看上去 KEEP_ALIVE
很美好,但其实服务端有时并不能准确获取到连接的状态。比如,客户端(手机)改成飞行模式了,连接终止只是客户端的,可能在服务端看来连接还是存在的。
四、
最后
以上就是大方啤酒为你收集整理的Netty(4)聊天室 |心跳一、群聊系统二、单聊系统三、心跳检测四、的全部内容,希望文章能够帮你解决Netty(4)聊天室 |心跳一、群聊系统二、单聊系统三、心跳检测四、所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复