我是靠谱客的博主 开放蜜粉,最近开发中收集的这篇文章主要介绍Netty 作为TCP server使用Netty框架作为TCP server,做上位机,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

使用Netty框架作为TCP server,做上位机

        1. 引用Netty框架POM.xml ,添加引入

2. 创建NettyServer

3. 创建 NettyChannelService

4. 创建 MyServerHandler 

5. 创建MyChannelInitializer

6. 如何外部发给消息到TCP Client那?

7. 发消息测试:


使用Netty框架作为TCP server,做上位机

1. 引用Netty框架POM.xml ,添加引入

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-transport</artifactId>
			<version>4.1.55.Final</version>
		</dependency>

2. 创建NettyServer


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.util.HashMap;
import java.util.Map;

/**
 * @author huochengyan
 * @version 1.0
 * @date 2022/5/24 17:09
 */
public class NettyServer {

    static  NettyServer  server;
    static Map sensorIdChannel=new HashMap();
    public static void main(String[] args) {

        //自定义发送任务 给下位机发送的。
        Thread customTaskThread=new Thread(new CustomTaskInfo());
        customTaskThread.start();

         server=new NettyServer();
         //server.bing(8088);
        server.bing(JdbcUtil.serverPort);

    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            System.out.println("run........");
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }

    }

}

3. 创建 NettyChannelService


import io.netty.channel.ChannelHandlerContext;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author huochengyan
 * @version 1.0   保存Netty 链接状态
 * @date 2022/5/24 17:36
 */
public class NettyChannelService {
    private static ConcurrentHashMap<String, ChannelHandlerContext> map = new ConcurrentHashMap<>();

    public static Map<String, ChannelHandlerContext> getChannels() {
        return map;
    }

    public static void saveChannel(String key, ChannelHandlerContext ctx) {
        if (map == null) {
            map = new ConcurrentHashMap<>();
        }
        map.put(key, ctx);
    }

    public static ChannelHandlerContext getChannel(String key) {
        if (map == null || map.isEmpty()) {
            return null;
        }
        return map.get(key);
    }

    public static void removeChannel(String key) {
        map.remove(key);
    }
}

4. 创建 MyServerHandler 

import com.longder.sensor.NettySensorClientMsg;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.CharsetUtil;

import java.awt.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Logger;

/**
 * @author huochengyan
 * @version 1.0
 * @date 2022/5/24 17:08
 */

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:有一客户端链接到本服务端");
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
//        String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "rn";
//        ByteBuf buf = Unpooled.buffer(str.getBytes().length);
//        buf.writeBytes(str.getBytes("GBK"));
//        ctx.writeAndFlush(buf);


        String uuid = ctx.channel().id().asLongText();
        NettyChannelService.saveChannel(uuid, ctx);
        System.out.println("连接请求进入: " + uuid + " 地址: " + ctx.channel().remoteAddress());
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
        //通知客户端链消息发送成功

        ByteBuf in = (ByteBuf)msg;
        int readableBytes = in.readableBytes();
        byte[] bytes =new byte[readableBytes];
        in.readBytes(bytes);
        System.out.println("[client->server]:"+new String(bytes));
        System.out.println("[server ip]:"+ctx.channel().remoteAddress());
        System.out.print(in.toString(CharsetUtil.UTF_8));

        //业务
        String clientIp=ctx.channel().remoteAddress().toString().replace("/","").split(":")[0];

        String clientMsg=new String(bytes);
        String deviceId="";
        if(clientMsg.substring(0,1).equals("$")) {
            String[] arr = clientMsg.split(",");
            deviceId = arr[4];
        }
        NettyServer.sensorIdChannel.put(deviceId,ctx.channel().id());
        NettySensorClientMsg.uploadSensorMsg(clientIp,clientMsg);

        String resultClientMsg=NettySensorClientMsg.getSensorResult(clientMsg);

//        String resultClientMsg = "服务端收到:" + new Date() + " " + msg + "rn";
        ByteBuf buf = Unpooled.buffer(resultClientMsg.getBytes().length);
        buf.writeBytes(resultClientMsg.getBytes("UTF-8"));
        ctx.writeAndFlush(buf);

    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:rn" + cause.getMessage());
    }


}


5. 创建MyChannelInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

import java.nio.charset.Charset;

/**
 * @author huochengyan
 * @version 1.0
 * @date 2022/5/24 17:06
 */

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel channel) {
        // 基于换行符号
        //channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
        // 解码转String,注意调整自己的编码格式GBK、UTF-8
        //channel.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
        // 在管道中添加我们自己的接收数据实现方法
        channel.pipeline().addLast(new MyServerHandler());
    }

}

6. 如何外部发给消息到TCP Client那?

看下面的方法:

/**
     * 发送消息给指定的tcp Client
     *
     * @throws InterruptedException
     */
    public static void sendToClient(String clientIp, String deviceId, String exeStr) throws InterruptedException {
        for (String key : NettyChannelService.getChannels().keySet()) {
            ChannelHandlerContext ctx = NettyChannelService.getChannel(key);
            if (ctx == null) {
                continue;
            }
            if (ctx.channel().isActive()) {

                if (!clientIp.equals(ctx.channel().remoteAddress().toString().replace("/", "").split(":")[0])) {
                    continue;
                }
                String reqStr = exeStr; // "OKr";
                byte[] reqStrBytes = reqStr.getBytes(); //getHexBytes(reqStr);
                ByteBuf reqStrByteBuf = ctx.alloc().buffer(reqStrBytes.length);
                reqStrByteBuf.writeBytes(reqStrBytes);
                ctx.writeAndFlush(reqStrByteBuf);
                // System.out.println("[server --> client: " + ctx.channel().remoteAddress() + "] channel id: " + key);
                System.out.println("[server --> client" + ctx.channel().remoteAddress() + "]: " + reqStr);
                // 这里暂停一下是防止channelRead收到的数据粘包
                Thread.sleep(1000);
            } else {
                NettyChannelService.removeChannel(key);
            }
        }
    }

7. 发消息测试:

最后

以上就是开放蜜粉为你收集整理的Netty 作为TCP server使用Netty框架作为TCP server,做上位机的全部内容,希望文章能够帮你解决Netty 作为TCP server使用Netty框架作为TCP server,做上位机所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部