我是靠谱客的博主 野性微笑,最近开发中收集的这篇文章主要介绍初识Netty六(自定义协议(私有协议)开发),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

    • 实现功能
    • 通信模型
    • Netty协议的编解码规范
      • Netty协议解码
    • 代码实现
      • 依赖
      • 消息结构定义
        • 消息头定义 Header
        • 消息定义 NettyMessage
        • 消息类型定义 MessageType
        • 返回结果定义
      • 端口常量定义 NettyConstant
      • 消息编解码
        • ChannelBufferByteInput
        • ChannelBufferByteOutput
        • MarshallingCodecFactory
        • MarshallingDecoder
        • MarshallingEncoder
        • NettyMessageDecoder
        • NettyMessageEncoder
      • 客户端实现
        • ClientHandler
        • 客户端心跳检测 HeartBeatReqHandler
        • 客户端握手认证 LoginAuthReqHandler
        • 客户端 NettyClient
      • 服务端实现
        • 服务端心跳检测 HeartBeatRespHandler
        • 服务端握手认证 LoginAuthRespHandler
        • ServerHandler
        • NettyServer
    • 测试
    • 源码下载
    • 参考

实现功能

  1. 基于Netty的NIO通信框架,提供高性能的异步通信能力;
  2. 提供消息的编码解码框架,可以实现POJO的序列化和反序列化;
  3. .提供基于IP地址的白名单接入认证机制;
  4. 链路的有效性校验机制;
  5. 链路的断连重连机制;

通信模型

Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之间都心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端端Ping消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,知道重连成功

在这里插入图片描述
具体步骤:

  1. Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
  2. Netty协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过之后,返回登录成功的握手应答消息;
  3. 链路建立成功之后,客户端发送业务消息;
  4. 链路建立成功之后,服务端发送心跳消息;
  5. 链路建立成功之后,客户端发送心跳消息;
  6. 链路建立成功之后,服务端发送业务消息
  7. 服务端推出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。

Netty协议通信双方练了路建立成功后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之前的心跳采用Ping-Pong机制,当链路处理控线状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方处理异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,知道重连成功

Netty协议的编解码规范

###Netty协议编码
Netty协议NettyMessage的编码规范如下:

  1. rcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价;
    2.length:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价;
  2. sessionID:java.nio.ByteBuffer.putLong(long value),如果采用其他缓冲区实现,必须与其等价;
  3. type: java.nio.ByteBuffer.put(byte b),如果采用其他缓冲区实现,必须与其等价;
  4. priority:java.nio.ByteBuffer.put(byte b),如果采用其他缓冲区实现,必须与其等价;
  5. attachment:它的编码规则为——如果attachment长度为0,表示没有可选附件,则将长度编码设为0,java.nio.ByteBuffer.putInt(0);如果大于0,说明有附件需要编码,具体的编码规则如下:首先对附件的个数进行编码,java.nio.ByteBuffer.putInt(attachment.size());然后对Key进行编码,再将它转换成byte数组之后编码内容.
  6. body的编码:通过JBoss Marshalling将其序列化为byte数组,然后调用java.nio.ByteBuffer.put(byte [] src)将其写入ByteBuffer缓冲区中。
    由于整个消息的长度必须等全部字段都编码完成之后才能确认,所以最后需要更新消息头中的length字段,将其重新写入ByteBuffer中。

Netty协议解码

相对于NettyMessage的编码,仍旧以java.nio.ByteBuffer为例,给出Netty协议的解码规范:

  1. crcCode:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要与其等价;
  2. length:通过java.nio.ByteBuffer.getInt()获取Netty消息的长度,其他缓冲区需要与其等价;
  3. sessionID:通过java.nio.ByteBuffer.getLong()获取会话ID,其他缓冲区需要与其等价;
  4. type:通过java.nio.ByteBuffer.get()获取消息类型,其他缓冲区需要与其等价;
  5. priority:通过java.nio.ByteBuffer.get()获取消息优先级,其他缓冲区需要与其等价;
  6. attachment:它的解码规则为——首先创建一个新的attachment对象,调用java.nio.ByteBuffer.getInt()获取附件的长度,如果为0,说明附件为空,解码结束,继续解消息体;如果非空,则根据长度通过for循环进行解码;
  7. body:通过JBoss的marshaller对其进行解码。

