我是靠谱客的博主 朴素画笔,最近开发中收集的这篇文章主要介绍netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
如果想利用netty进行大文件或大数据包传输,最好的方法是不要对数据包进行编码解码,直接收发其二进制数据,且推荐分包收发,接收端将这些数据包自行并接即可,这样即使并发传输一个1G的文件也不会让服务端崩溃。
easynetty更新至1.0.6-SNAPSHOT:
1、增加大数据包编码、解码:BigPackageUtil,BigPackageDecoder
2、CustomFrameDecoder已经失效,请用AbstractMultipleDecode代替
3、AbstractMultipleDecode已改良,具有半包、粘包处理能力,也就是能处理客户端批量发过来的不同协议的数据包
1、引用依赖
<dependency>
<groupId>io.github.tiger822.netty</groupId>
<artifactId>easynetty</artifactId>
<version>1.0.6-SNAPSHOT</version>
</dependency>
2、一个传输多协议+大数据包的例子
服务端,里面引用了BigPackageChannelInboundHandler
public class Server {
public static void main(String[] args){
IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
try{
server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
.childOption(ChannelOption.SO_KEEPALIVE, true);
server.run(ch -> {
ChannelPipeline pipeline = ch.pipeline();
pipeline
//.addLast(new LoggingHandler(LogLevel.INFO))
//不需要了,自带半包粘包处理.addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4))
//.addLast(new LengthFieldPrepender(4))
.addLast("orderEncoder", new CustomFrameEncoder<>(OrderInfo.class, CodeConsts.OrderHeader, Utils::toJsonBytes))
.addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, CodeConsts.UserHeader, Utils::toJsonBytes))
.addLast("responseEncoder",new CustomFrameEncoder<>(JSONData.class,CodeConsts.ResponseHeader,Utils::toJsonBytes))
//增加大数据包解码
.addLast(new BigPackageDecoder() {
@Override
public void onPackageOutput(ChannelHandlerContext ctx, BigPackageProperties properties, byte[] data, List<Object> out) {
out.add(new BigPackage(properties,data));
}
})
.addLast("multiDecoder",new JsonMultipleDecode().registerClass(CodeConsts.UserHeader,UserInfo.class)
.registerClass(CodeConsts.OrderHeader,OrderInfo.class).setReDeliverRawData(true))
.addLast(new SimpleChannelInboundHandler() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().schedule(()->{
if (msg instanceof BigPackage) {
BigPackage bigPackage = (BigPackage) msg;
BigPackageProperties properties = bigPackage.getProperties();
if (bigPackage.getData() == null) {//新大包通知
System.out.println("Start to receive :" + properties.getId());
} else {//每个小包收到后请自行处理
System.out.println("Receipted "+properties.getId()+", "+bigPackage.getData().length);
if (properties.getRt()==properties.getTotal()) { //最后一个包收完
System.out.println("Receipte finished:" + properties.getId() + ",total:" + properties.getTotal());
if (properties.getId().equalsIgnoreCase("verify")){
ctx.channel().close();
}
}
}
return;
}
if (msg instanceof UserInfo) {
UserInfo userInfo=(UserInfo)msg;
userInfo.setUserName(userInfo.getUserName() + ",srv");
ctx.channel().writeAndFlush(msg);
}
else if (msg instanceof OrderInfo){
OrderInfo orderInfo=(OrderInfo) msg;
ctx.channel().writeAndFlush(orderInfo);
}
else{
//NettyUtil.reDeliver(pipeline,msg); //只要上面解码正确,不会落到这里
}
},0,TimeUnit.SECONDS);
}
});
});
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
server.close();
}
}
}
客户端
public class Client {
public static void main(String[] args) throws InterruptedException {
IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
try {
client.run(false, ch -> {
ChannelPipeline pipeline = ch.pipeline();
pipeline
//.addLast(new LoggingHandler(LogLevel.INFO))
.addLast("multiDecoder",new JsonMultipleDecode().registerClass(CodeConsts.UserHeader,UserInfo.class)
.registerClass(CodeConsts.OrderHeader,OrderInfo.class)
.registerClass(CodeConsts.ResponseHeader, JSONData.class)
)
.addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, CodeConsts.UserHeader, Utils::toJsonBytes))
.addLast("orderEncoder",new CustomFrameEncoder<>(OrderInfo.class,CodeConsts.OrderHeader,Utils::toJsonBytes))
//增加大数据包编码
.addLast(new BigPackageEncoder())
.addLast(new SimpleChannelInboundHandler() {
@Override
@SuppressWarnings("deprecation")
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
super.exceptionCaught(ctx,cause);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("Connected:"+ctx.channel().remoteAddress());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("DisConnected:"+ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Server<<" +msg.getClass().getSimpleName()+","+ msg.toString());
}
});
});
BigPackageUtil util=new BigPackageUtil(client.getChannel());
client.getChannel().write(new UserInfo("A001", "陳大文", 20));
client.getChannel().write(new OrderInfo("A002", 11));
try(FileInputStream fis=new FileInputStream("d:/temp/a.txt")){
util.sendData("ID12345",new Date(),fis,fis.available(),1024,false);
} catch (IOException e) {
e.printStackTrace();
}
byte[] data={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16};
AtomicInteger sentC=new AtomicInteger();
int dataLen=data.length;
int finalDataLen1 = dataLen;
util.sendData("small",null,()->{
int st=sentC.get();
int frameLen= finalDataLen1 >=st+5?5: finalDataLen1 -st;
byte[] bytes= Arrays.copyOfRange(data,st,st+frameLen);
sentC.set(st+frameLen);
return bytes;
},dataLen,false);
for (int i=0;i<10;i++) {
try (FileInputStream fis = new FileInputStream("d:/source/goland-2020.3.2.exe")) {
util.sendData("goland-2020.3.2.exe"+i, new Date(), fis, fis.available(), 1024*100);
} catch (IOException e) {
e.printStackTrace();
}
}
client.getChannel().writeAndFlush(new OrderInfo("A003", 11)).sync();
sentC.set(0);
dataLen=data.length;
int finalDataLen = dataLen;
util.sendData("verify",null,()->{
int st=sentC.get();
int frameLen= finalDataLen >=st+5?5: finalDataLen -st;
byte[] bytes= Arrays.copyOfRange(data,st,st+frameLen);
sentC.set(st+frameLen);
return bytes;
},dataLen);
client.getChannel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
client.close();
}
}
}
最后结果是服务端显示,将最后一个verify也完成接收
客户端显示如下,服务端也能正确将数据回传:
最后
以上就是朴素画笔为你收集整理的netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子的全部内容,希望文章能够帮你解决netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复