概述
Netty本人也多少有些疑问,说的不对的地方可以指出。也希望这边文章能够提供思路解决实际的问题。
最近公司有一个需求,就是使用Python写的图片识别程序,需要把识别的程序识别出的图片发送到页面展示。由于展示的应用采用Java开发,这样就涉及到了跨语言。通过沟通最后采用TCP方式传输。这样只需要按照指定的格式,任意语言都可以往Java开发的应用传输图片。
发送的数据格式:START[|patrol#id#type1_type2_…|img_length|IMG_START:*********]END。img_length代表图片长度,****代表图片数据,img_length之前的包含了别的数据,根据需求可以自定义。
程序采用springboot搭建,随服务启动。部分关键代码如下:
服务端代码:
package cn.ubi.video.netty;
/**
* @author panbing
* @date 2022/10/19 11:31
*/
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Slf4j
@Component
public class VideoSocketServer {
@Resource
private SocketInitializer socketInitializer;
@Getter
private ServerBootstrap serverBootstrap;
/**
* netty服务监听端口
*/
@Value("${netty.port:8088}")
private int port;
/**
* 主线程组数量
*/
@Value("${netty.bossThread:1}")
private int bossThread;
/**
* 启动netty服务器
*/
public void start() {
this.init();
this.serverBootstrap.bind(this.port);
log.info("Netty started on port: {} (TCP) with boss thread {}", this.port, this.bossThread);
}
/**
* 初始化netty配置
*/
private void init() {
// 创建两个线程组,bossGroup为接收请求的线程组,一般1-2个就行
NioEventLoopGroup bossGroup = new NioEventLoopGroup(bossThread);
// 实际工作的线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.serverBootstrap = new ServerBootstrap();
this.serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置缓冲区大小
.option(ChannelOption.RCVBUF_ALLOCATOR,new FixedRecvByteBufAllocator(1024))
// 设置保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(this.socketInitializer);
}
}
此处根据数据格式,我们知道END结尾,这样可以正常解析出完整数据,避免沾包和分包等现象。同时长度指定能够容纳发送的数据
@Component
public class SocketInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
NettyServerHandlerV2 v2;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加上处理器,根据数据格式END结尾,防止粘包分包,长度指定能够容纳自己的数据
pipeline.addLast(new DelimiterBasedFrameDecoder(102400000, Unpooled.copiedBuffer("END"
.getBytes())));
// 此处指定byte解码器
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
pipeline.addLast(v2);
}
}
服务端接收到数据,把字节数据转化为字符数据,然后根据字符的格式指定截取图片部分
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandlerV2 extends ChannelInboundHandlerAdapter {
@Autowired
UbiHandler ubiHandler;
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static ConcurrentHashMap<String,Integer> count = new ConcurrentHashMap<String,Integer>();
/**
* 读取到客户端发来的消息
*
* @param ctx ChannelHandlerContext
* @param msg msg
* @throws Exception e
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//START[|patrol#id#type1_type2_...|img_length|IMG_START:*********]END
byte[] bytes = (byte[]) msg;
String result = new String(bytes,StandardCharsets.ISO_8859_1);
String[] split = result.split("\|");
// 该部分是数据部分
String value = split[1];
// 处理图片
int indexOf = result.indexOf(IMG_START);
StringBuffer sb = new StringBuffer(result);
sb.delete(0,indexOf+IMG_START.length());
try {
String suffix = DateFormatUtils.format(new Date(),"yyyyMMdd");
String fileName = UUID.randomUUID()+".jpg";
String savePath = path + "/" + suffix + "/" +fileName;
File file = new File(path+"/"+suffix);
if(!file.exists()){
file.mkdirs();
}
try {
IOUtils.write(sb.toString().replace(IMG_END,"").getBytes(StandardCharsets.ISO_8859_1.displayName()),new FileOutputStream(savePath));
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info("新的客户端链接:" + ctx.channel().id().asShortText());
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接断开:" + ctx.channel().id().asShortText());
clients.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("连接异常:",cause);
cause.printStackTrace();
ctx.channel().close();
clients.remove(ctx.channel());
}
客户端代码:
public class ClientTest {
public static void main(String[] args) throws Exception {
// for(int i=0;i<5;i++){
String host = "127.0.0.1";
int port = 8088;
// String host = "192.168.0.4";
// int port = 6000;
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
pipeline.addLast(new NettyClientHandler());
}
});
// 启动客户端.
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
// }
}
}
注意此处客户端发送数据的是字节数据。
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪触发
*
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接成功....");
//START[|patrol#id#type1_type2_...|img_length|IMG_START:*********]END
try{
for(int i=1;i<=3;i++){
File file = new File("E:/m"+i+".jpg");
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(file),StandardCharsets.ISO_8859_1));
byte[] img = IOUtils.toByteArray(in,StandardCharsets.ISO_8859_1);
// 发送非图片部分
String value ="START[|patrol#01#0_1|"+img.length+"|IMG_START:";
ByteBuf buf = Unpooled.copiedBuffer(value.getBytes(CharsetUtil.ISO_8859_1));
ctx.writeAndFlush(buf);
BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file));
byte[] buffer = new byte[1024];
int bytesRead = 0;
//从文件中按字节读取内容,到文件尾部时read方法将返回-1 发送图片数据
while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
ctx.writeAndFlush(Unpooled.copiedBuffer(buffer));
}
// 最后添加]END结束一次数据发送
ctx.writeAndFlush( Unpooled.copiedBuffer("]END".getBytes(CharsetUtil.ISO_8859_1)));
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 服务端发送的数据
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ByteBuf buf = (ByteBuf) msg;
System.out.println("服务端回复的消息:" + msg);
// System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}
/**
* 处理异常,关闭通道
*
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param cause 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
最后
以上就是朴素路灯为你收集整理的Netty接收发送的图片数据的全部内容,希望文章能够帮你解决Netty接收发送的图片数据所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复