概述
简单的 Netty 客户端
连接
/** Event loop group handed to the client {@code Bootstrap} when connecting. */
public static final EventLoopGroup group = new NioEventLoopGroup();
/**
 * Connects to the given gateway node and registers the connected channel
 * in {@code ServerChannelManager} under the node's id.
 *
 * <p>The call blocks until the connect attempt has finished.
 *
 * @param node gateway node supplying the ip, port and id
 * @throws IOException declared by the original signature
 * @throws InterruptedException if the thread is interrupted while waiting for the connect
 */
private void connect(GatewayModel node) throws IOException, InterruptedException {
    ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            // Codec first, then idle detection, then the business handler.
            ch.pipeline()
                    .addLast(new MessageEncoder())
                    .addLast(new MessageDecoder())
                    .addLast(new IdleStateHandler(5, 10, 10, TimeUnit.SECONDS))
                    .addLast(new ClientProcessHandler(smsService));
        }
    };

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(pipelineSetup);

    // sync() waits for the connect to complete and rethrows its failure cause.
    Channel connected = bootstrap.connect(node.getIp(), node.getPort()).sync().channel();
    ServerChannelManager.add(node.getId(), connected);
}
客户端 handler
/**
 * Inbound handler of the gateway client connection.
 *
 * <p>It logs connects and disconnects, dispatches decoded {@link Packet}s by
 * command id, forwards uplink SMS messages to {@link SmsService}, and schedules
 * a reconnect whenever the channel goes inactive.
 */
public class ClientProcessHandler extends SimpleChannelInboundHandler<Packet> {
    private static final Logger logger = LoggerFactory.getLogger(ClientProcessHandler.class);

    /** Delay, in seconds, before a (re)connect attempt is made. */
    private static final int RECONNECT_DELAY_SECONDS = 3;

    private final SmsService smsService;

    /**
     * @param smsService service that receives decoded uplink messages; it is also
     *                   passed to the handler installed on reconnected channels
     */
    public ClientProcessHandler(SmsService smsService) {
        super();
        this.smsService = smsService;
    }

    /** Logs the remote address once the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("connect server:{}:{}", address.getHostName(), address.getPort());
    }

    // Heartbeat-on-idle is currently disabled: this handler does not override
    // userEventTriggered, so it does not react to IdleStateEvents.
    // @Override
    // public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // if (evt instanceof IdleStateEvent) {
    // Packet packet = PacketFactory.getInstance().createHeartBeatReqPacket();
    // ctx.writeAndFlush(packet);
    // InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
    // logger.debug("ping server: " + address.getHostName() + ":" + address.getPort());
    // }
    // }

    /**
     * Logs the disconnect and schedules a reconnect to the same host and port
     * after {@link #RECONNECT_DELAY_SECONDS} seconds.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        // NOTE(review): getHostName() may trigger a reverse DNS lookup; it is kept because
        // host:port is also the ServerChannelManager key used by doReconnect.
        String host = address.getHostName();
        int port = address.getPort();
        logger.error("disconnect from server:{}:{} ,trying to connect server in {} seconds...",
                host, port, RECONNECT_DELAY_SECONDS);
        // doReconnect never blocks, so running it on the event loop is safe.
        ctx.channel().eventLoop().schedule(() -> doReconnect(host, port), RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
        ctx.close();
    }

    /**
     * Dispatches a decoded packet by its command id.
     *
     * <p>{@code Packet} is not reference counted, so {@code SimpleChannelInboundHandler}
     * cannot release its body; the body buffer is released here once handling is done.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Packet packet) throws Exception {
        ByteBuf body = packet.getBody();
        try {
            int commandId = packet.getPacketHeader().getCommandId();
            switch (commandId) {
                case TransferCommand.HEART_BEAT_REQ: {
                    // Heartbeat response.
                    InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                    logger.debug("pong from server: {}:{}", address.getHostName(), address.getPort());
                    break;
                }
                case TransferCommand.MSG_ACCEPT_REQ:
                    // Uplink SMS request.
                    handleUplinkMessage(ctx, body);
                    break;
                default:
                    // List the command ids this switch actually handles.
                    logger.error("error commandId....【{}】,the commandId must be : 【{}】", commandId,
                            TransferCommand.HEART_BEAT_REQ + "," + TransferCommand.MSG_ACCEPT_REQ);
                    break;
            }
        } finally {
            if (body != null) {
                body.release();
            }
        }
    }

    /**
     * Parses an uplink SMS body and hands the result to {@link SmsService#receiveMsg}.
     *
     * <p>Field order on the wire: linkId, operator type (1 byte), sender, receiver,
     * content encoding (1 byte), content. Each string is prefixed with a 16-bit length.
     */
    private void handleUplinkMessage(ChannelHandlerContext ctx, ByteBuf body) throws Exception {
        Charset charset = Charset.forName(Constant.CHARSET_UTF8);

        String linkId = readString(body, charset);
        // Sender phone-number operator type (currently always China Unicom, i.e. 3).
        // The value is unused but must be consumed to keep the reader index aligned.
        body.readByte();
        String sender = readString(body, charset);
        String receiver = readString(body, charset);
        // Content encoding marker; consumed but unused.
        // NOTE(review): content is always decoded with Constant.CHARSET_UTF8 regardless
        // of this byte - confirm that is intended.
        body.readByte();
        String content = readString(body, charset);

        MessageUp up = new MessageUp();
        up.setSender(sender);
        up.setReceiver(receiver);
        up.setMsgContent(content);
        up.setSendTime(new Date());
        up.setLinkId(linkId);
        logger.info("receive msg from client :【{}】 ", sender);
        up.setChannel(ctx.channel());
        smsService.receiveMsg(up);
    }

