我是靠谱客的博主 可爱小霸王,最近开发中收集的这篇文章主要介绍Netty——群聊系统与心跳机制与WebSocket开发Netty——群聊系统与心跳机制与WebSocket开发一、群聊系统二、Netty的心跳机制三、WebSocket开发,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
JAVA后端开发知识总结(持续更新…)
Netty——群聊系统与心跳机制与WebSocket开发
文章目录
- Netty——群聊系统与心跳机制与WebSocket开发
- 一、群聊系统
- 1.1 服务端
- 1.2 客户端
- 1.3 点对点聊天功能
- 二、Netty的心跳机制
- 2.1 Netty 的 IdleStateHandler
- 2.2 实现
- 三、WebSocket开发
一、群聊系统
要求:
1.1 服务端
- GroupChatServer
public class GroupChatServer {
private int port; //监听端口
public GroupChatServer(int port){
this.port = port;
}
// 编写 run 方法,处理客户端的请求
public void run() throws InterruptedException {
// 创建两个线程组
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 向 pipeline 加入解码器
pipeline.addLast("decoder", new StringDecoder());
// 加入编码器
pipeline.addLast("encoder", new StringEncoder());
// 加入自定义的 Handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("Netty服务器启动");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
// 监听关闭事件
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
// 启动
public static void main(String[] args) throws InterruptedException {
GroupChatServer groupChatServer = new GroupChatServer(7005);
groupChatServer.run();
}
}
- GroupChatServerHandler
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// 定义一个 Channel 组,管理所有的 Channel
// GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 此方法表示连接建立,一旦建立连接,就第一个被执行
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 该方法会自动将 channelGroup 中所有 channel 遍历,并发送消息
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress()
+ sdf.format(new Date()) + "加入聊天n");
// 将当前的 Channel 加入到 ChannelGroup
channelGroup.add(channel);
}
// 表示 channel 处于活动状态,提示 xxx 上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " "
+ sdf.format(new Date()) + "上线了~");
}
// 表示 channel 处于不活动状态,提示 xxx 离线
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " "
+ sdf.format(new Date()) + "离线了~");
}
// 表示 channel 断开连接,将 xx 客户离开信息推送给当前在线客户
// 直接自动将当前 channel 从 Group 中去除
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() +" "
+ sdf.format(new Date()) + "离开了n");
System.out.println("当前channelGroup大小 :" + channelGroup.size());
}
// 读取数据,并进行消息转发
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获取当前 channel
Channel channel = ctx.channel();
// 这时,遍历 channelGroup,根据不同的情况,回送不同的消息
channelGroup.forEach(item -> {
if (item != channel) {
item.writeAndFlush("[客户]" + channel.remoteAddress()
+ "发送了消息:" + msg + "n");
} else {
// 把自己发送的消息发送给自己
item.writeAndFlush("[自己]发送了消息:" + msg + "n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
1.2 客户端
- GroupChatClient
public class GroupChatClient {
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.port = port;
this.host = host;
}
public void run() throws InterruptedException {
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入 Handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// 得到 channel
Channel channel = channelFuture.channel();
System.out.println("--------" + channel.localAddress() + "---------");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
// 通过 channel 发送到服务器端
channel.writeAndFlush(msg + "rn");
}
} finally {
eventExecutors.shutdownGracefully();
}
}
// 启动
public static void main(String[] args) throws InterruptedException {
new GroupChatClient("127.0.0.1", 7005).run();
}
}
- GroupChatClientHandler
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
1.3 点对点聊天功能
- 改用Map来管理 Channel
二、Netty的心跳机制
要求:
- 当服务器超过3秒没有读时,就提示读空闲。
- 当服务器超过5秒没有写操作时,就提示写空闲。
- 实现当服务器超过7秒没有读或者写操作时,就提示读写空闲。
2.1 Netty 的 IdleStateHandler
- IdleStateHandler 是 Netty 提供的检测空闲状态的处理器。
- long readerIdleTime:表示多长时间没有读,就会发送一个心跳检测包检测是否还是连接的状态。
- long writerIdleTime:表示多长时间没有写,就会发送一个心跳检测包检测是否还是连接的状态。
- long allIdleTime:表示多长时间没有读写,就会发送一个心跳检测包检测是否还是连接的状态。
- 当 IdleStateEvent 触发后,就会传递给Pipeline的下一个 Handler,通过触发下一个Handler的 userEventTriggered,在该方法处理这个(读空闲、写空闲、读写空闲)事件。
2.2 实现
- MyServer
public class MyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 为 BossGroup 中的请求添加日志处理 Handler
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入一个 netty 提供的 IdleStateHandler
pipeline.addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 加入一个对空闲检测进一步处理的 Handler
pipeline.addLast(new MyServerHandler());
}
});
// 启动服务器,设置为同步模式
ChannelFuture channelFuture = serverBootstrap.bind(7005).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- MyServerHandler
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent)evt;
String eventTye = null;
switch (event.state()) {
case READER_IDLE:
eventTye = "读空闲";
break;
case WRITER_IDLE:
eventTye = "写空闲";
break;
case ALL_IDLE:
eventTye = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() +"---超时时间--" + eventTye);
System.out.println("服务器做相应处理。。");
}
}
}
三、WebSocket开发
要求:
-
TextWebSocketFrame:表示一个文本帧,消息以该形式传递。
-
HttpServerCodec:基于 Http 协议,所以要使用 Http 的编/解码器。
-
ChunkedWriteHandler:以块方式写,需要添加 ChunkedWriter 处理器。
-
HttpObjectAggregator:HTTP 数据在传输过程中是分段的,HttpObjectAggregator 可以将多个段聚合。
-
WebSocketServerProtocolHandler:核心功能是将 HTTP 协议升级为 ws 协议,保持长连接。
-
Server
public class MyServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 因为是基于 Http 协议,所以要使用 Http 的编/解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 ChunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
// http 数据在传输过程中是分段的, HttpObjectAggregator 可以将多个段聚合
// 这就是为什么当浏览器发送大量数据时,会发出多次 http 请求
pipeline.addLast(new HttpObjectAggregator(8192));
/**
* 1、对于 websocket,它的数据是以帧的形式传递的
* 2、WebsocketFrame 下面有六个子类
* 3、浏览器请求时:ws://localhost:7005/hello 表示请求的 uri
* 4、WebSocketServerProtocolHandler 核心功能是将 http 协议升级为 ws 协议,保持长连接
* 5、从 Http 协议升级到 Websocket 协议,是通过 StatusCode 101(Switching Protocols)来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义 Handler,处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
ChannelFuture sync = serverBootstrap.bind(7005).sync();
sync.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- Handler
// TextWebSocketFrame 表示一个文本帧
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息" + msg.text());
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text()));
}
// 当 web 客户端连接后,触发方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asLongText());
System.out.println(ctx.channel().id().asShortText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asLongText());
}
}
《websocket框架netty和springboot的集成使用》
最后
以上就是可爱小霸王为你收集整理的Netty——群聊系统与心跳机制与WebSocket开发Netty——群聊系统与心跳机制与WebSocket开发一、群聊系统二、Netty的心跳机制三、WebSocket开发的全部内容,希望文章能够帮你解决Netty——群聊系统与心跳机制与WebSocket开发Netty——群聊系统与心跳机制与WebSocket开发一、群聊系统二、Netty的心跳机制三、WebSocket开发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复