代码实现

依赖

<dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>5.0.0.Alpha1</version>
        </dependency>
        <!--其他可能用到的依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>2.0.2.Final</version>
        </dependency>
    </dependencies>

消息结构定义

消息头定义 Header

@Data
public final class Header {

    private int crcCode = 0xadaf0105;
    /**
     * 消息长度
     */
    private int length;
    /**
     * 会话ID
     */
    private long sessionId;
    /**
     * 消息类型
     */
    private byte type;
    /**
     * 消息优先级
     */
    private byte priority;
    /**
     * 附件
     */
    private Map<String, Object> attachment = new HashMap<>();

}

消息定义 NettyMessage

@Data
public final class NettyMessage {
    /**
     * 消息头
     */
    private Header header;
    /**
     * 消息体
     */
    private Object body;


}

消息类型定义 MessageType

public enum MessageType {

    /**
     * 业务请求消息
     */
    SERVICE_REQ((byte) 0),
    /**
     * 业务响应(应答)消息
     */
    SERVICE_RESP((byte) 1),
    /**
     * 业务ONE WAY消息(既是请求消息又是响应消息)
     */
    ONE_WAY((byte) 2),
    /**
     * 握手请求消息
     */
    LOGIN_REQ((byte) 3),
    /**
     * 握手响应(应答)消息
     */
    LOGIN_RESP((byte) 4),
    /**
     * 心跳请求消息
     */
    HEARTBEAT_REQ((byte) 5),
    /**
     * 心跳响应(应答)消息
     */
    HEARTBEAT_RESP((byte) 6);


    private byte value;

    MessageType(byte value) {
        this.value = value;
    }

    public byte value() {
        return value;
    }


}

返回结果定义

public enum ResultType {
    /**
     * 认证成功
     */
    SUCCESS((byte) 0),
    /**
     * 认证失败
     */
    FAIL((byte) -1),
    ;


    private byte value;

    private ResultType(byte value) {
        this.value = value;
    }

    public byte value() {
        return this.value;
    }


    }

端口常量定义 NettyConstant

public interface NettyConstant {

    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}

消息编解码

ChannelBufferByteInput

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 13:58
 * @Description 消息编码器
 */
public class ChannelBufferByteInput implements ByteInput {

    private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }

    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }

    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }

    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }

        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }
}

ChannelBufferByteOutput

package codec;

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;

import java.io.IOException;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 14:10
 * @Description 消息解码器
 */
public class ChannelBufferByteOutput implements ByteOutput {

    private final ByteBuf buffer;

    /**
     * Create a new instance which use the given {@link ByteBuf}
     */
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do
    }

    @Override
    public void flush() throws IOException {
        // nothing to do
    }

    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }

    /**
     * Return the {@link ByteBuf} which contains the written content
     *
     */
    ByteBuf getBuffer() {
        return buffer;
    }

}

MarshallingCodecFactory

package codec;

import org.jboss.marshalling.*;

import java.io.IOException;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 14:12
 * @Description 编码器
 */
public class MarshallingCodecFactory {

    /**
     * 创建Jboss Marshaller
     *
     * @return
     * @throws IOException
     */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        Marshaller marshaller = marshallerFactory
                .createMarshaller(configuration);
        return marshaller;
    }

    /**
     * 创建Jboss Unmarshaller
     *
     * @return
     * @throws IOException
     */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");

        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        final Unmarshaller unmarshaller = marshallerFactory
                .createUnmarshaller(configuration);
        return unmarshaller;
    }
}

MarshallingDecoder

package codec;

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 14:26
 * @Description TODO
 */
public class MarshallingDecoder {

