我是靠谱客的博主 朴素画笔,最近开发中收集的这篇文章主要介绍netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

如果想利用netty进行大文件或大数据包传输,最好的方法是不要对数据包进行编码解码,直接收发其二进制数据,且推荐分包收发,接收端将这些数据包自行并接即可,这样即使并发传输一个1G的文件也不会让服务端崩溃。

easynetty更新至1.0.6-SNAPSHOT:
1、增加大数据包编码、解码:BigPackageUtil,BigPackageDecoder
2、CustomFrameDecoder已经失效,请用AbstractMultipleDecode代替
3、AbstractMultipleDecode已改良,具有半包、粘包处理能力,也就是能处理客户端批量发过来的不同协议的数据包

1、引用依赖

		<dependency>
            <groupId>io.github.tiger822.netty</groupId>
            <artifactId>easynetty</artifactId>
            <version>1.0.6-SNAPSHOT</version>
        </dependency>

2、一个传输多协议+大数据包的例子

服务端,里面引用了BigPackageChannelInboundHandler

public class Server {
  public static void main(String[] args){
    IGeneralServer server= new GeneralNettyServerFactory().getGeneralServer(9900);
    try{
      server.getServerBootstrap().option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
      .childOption(ChannelOption.SO_KEEPALIVE, true);
      server.run(ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline
               //.addLast(new LoggingHandler(LogLevel.INFO))
                //不需要了,自带半包粘包处理.addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4))
                //.addLast(new LengthFieldPrepender(4))
                .addLast("orderEncoder", new CustomFrameEncoder<>(OrderInfo.class, CodeConsts.OrderHeader, Utils::toJsonBytes))
                .addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, CodeConsts.UserHeader, Utils::toJsonBytes))
                .addLast("responseEncoder",new CustomFrameEncoder<>(JSONData.class,CodeConsts.ResponseHeader,Utils::toJsonBytes))
                //增加大数据包解码
                .addLast(new BigPackageDecoder() {
                  @Override
                  public void onPackageOutput(ChannelHandlerContext ctx, BigPackageProperties properties, byte[] data, List<Object> out) {
                    out.add(new BigPackage(properties,data));
                  }
                })
                .addLast("multiDecoder",new JsonMultipleDecode().registerClass(CodeConsts.UserHeader,UserInfo.class)
                        .registerClass(CodeConsts.OrderHeader,OrderInfo.class).setReDeliverRawData(true))
                .addLast(new SimpleChannelInboundHandler() {
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ctx.channel().eventLoop().schedule(()->{
                      if (msg instanceof BigPackage) {
                        BigPackage bigPackage = (BigPackage) msg;
                        BigPackageProperties properties = bigPackage.getProperties();
                        if (bigPackage.getData() == null) {//新大包通知
                          System.out.println("Start to receive :" + properties.getId());
                        } else {//每个小包收到后请自行处理
                          System.out.println("Receipted "+properties.getId()+", "+bigPackage.getData().length);
                          if (properties.getRt()==properties.getTotal()) { //最后一个包收完
                            System.out.println("Receipte finished:" + properties.getId() + ",total:" + properties.getTotal());
                            if (properties.getId().equalsIgnoreCase("verify")){
                               ctx.channel().close();
                            }
                          }
                        }
                        return;
                      }
                      if (msg instanceof UserInfo) {
                        UserInfo userInfo=(UserInfo)msg;
                        userInfo.setUserName(userInfo.getUserName() + ",srv");
                        ctx.channel().writeAndFlush(msg);
                      }
                      else if (msg instanceof OrderInfo){
                        OrderInfo orderInfo=(OrderInfo) msg;
                        ctx.channel().writeAndFlush(orderInfo);
                      }
                      else{
                        //NettyUtil.reDeliver(pipeline,msg); //只要上面解码正确,不会落到这里
                      }
                    },0,TimeUnit.SECONDS);
                  }
                });
      });
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      server.close();
    }
  }
}

客户端

public class Client {

  public static void main(String[] args) throws InterruptedException {
    IGeneralClient client = new GeneralNettyClientFactory().getClient("localhost", 9900);
    try {
      client.run(false, ch -> {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline
                //.addLast(new LoggingHandler(LogLevel.INFO))
                .addLast("multiDecoder",new JsonMultipleDecode().registerClass(CodeConsts.UserHeader,UserInfo.class)
                .registerClass(CodeConsts.OrderHeader,OrderInfo.class)
                .registerClass(CodeConsts.ResponseHeader, JSONData.class)
                )
                .addLast("userEncoder", new CustomFrameEncoder<>(UserInfo.class, CodeConsts.UserHeader, Utils::toJsonBytes))
                .addLast("orderEncoder",new CustomFrameEncoder<>(OrderInfo.class,CodeConsts.OrderHeader,Utils::toJsonBytes))
                //增加大数据包编码
                .addLast(new BigPackageEncoder())
                .addLast(new SimpleChannelInboundHandler() {
                    @Override
                    @SuppressWarnings("deprecation")
                    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                            throws Exception {
                      super.exceptionCaught(ctx,cause);
                    }
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                      super.channelActive(ctx);
                      System.out.println("Connected:"+ctx.channel().remoteAddress());
                    }
                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                      super.channelInactive(ctx);
                      System.out.println("DisConnected:"+ctx.channel().remoteAddress());
                    }
                  @Override
                  protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("Server<<" +msg.getClass().getSimpleName()+","+ msg.toString());
                  }
                });
      });
      BigPackageUtil util=new BigPackageUtil(client.getChannel());
      client.getChannel().write(new UserInfo("A001", "陳大文", 20));
      client.getChannel().write(new OrderInfo("A002", 11));

      try(FileInputStream fis=new FileInputStream("d:/temp/a.txt")){
        util.sendData("ID12345",new Date(),fis,fis.available(),1024,false);
      } catch (IOException e) {
        e.printStackTrace();
      }
      byte[] data={1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16};
      AtomicInteger sentC=new AtomicInteger();
      int dataLen=data.length;
      int finalDataLen1 = dataLen;
      util.sendData("small",null,()->{
         int st=sentC.get();
         int frameLen= finalDataLen1 >=st+5?5: finalDataLen1 -st;
         byte[] bytes= Arrays.copyOfRange(data,st,st+frameLen);
         sentC.set(st+frameLen);
         return bytes;
      },dataLen,false);
      for (int i=0;i<10;i++) {
        try (FileInputStream fis = new FileInputStream("d:/source/goland-2020.3.2.exe")) {
          util.sendData("goland-2020.3.2.exe"+i, new Date(), fis, fis.available(), 1024*100);
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      client.getChannel().writeAndFlush(new OrderInfo("A003", 11)).sync();
      sentC.set(0);
      dataLen=data.length;
      int finalDataLen = dataLen;
      util.sendData("verify",null,()->{
        int st=sentC.get();
        int frameLen= finalDataLen >=st+5?5: finalDataLen -st;
        byte[] bytes= Arrays.copyOfRange(data,st,st+frameLen);
        sentC.set(st+frameLen);
        return bytes;
      },dataLen);

      client.getChannel().closeFuture().sync();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      client.close();
    }
  }
}

最后结果是服务端显示,将最后一个verify也完成接收
在这里插入图片描述
客户端显示如下,服务端也能正确将数据回传:
在这里插入图片描述

最后

以上就是朴素画笔为你收集整理的netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子的全部内容,希望文章能够帮你解决netty进行超大数据包传输(文件传输)与其他多协议共存1、引用依赖2、一个传输多协议+大数据包的例子所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部