概述
1.maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
2.netty代码
部分业务逻辑代码已省略。
后端框架为SpringBoot+MyBatis+Spring MVC
ChatHandler.java
/**
* 处理消息的handler
* TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
*/
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
/**
* 用来保存所有的客户端连接
*/
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 当Channel中有新的事件消息会自动调用
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 当接收到数据后会自动调用
// 获取客户端发送过来的文本消息
String text = msg.text();
System.out.println("接收到消息数据为:" + text);
Message message = JSON.parseObject(text, Message.class);
// 通过SpringUtil工具类获取Spring上下文容器
ChatRecordService chatRecordService = SpringUtil.getBean(ChatRecordService.class);
switch (message.getType()) {
// 处理客户端连接的消息
case 0:
// 建立用户与通道的关联
String userid = message.getChatRecord().getUserid();
UserChannelMap.put(userid, ctx.channel());
System.out.println("建立用户:" + userid + "与通道" + ctx.channel().id() + "的关联");
UserChannelMap.print();
break;
// 处理客户端发送好友消息
case 1:
System.out.println("接收到用户消息");
// 将聊天消息保存到数据库
TbChatRecord chatRecord = message.getChatRecord();
chatRecordService.insert(chatRecord);
// 如果发送消息好友在线,可以直接将消息发送给好友
Channel channel = UserChannelMap.get(chatRecord.getFriendid());
if (channel != null) {
channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
} else {
// 如果不在线,暂时不发送
System.out.println("用户" + chatRecord.getFriendid() + "不在线");
}
break;
// 处理客户端的签收消息
case 2:
// 将消息记录设置为已读
chatRecordService.updateStatusHasRead(message.getChatRecord().getId());
break;
case 3:
// 接收心跳消息
System.out.println("接收到心跳消息:" + JSON.toJSONString(message));
break;
default:
}
}
/**
* 当有新的客户端连接服务器之后,会自动调用这个方法
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 将新的通道加入到clients
clients.add(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
ctx.channel().close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("关闭通道");
UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
UserChannelMap.print();
}
}
HearBeatHandler.java
/**
* 有时候Netty并不能在到客户端关闭时,自动关闭对应的通道资源。所以需要一个心跳机制,去检测每个通道是否空闲。
* 如果空闲超过一定时间,就需要将对应客户端的通道资源关闭。客户端需要每隔一段时间发送一条消息,用来保持心跳。
* 该代码中约定message.getType==3为心跳消息,不需要处理
*/
public class HearBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.READER_IDLE) {
System.out.println("读空闲事件触发...");
} else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("写空闲事件触发...");
} else if (idleStateEvent.state() == IdleState.ALL_IDLE) {
System.out.println("---------------");
System.out.println("读写空闲事件触发");
System.out.println("关闭通道资源");
ctx.channel().close();
}
}
}
}
Message.java
该实体为和前端约定好的格式,通过不同的消息类型,达到不同的功能。
public class Message {
private Integer type; // 消息类型
private TbChatRecord chatRecord; // 聊天消息
private Object ext; // 扩展消息字段
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public TbChatRecord getChatRecord() {
return chatRecord;
}
public void setChatRecord(TbChatRecord chatRecord) {
this.chatRecord = chatRecord;
}
public Object getExt() {
return ext;
}
public void setExt(Object ext) {
this.ext = ext;
}
}
TbChatRecord.java
public class TbChatRecord {
private String id;
private String userid;
private String friendid;
private Integer hasRead;
private Date createtime;
private Integer hasDelete;
private String message;
}
NettyListener.java
/**
* 服务启动监听器
*/
@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private WebSocketServer websocketServer;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
try {
websocketServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
UserChannelMap.java
/**
* 建立用户ID与通道的关联
*/
public class UserChannelMap {
// 用户保存用户id与通道的Map对象
private static Map<String, Channel> userChannelMap;
static {
userChannelMap = new HashMap<String, Channel>();
}
/**
* 添加用户id与channel的关联
* @param userid
* @param channel
*/
public static void put(String userid, Channel channel) {
userChannelMap.put(userid, channel);
}
/**
* 根据用户id移除用户id与channel的关联
* @param userid
*/
public static void remove(String userid) {
userChannelMap.remove(userid);
}
/**
* 根据通道id移除用户与channel的关联
* @param channelId 通道的id
*/
public static void removeByChannelId(String channelId) {
if(!StringUtils.isNotBlank(channelId)) {
return;
}
for (String s : userChannelMap.keySet()) {
Channel channel = userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())) {
System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
userChannelMap.remove(s);
break;
}
}
}
// 打印所有的用户与通道的关联数据
public static void print() {
for (String s : userChannelMap.keySet()) {
System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
}
}
/**
* 根据好友id获取对应的通道
* @param friendid 好友id
* @return Netty通道
*/
public static Channel get(String friendid) {
return userChannelMap.get(friendid);
}
}
WebsocketInitializer.java
/**
* 用于在某个Channel注册到EventLoop后,对这个Channel执行一些初始化操作
* ChannelInitializer虽然会在一开始会被注册到Channel相关的pipeline里,
* 但是在初始化完成之后,ChannelInitializer会将自己从pipeline中移除,不会影响后续的操作
*/
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ------------------
// 用于支持Http协议
// ------------------
// websocket基于http协议,需要有http的编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用
// 对HttpMessage进行聚合,聚合成FullHttpRequest或者FullHttpResponse
// 在netty编程中都会使用到Handler
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 添加Netty空闲超时检查的支持
// 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
// 2. 写空闲超时
// 3. 读写空闲超时
pipeline.addLast(new IdleStateHandler(4, 8, 12));
pipeline.addLast(new HearBeatHandler());
// 添加自定义的handler
pipeline.addLast(new ChatHandler());
}
}
WebSocketServer.java
/**
* netty的服务器
*/
@Component
public class WebSocketServer {
private EventLoopGroup bossGroup; // 主线程池
private EventLoopGroup workerGroup; // 工作线程池
private ServerBootstrap server; // 服务器
private ChannelFuture future; // 回调
public void start() {
future = server.bind(9001);
System.out.println("netty server - 启动成功");
}
public WebSocketServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new WebsocketInitializer());
}
}
SpringUtil.java
/**
* @Description: 提供手动获取被spring管理的bean对象
*/
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
}
// 获取applicationContext
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
// 通过name获取 Bean.
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
// 通过class获取Bean.
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
// 通过name,以及Clazz返回指定的Bean
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
最后
以上就是碧蓝导师为你收集整理的使用netty实现一个类似于微信的聊天功能的全部内容,希望文章能够帮你解决使用netty实现一个类似于微信的聊天功能所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复