    private final Unmarshaller unmarshaller;


    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        //1 首先读取4个长度(实际body内容长度)
        int objectSize = in.readInt();
        //2 获取实际body的缓冲内容
        int readIndex = in.readerIndex();
        ByteBuf buf = in.slice(readIndex, objectSize);
        //3 转换
        ChannelBufferByteInput input = new ChannelBufferByteInput(buf);
        try {
            //4 读取操作:
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            //5 读取完毕以后, 更新当前读取起始位置:
            //因为使用slice方法,原buf的位置还在readIndex上,故需要将位置重新设置一下
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}

MarshallingEncoder

package codec;

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Marshaller;

import java.io.IOException;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 14:21
 * @Description TODO
 */
public class MarshallingEncoder {

    //空白占位: 用于预留设置 body的数据包长度
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            //必须要知道当前的数据位置是哪: 起始数据位置
            //长度属性的位置索引
            int lengthPos = out.writerIndex();
            //占位写操作:先写一个4个字节的空的内容,记录在起始数据位置,用于设置内容长度
            out.writeBytes(LENGTH_PLACEHOLDER);
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            marshaller.start(output);
            marshaller.writeObject(msg);
            marshaller.finish();
            //总长度(结束位置) - 初始化长度(起始位置) - 预留的长度  = body数据长度
            int endPos = out.writerIndex();
            out.setInt(lengthPos, endPos - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }

}

NettyMessageDecoder

package codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import message.Header;
import message.NettyMessage;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 13:48
 * @Description TODO
 */
public final class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoder marshallingDecoder;

    /**
     *
     * @param maxFrameLength 第一个参数代表最大的序列化长度
     * @param lengthFieldOffset 代表长度属性的偏移量 简单来说就是message中 总长度的起始位置(Header中的length属性的起始位置)   本例中为4
     * @param lengthFieldLength 代表长度属性的长度 整个属性占多长(length属性为int,占4个字节)  4
     * @throws IOException
     */
    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
                               int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
}

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
            throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        // 加通信标记认证逻辑
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionId(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());

        int size = frame.readInt();
        //附件个数大于0,则需要解码操作
        if (size > 0) {
            Map<String, Object> attch = new HashMap<String, Object>(size);
            int keySize = 0;
            byte[] keyArray = null;
            String key = null;
            for (int i = 0; i < size; i++) {
                keySize = frame.readInt();
                keyArray = new byte[keySize];
                frame.readBytes(keyArray);
                key = new String(keyArray, "UTF-8");
                attch.put(key, marshallingDecoder.decode(frame));
            }
            keyArray = null;
            key = null;
            //解码完成放入attachment
            header.setAttachment(attch);
        }
        message.setHeader(header);
        //对于ByteBuf来说,读一个数据,就会少一个数据,所以读完header,剩下的应该就是body了
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }

        return message;
    }
}

NettyMessageEncoder

package codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import message.Header;
import message.NettyMessage;