    /**
     * Reads a string prefixed with a 16-bit length.
     *
     * <p>Uses {@code readSlice} rather than {@code readBytes(int)}: the latter allocates
     * a new buffer that would have to be released, the slice shares the parent's memory.
     */
    private static String readString(ByteBuf buf, Charset charset) {
        int length = buf.readShort();
        return buf.readSlice(length).toString(charset);
    }

    /**
     * Starts an asynchronous connect to {@code host:port}. On success the new channel
     * is registered under {@code host:port}; on failure another attempt is scheduled.
     *
     * <p>This method runs on an event loop thread, so it must not wait on the connect
     * future with {@code sync()}; completion is handled in a listener instead.
     */
    private void doReconnect(String host, int port) {
        String nodeId = host + ":" + port;
        EventLoopGroup group = GatewayContext.group;
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // Add the encoder and decoder.
                            p.addLast(new MessageEncoder());
                            p.addLast(new MessageDecoder());
                            // Same idle timings as the pipeline built for the initial connection.
                            p.addLast(new IdleStateHandler(5, 10, 10, TimeUnit.SECONDS));
                            p.addLast(new ClientProcessHandler(smsService));
                        }
                    });
            // Start the client without blocking the event loop.
            ChannelFuture connectFuture = b.connect(host, port);
            connectFuture.addListener(done -> {
                if (done.isSuccess()) {
                    ServerChannelManager.add(nodeId, connectFuture.channel());
                    logger.info("reconnect server : 【{}】 success ", nodeId);
                } else {
                    onReconnectFailed(group, host, port, done.cause());
                }
            });
        } catch (Exception e) {
            onReconnectFailed(group, host, port, e);
        }
    }

    /** Unregisters the node, logs the failure with its cause and schedules the next attempt. */
    private void onReconnectFailed(EventLoopGroup group, String host, int port, Throwable cause) {
        String nodeId = host + ":" + port;
        ServerChannelManager.remove(nodeId);
        logger.error("reconnect server:【{}】 failed,start reconnect in 3 seconds...", nodeId, cause);
        group.schedule(() -> doReconnect(host, port), RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
    }
}
传输包
/**
 * A transfer packet: a {@link PacketHeader} plus an optional body buffer.
 */
public class Packet {
    /** Header length in bytes. */
    public static final int HEADER_SIZE = 12;

    /** Packet header. */
    private final PacketHeader packetHeader;

    /** Packet body; stays {@code null} until {@link #setBody(ByteBuf)} is called. */
    private ByteBuf body;

    /**
     * Creates a packet whose header carries the given command id and total size.
     *
     * @param commandId command identifier stored in the header
     * @param totalSize total size stored in the header
     */
    public Packet(int commandId, int totalSize) {
        PacketHeader header = new PacketHeader();
        header.setCommandId(commandId);
        header.setTotalSize(totalSize);
        this.packetHeader = header;
    }

