概述
前言:
上文中了解完Sentinel中NettyClient的创建之后,本文就来学习下NettyServer是如何创建的。
相对NettyClient而言,server相对容易些。
1.定义服务端接口
同样的,面向接口编程。服务端也是首先创建一个接口,如下所示:
public interface ClusterTokenServer {
/**
* Start the Sentinel cluster server.
*
* @throws Exception if any error occurs
*/
void start() throws Exception;
/**
* Stop the Sentinel cluster server.
*
* @throws Exception if any error occurs
*/
void stop() throws Exception;
}
相对客户端的接口而言,简单许多,只有start()和stop()方法
2.定义基于Netty的服务端实现类
同样的,基于接口的创建方式,可以很方便的切换到其他方式(如Mina)的服务端创建方式。
2.1 NettyTransportServer基本参数
public class NettyTransportServer implements ClusterTokenServer {
// 定义EventLoop线程数,默认与当前服务器核数相关,支持用户自定义配置
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
// 支持连接创建失败重试
private static final int MAX_RETRY_TIMES = 3;
private static final int RETRY_SLEEP_MS = 2000;
// 当前服务绑定端口
private final int port;
// NettyServer创建时的bossGroup 和 workGroup
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
// 这里是对连接上来的客户端连接的一个管理
private final ConnectionPool connectionPool = new ConnectionPool();
// 跟客户端一样,也是一个状态位的判断,防止多次启动
private final AtomicInteger currentState = new AtomicInteger(SERVER_STATUS_OFF);
private final AtomicInteger failedTimes = new AtomicInteger(0);
// 构造函数,只需要提供一个port端口信息即可
public NettyTransportServer(int port) {
this.port = port;
}
...
}
除了常规的port和bossGroup、workerGroup之外,这里的参数还有些骚操作是值得我们学习的。
1)state状态位的判断,在客户端创建时也有用到,可以防止多次创建
2)失败重试参数的设置。设置最大重试次数和失败重试时间
3)有关于EventLoopGroup的线程数,支持用于自定义,否则使用默认值(线程数与当前机器的核数相关)
4)提供ConnectionPool用于管理连接上来的客户端连接
2.2 NettyTransportServer.start() 启动服务端
public class NettyTransportServer implements ClusterTokenServer {
public void start() {
// 首先进行状态位的设置,避免多次启动
if (!currentState.compareAndSet(SERVER_STATUS_OFF, SERVER_STATUS_STARTING)) {
return;
}
ServerBootstrap b = new ServerBootstrap();
// reactor模式,设置workerGroup线程数
this.bossGroup = new NioEventLoopGroup(1);
this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// server tcp的基本参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
p.addLast(new NettyRequestDecoder());
p.addLast(new LengthFieldPrepender(2));
p.addLast(new NettyResponseEncoder());
// 最主要的是TokenServerHandler这个Handler,用于处理客户端请求
p.addLast(new TokenServerHandler(connectionPool));
}
})
// 设置客户端连接的基本TCP参数
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.childOption(ChannelOption.SO_TIMEOUT, 10)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
// 监听server绑定是否成功,如果失败,则继续重试,最大重试次数不超过MAX_RETRY_TIMES
public void operationComplete(ChannelFuture future) {
if (future.cause() != null) {
RecordLog.info("[NettyTransportServer] Token server start failed (port=" + port + "), failedTimes: " + failedTimes.get(),
future.cause());
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_OFF);
int failCount = failedTimes.incrementAndGet();
if (failCount > MAX_RETRY_TIMES) {
return;
}
try {
// 这里的sleep,类似于指数避退的方式,很有意思
Thread.sleep(failCount * RETRY_SLEEP_MS);
start();
} catch (Throwable e) {
RecordLog.info("[NettyTransportServer] Failed to start token server when retrying", e);
}
} else {
RecordLog.info("[NettyTransportServer] Token server started success at port " + port);
currentState.compareAndSet(SERVER_STATUS_STARTING, SERVER_STATUS_STARTED);
}
}
});
}
}
这里的重试操作,还是值得我们学习下的,一般来说绑定本地port应该不会失败,但如果因为某些原因失败时,我们应该在代码中可以自动重试
2.3 NettyTransportServer.stop() 停止服务端
public class NettyTransportServer implements ClusterTokenServer {
public void stop() {
// 还是先判断状态位
while (currentState.get() == SERVER_STATUS_STARTING) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// Ignore.
}
}
if (currentState.compareAndSet(SERVER_STATUS_STARTED, SERVER_STATUS_OFF)) {
try {
// 关闭线程资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
// 这里还会关闭所有的客户端连接
connectionPool.shutdownAll();
failedTimes.set(0);
RecordLog.info("[NettyTransportServer] Sentinel token server stopped");
} catch (Exception ex) {
RecordLog.warn("[NettyTransportServer] Failed to stop token server (port=" + port + ")", ex);
}
}
}
}
2.4 管理客户端连接
这个操作是之前没有考虑过的,看了这里之后,发现还可以直接管理客户端连接。
简单思考一下:我们获取了所有的客户端连接之后可以做什么呢?我们可以对长时间空闲的连接直接关闭;也可以定向关闭某一个ip过来的连接;
下面来看下其是如何获取到所有的客户端连接的。
2.4.1 定义ConnectionPool连接管理池
public class ConnectionPool {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);
// 实际就是一个Map
private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
...
}
2.4.2 客户端连接创建时添加到连接池中
public class TokenServerHandler extends ChannelInboundHandlerAdapter {
private final ConnectionPool globalConnectionPool;
public TokenServerHandler(ConnectionPool globalConnectionPool) {
this.globalConnectionPool = globalConnectionPool;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 在这里调用的
globalConnectionPool.createConnection(ctx.channel());
String remoteAddress = getRemoteAddress(ctx);
}
}
// ConnectionPool
public class ConnectionPool {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);
// 实际就是一个Map
private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
public void createConnection(Channel channel) {
if (channel != null) {
// 创建完NettyConnection后,直接保存到Map中
Connection connection = new NettyConnection(channel, this);
// 这里的key以客户端的ip:port
String connKey = getConnectionKey(channel);
CONNECTION_MAP.put(connKey, connection);
}
}
private String getConnectionKey(Channel channel) {
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String remoteIp = socketAddress.getAddress().getHostAddress();
int remotePort = socketAddress.getPort();
return remoteIp + ":" + remotePort;
}
}
// NettyConnection
public class NettyConnection implements Connection {
// 基本属性
private String remoteIp;
private int remotePort;
private Channel channel;
// 这里记录了当前channel最后一次read时间,用于后续操作
private long lastReadTime;
private ConnectionPool pool;
}
这里创建NettyConnection对象,封装了Channel的基本信息
2.4.2 ConnectionPool清理长期未read NettyConnection
// TokenServerHandler 请求处理类
public class TokenServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 当发生read调用时,先将connection的 readTime更新下
globalConnectionPool.refreshLastReadTime(ctx.channel());
...
}
}
// ConnectionPool
public class ConnectionPool {
public void refreshLastReadTime(Channel channel) {
if (channel != null) {
String connKey = getConnectionKey(channel);
Connection connection = CONNECTION_MAP.get(connKey);
if (connection != null) {
// 更新为当前时间
connection.refreshLastReadTime(System.currentTimeMillis());
}
}
}
// 定时器清理长时间未read的connection
private ScheduledFuture scanTaskFuture = null;
private synchronized void startScan() {
if (scanTaskFuture == null
|| scanTaskFuture.isCancelled()
|| scanTaskFuture.isDone()) {
// 启动定时器,最终任务交由ScanIdleConnectionTask执行
scanTaskFuture = TIMER.scheduleAtFixedRate(
new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
}
}
}
// ScanIdleConnectionTask
public class ScanIdleConnectionTask implements Runnable {
// 将ConnectionPool当做入参传递过来
private final ConnectionPool connectionPool;
public ScanIdleConnectionTask(ConnectionPool connectionPool) {
this.connectionPool = connectionPool;
}
@Override
public void run() {
try {
int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
long idleTimeMillis = idleSeconds * 1000;
if (idleTimeMillis < 0) {
idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
}
long now = System.currentTimeMillis();
// 获取所有连接,判断当前时间与连接的上次读时间的差值,如果超过我们规定的值,则直接关闭connection
List<Connection> connections = connectionPool.listAllConnection();
for (Connection conn : connections) {
if ((now - conn.getLastReadTime()) > idleTimeMillis) {
RecordLog.info(
String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
+ "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
);
conn.close();
}
}
} catch (Throwable t) {
RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
}
}
}
总结:
一个简单的NettyServer创建过程,经过框架的升级之后还是有很多不同点的。
多看源码多学习,向大佬们致敬!
最后
以上就是专注吐司为你收集整理的Netty实战之Sentinel框架应用(二)的全部内容,希望文章能够帮你解决Netty实战之Sentinel框架应用(二)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复