import java.io.IOException;
import java.util.Map;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 14:15
 * @Description TODO
 */
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    private MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
        if(msg == null || msg.getHeader() == null){
            throw new Exception("编码失败,没有数据信息!");
        }

        //Head:
        Header header = msg.getHeader();
        sendBuf.writeInt(header.getCrcCode());//校验码
        sendBuf.writeInt(header.getLength());//总长度
        sendBuf.writeLong(header.getSessionId());//会话id
        sendBuf.writeByte(header.getType());//消息类型
        sendBuf.writeByte(header.getPriority());//优先级

        //对附件信息进行编码
        //编码规则为:如果attachment的长度为0,表示没有可选附件,则将长度	编码设置为0
        //如果attachment长度大于0,则需要编码,规则:
        //首先对附件的个数进行编码
        sendBuf.writeInt((header.getAttachment().size())); //附件大小
        String key = null;
        byte[] keyArray = null;
        Object value = null;
        //然后对key进行编码,先编码长度,然后再将它转化为byte数组之后编码内容
        for (Map.Entry<String, Object> param : header.getAttachment()
                .entrySet()) {
            key = param.getKey();
            keyArray = key.getBytes(CharsetUtil.UTF_8);
            sendBuf.writeInt(keyArray.length);//key的字符编码长度
            sendBuf.writeBytes(keyArray);
            value = param.getValue();
            marshallingEncoder.encode(value, sendBuf);
        }
        key = null;
        keyArray = null;
        value = null;

        //Body:
        Object body = msg.getBody();
        //如果不为空 说明: 有数据
        if(body != null){
            //使用MarshallingEncoder
            this.marshallingEncoder.encode(body, sendBuf);
        } else {
            //如果没有数据 则进行补位 为了方便后续的 decoder操作
            sendBuf.writeInt(0);
        }

        //最后我们要获取整个数据包的总长度 也就是 header +  body 进行对 header length的设置

        // TODO:  解释: 在这里必须要-8个字节 ,是因为要把CRC和长度本身占的减掉了
        //(官方中给出的是:LengthFieldBasedFrameDecoder中的lengthFieldOffset+lengthFieldLength)
        //总长度是在header协议的第二个标记字段中
        //第一个参数是长度属性的索引位置
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}

客户端实现

ClientHandler

package client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import message.NettyMessage;

/**
 * @author WH
 * @version 1.0
 * @date 2020/6/1 21:30
 * @Description TODO
 */
@Slf4j
public class ClientHandler  extends ChannelHandlerAdapter {

    // 连接成功监听
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {

            NettyMessage message = (NettyMessage)msg;
            System.err.println("Client receive message from server: " + message.getBody());

        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    // 客户端断开连接监听
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.info("----------客户端断开连接-----------");
        ctx.close();
    }

}

客户端心跳检测 HeartBeatReqHandler

package client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;

import java.util.concurrent.TimeUnit;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:29
 * @Description 客户端心跳检测
 */
@Slf4j
public class HeartBeatReqHandler extends ChannelHandlerAdapter {

    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 握手成功,主动发送心跳消息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            heartBeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
                    TimeUnit.MILLISECONDS);
            log.info("客户端发送心跳包");
        } else if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_RESP
                .value()) {
            log.info("Client receive server heart beat message : ---> {}", message);
        } else
            ctx.fireChannelRead(msg);
    }

    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heatBeat = buildHeatBeat();
            log.info("Client send heart beat messsage to server : ---> {}", heatBeat);

            ctx.writeAndFlush(heatBeat);
        }

        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

客户端握手认证 LoginAuthReqHandler

package client;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:23
 * @Description 客户端握手认证
 */
@Slf4j
public class LoginAuthReqHandler extends ChannelHandlerAdapter {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(buildLoginReq());
    }


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 如果是握手应答消息,需要判断是否认证成功
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            byte loginResult = (byte) message.getBody();
            if (loginResult != (byte) 0) {
                // 握手失败,关闭连接
                ctx.close();
            } else {
                log.info("Login is ok : {}", message);
                ctx.fireChannelRead(msg);
            }
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildLoginReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

客户端 NettyClient

package client;

import codec.NettyMessageDecoder;
import codec.NettyMessageEncoder;
import constant.NettyConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:33
 * @Description 客户端
 */
public class NettyClient {

    public static void main(String[] args) throws Exception {
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }

    private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    //创建工作线程组
    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast(new NettyMessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
                            ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());
                            ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
                        }
                    });
            // 发起异步连接操作
            ChannelFuture future = b.connect(new InetSocketAddress(host, port),
                    new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();

            //手动发测试数据,验证是否会产生TCP粘包/拆包情况
//			Channel c = future.channel();
//
//			for (int i = 0; i < 500; i++) {
//				NettyMessage message = new NettyMessage();
//				Header header = new Header();
//				header.setSessionID(1001L);
//				header.setPriority((byte) 1);
//				header.setType((byte) 0);
//				message.setHeader(header);
//				message.setBody("我是请求数据" + i);
//				c.writeAndFlush(message);
//			}

            future.channel().closeFuture().sync();
        } finally {
            // 所有资源释放完成之后,清空资源,再次发起重连操作
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}