    /** @return the header created by the constructor */
    public PacketHeader getPacketHeader() {
        return packetHeader;
    }

    /** @return the body buffer, or {@code null} if none was set */
    public ByteBuf getBody() {
        return this.body;
    }

    /**
     * Sets the body buffer.
     *
     * @param body new body buffer
     * @return this packet, for chaining
     */
    public Packet setBody(ByteBuf body) {
        this.body = body;
        return this;
    }

    /**
     * Updates the total size stored in the header.
     *
     * @param totalSize new total size
     * @return this packet, for chaining
     */
    public Packet setTotalSize(int totalSize) {
        packetHeader.setTotalSize(totalSize);
        return this;
    }
}
编码解码
/**
 * Decodes the byte stream into {@link Packet}s.
 *
 * <p>Wire format, as written by {@code MessageEncoder}: three 4-byte ints
 * (total size, command id, sequence id) followed by the body.
 *
 * <p>TCP may deliver a frame in pieces or several frames at once, so the decoder
 * waits until a complete frame is buffered and consumes exactly one frame per call.
 *
 * <p>NOTE(review): framing assumes the total-size field counts the
 * {@link Packet#HEADER_SIZE}-byte header as well as the body - confirm against the
 * code that builds outgoing packets.
 */
public class MessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Incomplete header: leave the bytes in the buffer and wait for more data.
        if (in.readableBytes() < Packet.HEADER_SIZE) {
            return;
        }

        // Peek at the length field without moving the reader index.
        int totalSize = in.getInt(in.readerIndex());
        if (totalSize < Packet.HEADER_SIZE) {
            // The stream cannot be re-synchronised; drop what is buffered and report it.
            in.skipBytes(in.readableBytes());
            throw new IllegalStateException("invalid packet total size: " + totalSize);
        }

        // Incomplete frame: wait until the whole packet has arrived.
        if (in.readableBytes() < totalSize) {
            return;
        }

        in.skipBytes(4); // total size, already read above
        int commandId = in.readInt();
        // NOTE(review): the sequence id is consumed but not stored on the packet,
        // matching the previous behaviour - confirm whether the header should carry it.
        in.skipBytes(4);

        Packet packet = new Packet(commandId, totalSize);
        // readBytes(int) returns a new buffer owned by the packet; the consumer must release it.
        packet.setBody(in.readBytes(totalSize - Packet.HEADER_SIZE));
        out.add(packet);
    }
}
/**
 * Encodes a {@link Packet}: the three header ints (total size, command id,
 * sequence id) followed by the body bytes when a body is present.
 */
public class MessageEncoder extends MessageToByteEncoder<Packet> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) throws Exception {
        PacketHeader hdr = packet.getPacketHeader();

        // Header: total message length, command identifier, sequence number.
        out.writeInt(hdr.getTotalSize())
                .writeInt(hdr.getCommandId())
                .writeInt(hdr.getSequenceId());

        // Body, if any.
        ByteBuf payload = packet.getBody();
        if (payload == null) {
            return;
        }
        out.writeBytes(payload);
    }
}
/**
 * Demo: writes two strings, each prefixed with a 16-bit length, into a buffer
 * and reads them back.
 *
 * <p>Encoding and decoding both use UTF-8 explicitly, so the result does not
 * depend on the platform default charset.
 */
public static void main(String[] args) {
    Charset utf8 = Charset.forName("UTF-8");
    byte[] head = "buff头部".getBytes(utf8);
    byte[] entity = "buff实体".getBytes(utf8);

    ByteBuf in = Unpooled.copyShort(head.length);
    try {
        in.writeBytes(head);
        in.writeShort(entity.length);
        in.writeBytes(entity);
        // readSlice shares the parent's memory, so there is no extra buffer to release.
        System.out.println(in.readSlice(in.readShort()).toString(utf8));
        System.out.println(in.readSlice(in.readShort()).toString(utf8));
    } finally {
        in.release();
    }
}
最后
以上就是有魅力水杯为你收集整理的 Netty 通讯的全部内容,希望本文能够帮你解决使用 Netty 通讯时遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复