服务端实现

服务端心跳检测 HeartBeatRespHandler

package server;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:31
 * @Description TODO
 */
@Slf4j
public class HeartBeatRespHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 返回心跳应答消息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ
                .value()) {
            log.info("Receive client heart beat message : ---> {}", message);

            NettyMessage heartBeat = buildHeatBeat();
            log.info("Send heart beat response message to client : ---> {}", heartBeat);

            ctx.writeAndFlush(heartBeat);
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }
}

服务端握手认证 LoginAuthRespHandler

package server;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:26
 * @Description 服务端握手和安全认证
 */
@Slf4j
public class LoginAuthRespHandler extends ChannelHandlerAdapter {

    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whitekList = { "127.0.0.1", "192.168.56.1" };

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 如果是握手请求消息,处理,其它消息透传
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_REQ
                .value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp = null;
            // 重复登陆,拒绝
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel()
                        .remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whitekList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0)
                        : buildResponse((byte) -1);
                if (isOK)
                    nodeCheck.put(nodeIndex, true);
            }
            log.info("The login response is : {} body [ {} ]", loginResp, loginResp.getBody());
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}

ServerHandler

package server;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.NettyMessage;

/**
 * @author WH
 * @version 1.0
 * @date 2020/6/1 21:31
 * @Description TODO
 */
@Slf4j
public class ServerHandler extends ChannelHandlerAdapter {

    /**
     * 当我们通道进行激活的时候 触发的监听方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("通道激活");
    }

    /**
     * 当我们的通道里有数据进行读取的时候 触发的监听方法
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx /*NETTY服务上下文*/, Object msg /*实际的传输数据*/) throws Exception {

        NettyMessage requestMessage = (NettyMessage) msg;

        System.err.println("Server receive message from Client: " + requestMessage.getBody());

        NettyMessage responseMessage = new NettyMessage();
        Header header = new Header();
        header.setSessionId(2002L);
        header.setPriority((byte) 2);
        header.setType((byte) 1);
        responseMessage.setHeader(header);
        responseMessage.setBody("我是响应数据: " + requestMessage.getBody());
        ctx.writeAndFlush(responseMessage);

    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("--------数据读取完毕----------");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.info("--------服务器数据读异常----------:");
        cause.printStackTrace();
        ctx.close();
    }
}

NettyServer

package server;

import codec.NettyMessageDecoder;
import codec.NettyMessageEncoder;
import constant.NettyConstant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
 * @author WH
 * @version 1.0
 * @date 2020/5/31 15:35
 * @Description TODO
 */
@Slf4j
public class NettyServer {

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }

    public void bind() throws Exception {
        //1 用于接受客户端连接的线程工作组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //2 用于对接受客户端连接读写操作的线程工作组
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        //3 辅助类。用于帮助我们创建NETTY服务
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)//绑定两个工作线程组
                .channel(NioServerSocketChannel.class) //设置NIO的模式
                .option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区
                .handler(new LoggingHandler(LogLevel.INFO))
                // 初始化绑定服务通道
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws IOException {
                        ch.pipeline().addLast(
                                new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",
                                new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",
                                new HeartBeatRespHandler());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

        // 绑定端口,同步等待成功
        ChannelFuture cf = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
        log.info("Netty server start ok : {} : {}",NettyConstant.REMOTEIP, NettyConstant.PORT);

        //释放连接
        cf.channel().closeFuture().sync();
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();

    }


}

测试

  • 服务端
    在这里插入图片描述

  • 客户端
    在这里插入图片描述

  • 其他测试:服务端宕机重连,客户端宕机重连

源码下载

下载

参考

Netty权威指南
博客

最后

以上就是野性微笑为你收集整理的初识Netty六(自定义协议(私有协议)开发)的全部内容,希望文章能够帮你解决初识Netty六(自定义协议(私有协议)开发)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部