声明:此博客为学习笔记,学习自极客学院ZooKeeper相关视频;非常感谢众多大牛们的知识分享。
相关概念:
负载均衡(相关节点)架构图:
说明:每当往集群中新增一个工作服务器时,都会再/server节点下创建一个对应的临时节点,该节点中应含有该服务器
的连接信息以及均衡标识等。当客户端需要连接worker server时,就会先读取/servers节点下的所有子节点,并获
取每个子节点的相关信息,通过均衡算法来确认要连接哪一个worker server进行连接。
负载均衡之server服务端流程图:
负载均衡之client客户端流程图:
软硬件环境:Windows10、IntelliJ IDEA、SpringBoot、ZkClient、Netty
准备工作:在pom.xml中引入相关依赖
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
ZooKeeper负载均衡实现示例:
服务端总体说明:
IncreaseOrDecreaseLoadOperator:对节点的负载量进行增加/减少的封装类。
RegistAndUnregistOperator:server服务器往zookeeper注册/注销的封装类。
ServerData:server服务器节点的数据的数据模型。
ServerHandler:netty用于处理信道事件的自定义逻辑。
ServerLogicImpl:server服务器的总体逻辑。
ServerRunnerTest:服务端的测试类。
服务端各类细节:
IncreaseOrDecreaseLoadOperator:
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.apache.zookeeper.data.Stat;
/**
* 服务节 负载大小的增/减 实现逻辑
*
* @author JustryDeng
* @date 2018/12/4 9:55
*/
public class IncreaseOrDecreaseLoadOperator {
/** 服务器节点路径 */
private String serverNodePath;
/** zkClient客户端, 此类中的代码逻辑要求:此客户端必须是以SerializableSerializer序列化器创建的实例 */
private ZkClient zkClient;
public IncreaseOrDecreaseLoadOperator(String serverNodePath, ZkClient zkClient) {
this.serverNodePath = serverNodePath;
this.zkClient = zkClient;
}
/**
* 增加负载 --- 记数
*
* @author JustryDeng
* @date 2018/12/4 11:25
*/
public boolean increaseLoad(Integer step) {
Stat stat = new Stat();
ServerData serverData;
while (true) {
try {
serverData = zkClient.readData(this.serverNodePath, stat);
serverData.setLoad(serverData.getLoad() + step);
// 为避免并发出错,修改数据时,要校验版本号
zkClient.writeData(this.serverNodePath, serverData, stat.getVersion());
return true;
} catch (ZkBadVersionException e) {
// 版本不对则重试
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
/**
* 降低负载 --- 记数
*
* @author JustryDeng
* @date 2018/12/4 11:25
*/
public boolean decreaseLoad(Integer step) {
Stat stat = new Stat();
ServerData serverData;
while (true) {
try {
serverData = zkClient.readData(this.serverNodePath, stat);
final Integer currentLoad = serverData.getLoad();
serverData.setLoad(currentLoad > step ? currentLoad - step : 0);
// 为避免并发出错,修改数据时,要校验版本号
zkClient.writeData(this.serverNodePath, serverData, stat.getVersion());
return true;
} catch (ZkBadVersionException e) {
// 版本不对则重试
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
}
}
RegistAndUnregistOperator:
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
/**
* server服务器的注册与注销
*
* @author JustryDeng
* @date 2018/12/4 9:56
*/
public class RegistAndUnregistOperator {
private String serverNodePath;
private ZkClient zkClient;
private ServerData serverData;
public RegistAndUnregistOperator(String serverNodePath, ZkClient zkClient, ServerData serverData) {
this.serverNodePath = serverNodePath;
this.zkClient = zkClient;
this.serverData = serverData;
}
/**
* 注册
*/
public void regist() {
try {
// 创建server对应的 临时节点
zkClient.createEphemeral(serverNodePath, serverData);
} catch (ZkNoNodeException e) { // 若serverNodePath的父节点不存在,那么创建持久的父节点
String parentDir = serverNodePath.substring(0, serverNodePath.lastIndexOf('/'));
zkClient.createPersistent(parentDir, true);
regist();
} catch (ZkNodeExistsException e) {
System.out.println("该节点已存在!");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 注销
*/
public void unRegist() {
try {
// 由于我们一般都不会在server节点下创建子节点,所以delete/deleteRecursive都行
zkClient.delete(serverNodePath);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ServerData:
import java.io.Serializable;
/**
* 服务器数据模型
* 注:由于其他逻辑需要,这里要保证其可实例化,可比较
*
* @author JustryDeng
* @date 2018/12/4 11:28
*/
public class ServerData implements Serializable,Comparable<ServerData> {
private static final long serialVersionUID = 4402636779492870030L;
/** 当前服务器的负载大小 */
private Integer load;
/** 当前服务器的地址 */
private String host;
/** 当前服务器的端口 */
private Integer port;
public Integer getLoad() {
return load;
}
public void setLoad(Integer load) {
this.load = load;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
/**
* A.compareTo(B)
* A > B返回 1
* A == B返回 0
* A < B返回 -1
*/
@Override
public int compareTo(ServerData anotherServerData) {
return this.getLoad().compareTo(anotherServerData.getLoad());
}
@Override
public String toString() {
return "ServerData{load=" + load + ", host='" + host + "', port=" + port + '}';
}
}
ServerHandler:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 创建一个处理器,来处理信道事件
*
* @author JustryDeng
* @date 2018/12/4 11:02
*/
public class ServerHandler extends ChannelHandlerAdapter{
/** 增加/减少 负载大小的实现逻辑封装 */
private final IncreaseOrDecreaseLoadOperator loadOperator;
/** 步进,即:server负载因子增长步进 */
private static final Integer BALANCE_STEP = 1;
public ServerHandler(IncreaseOrDecreaseLoadOperator loadOperator){
this.loadOperator = loadOperator;
}
public IncreaseOrDecreaseLoadOperator getLoadOperator() {
return loadOperator;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("one client connect...");
loadOperator.increaseLoad(BALANCE_STEP);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
loadOperator.decreaseLoad(BALANCE_STEP);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
throwable.printStackTrace();
ctx.close();
}
}
ServerLogicImpl:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
* server服务器的总体逻辑
*
* @author JustryDeng
* @date 2018/12/4 15:46
*/
public class ServerLogicImpl {
/** 连接ZooKeeper的ip和端口 */
private String ipAndPort;
/** servers节点路径 */
private String serversNodePath;
/** 要在servers节点下创建的子节点的全路径 */
private String currentServerPath;
/** 当前节点数据 */
private ServerData currentServerData;
/** zkclient会话超时时间 */
private static final Integer SESSION_TIME_OUT = 10000;
/** zkclient连接超时时间 */
private static final Integer CONNECT_TIME_OUT = 10000;
/** zookeeper节点路径分隔符 */
private final String NODE_SEPARATOR = "/";
/** zkClient客户端 */
private final ZkClient zkClient;
/** server服务器注册与注销的逻辑封装类 */
private final RegistAndUnregistOperator registAndUnregistOperator;
/**
* netty相关
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workGroup = new NioEventLoopGroup();
private ServerBootstrap bootStrap = new ServerBootstrap();
private ChannelFuture channelFuture;
/** netty相关 之 ServerBootstrap与当前server的端口 的绑定标识*/
private volatile boolean binded = false;
/**
* 构造器
*
* @param ipAndPort
* 连接ZooKeeper的ip和端口
* @param serversNodePath
* 当前服务器的节点路径
* @param serverData
* 当前服务器节点的数据
* @author JustryDeng
* @date 2018/12/4 12:26
*/
public ServerLogicImpl(String ipAndPort, String serversNodePath, ServerData serverData){
this.ipAndPort = ipAndPort;
this.serversNodePath = serversNodePath;
this.currentServerData = serverData;
// 这里以端口号作为当前服务器对应的节点名,那么全路径节点名
this.currentServerPath = serversNodePath.concat(NODE_SEPARATOR).concat(serverData.getPort().toString());
// 根据逻辑需要,这里创建以SerializableSerializer为序列化工具的zkClient
this.zkClient = new ZkClient(this.ipAndPort, SESSION_TIME_OUT, CONNECT_TIME_OUT, new SerializableSerializer());
this.registAndUnregistOperator = new RegistAndUnregistOperator(currentServerPath, zkClient, serverData);
}
/**
* server的逻辑实现
*
* @author JustryDeng
* @date 2018/12/4 12:43
*/
public void bind() {
// 如果当前server对应的端口已经bind过了,那么进行短路
if (binded){
return;
}
System.out.println(currentServerData.getPort() + ":binding...");
try {
// 将当前server,注册到zookeeper
registAndUnregistOperator.regist();
} catch (Exception e) {
e.printStackTrace();
return;
}
// ServerBootstrap负责初始化netty服务器
bootStrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
// 向pipeline中添加 处理业务的handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline = socketChannel.pipeline();
// 我们添加自定义的 ServerHandler来处理相关事件
channelPipeline.addLast(new ServerHandler(new IncreaseOrDecreaseLoadOperator(currentServerPath,zkClient)));
}
});
try {
// 绑定端口,开始监听端口的socket请求;
// 当这个方法执行后,ServerBootstrap就可以接受指定端口上的socket连接了
channelFuture = bootStrap.bind(currentServerData.getPort()).sync();
binded = true;
System.out.println(currentServerData.getPort() + ":binded...");
// 使当前线程进入阻塞,当对应的子线程退出后,再往下执行;类似于CountDownLatch的await方法
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
ServerRunnerTest:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* server服务端测试函数
*
* @author JustryDeng
* @date 2018/12/4 15:52
*/
public class ServerRunnerTest {
private static final int SIZE = 2;
/** 连接zookeeper的ip和端口 */
private static final String IP_PORT = "10.8.109.60:2181";
/** /servers节点路径 */
private static final String SERVERS_NODE_PATH = "/servers";
/**
* 程序入口
*/
public static void main(String[] args) throws InterruptedException {
List<Thread> threadList = new ArrayList<>(4);
for(int i = 0; i < SIZE; i++){
final Integer index = i;
Thread thread = new Thread(new Runnable() {
public void run() {
ServerData serverData = new ServerData();
serverData.setLoad(0);
serverData.setHost("127.0.0.1");
serverData.setPort(6000 + index);
ServerLogicImpl server = new ServerLogicImpl(IP_PORT, SERVERS_NODE_PATH, serverData);
server.bind();
}
});
threadList.add(thread);
thread.start();
}
// 为观察其他输出,主线程阻塞一段时间
TimeUnit.SECONDS.sleep(60);
}
}
启动对应的ZooKeeper服务器,开放端口(或关闭防火墙),然后再运行测试函数,控制台输出:
客户端总体说明:
ClientHandler:netty用于处理信道事件的自定义逻辑。
ClientLogicImpl:client服务器的总体逻辑。
LoadBalanceImplProvider:负载均衡算法逻辑的封装。
ClientRunnerTest:客户端的测试类,要与服务端的测试类搭配着进行测试。
客户端各类细节:
ClientHandler:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
* 创建一个处理器,来处理信道事件
*
* @author JustryDeng
* @date 2018/12/4 11:02
*/
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ClientLogicImpl:
import com.aspire.zkclient.balance.server.ServerData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.List;
/**
* client客户端的总体逻辑
*
* @author JustryDeng
* @date 2018/12/4 15:46
*/
public class ClientLogicImpl {
/** 实现负载均衡 方法的封装 */
private final LoadBalanceImplProvider loadBalanceImplProvider;
/** netty相关 */
private EventLoopGroup eventLoopGroup = null;
private Channel channel = null;
/**
* 构造器
*/
public ClientLogicImpl(LoadBalanceImplProvider loadBalanceImplProvider) {
this.loadBalanceImplProvider = loadBalanceImplProvider;
}
/**
* 连接
*/
public void connect(){
try{
// 获取候选节点 并 负载均衡算出 该用哪一个server
List<ServerData> candidateServerDataList = loadBalanceImplProvider.getBalanceItems();
ServerData serverData = loadBalanceImplProvider.balanceAlgorithm(candidateServerDataList);
System.out.println("connecting to " + serverData.getHost() + ":"+serverData.getPort() + ", it's load:" + serverData.getLoad());
// 初始化netty
eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
// 设置信道事件处理器
channelPipeline.addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(serverData.getHost(),serverData.getPort()).syncUninterruptibly();
channel = channelFuture.channel();
System.out.println("started success!");
}catch(Exception e){
System.out.println("连接异常:" + e.getMessage());
}
}
/**
* 断开连接
*/
public void disConnect(){
try{
if (channel!=null) {
channel.close().syncUninterruptibly();
}
eventLoopGroup.shutdownGracefully();
eventLoopGroup = null;
System.out.println("disconnected!");
}catch(Exception e){
e.printStackTrace();
}
}
}
LoadBalanceImplProvider:
import com.aspire.zkclient.balance.server.ServerData;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* 负载均衡实现 的封装类
*
* @author JustryDeng
* @date 2018/12/4 13:47
*/
public class LoadBalanceImplProvider<T extends ServerData> {
/** ip 端口 */
private final String ipAndPort;
/** /servers节点路径 */
private final String serversNodePath;
/** ZkClient客户端 */
private final ZkClient zkClient;
/** 会话超时时间 */
private static final Integer SESSION_TIME_OUT = 10000;
/** 连接超时时间 */
private static final Integer CONNECT_TIME_OUT = 10000;
/** 节点路径分隔符 */
private static final String NODE_SEPARATOR = "/";
/**
* 构造器
*/
public LoadBalanceImplProvider(String ipAndPort, String serversNodePath) {
this.serversNodePath = serversNodePath;
this.ipAndPort = ipAndPort;
this.zkClient = new ZkClient(this.ipAndPort, SESSION_TIME_OUT, CONNECT_TIME_OUT, new SerializableSerializer());
}
/**
* 获取列表中的元素 升序排序后的第一个
*/
protected T balanceAlgorithm(List<T> items) {
if (items.size() > 0) {
Collections.sort(items);
return items.get(0);
} else {
return null;
}
}
/**
* 返回/servers节点下的所有子节点 的 数据 集合
*/
protected List<T> getBalanceItems() {
List<T> serverDataList = new ArrayList<>();
// 获取/servers节点的所有子节点
List<String> children = zkClient.getChildren(this.serversNodePath);
for (int i = 0; i < children.size(); i++) {
// 这里将取出来的数据反序列化为T,那么注意:在网节点里面存数据的时候,对象也应该为T
T serverData = zkClient.readData(serversNodePath + NODE_SEPARATOR + children.get(i));
serverDataList.add(serverData);
}
return serverDataList;
}
}
ClientRunnerTest:
import com.aspire.zkclient.balance.server.ServerData;
import java.util.ArrayList;
import java.util.List;
/**
* client客户端测试函数
*
* @author JustryDeng
* @date 2018/12/4 15:52
*/
public class ClientRunnerTest {
private static final int SIZE = 5;
/** 连接zookeeper的ip和端口 */
private static final String IP_PORT = "10.8.109.60:2181";
/** /servers节点路径 */
private static final String SERVERS_PATH = "/servers";
/**
* 程序入口
*/
public static void main(String[] args) {
List<Thread> threadList = new ArrayList<>(SIZE);
final List<ClientLogicImpl> clientList = new ArrayList<>(8);
// 负载均衡实现 的封装类
final LoadBalanceImplProvider<ServerData> balanceProvider = new LoadBalanceImplProvider<>(IP_PORT, SERVERS_PATH);
try {
for (int i = 0; i < SIZE; i++) {
Thread thread = new Thread(new Runnable() {
public void run() {
ClientLogicImpl clientLogicImpl = new ClientLogicImpl(balanceProvider);
clientList.add(clientLogicImpl);
try {
clientLogicImpl.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadList.add(thread);
thread.start();
//延时
Thread.sleep(2000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭客户端
for (ClientLogicImpl clientLogicImpl : clientList) {
try {
clientLogicImpl.disConnect();
} catch (Exception e) {
e.printStackTrace();
}
}
//关闭线程
for (Thread thread : threadList) {
// 将线程置为打断状态
thread.interrupt();
try {
// 若果线程处于打断状态,那么当其 调用wait()、join()、sleep()方法时,会抛出InterruptedException异常
thread.join();
System.out.println("线程" + thread.getName() + "关闭了!");
} catch (InterruptedException e) {
System.out.println("线程" + thread.getName() + "关闭了!");
}
}
}
}
}
启动对应的ZooKeeper服务器,开放端口(或关闭防火墙),然后再启动服务端的测试函数,最后再运行客户端的测试函数,控制台输出:
注:两个server的ip都写死的为127.0.0.1,所以主要看的是端口以及负载量。
由此可见,负载均衡成功!
所有zookeeper示例内容有(代码链接见本人末):
^_^ 如有不当之处,欢迎指正
^_^ zookeeper示例代码托管链接
https://github.com/JustryDeng/CommonRepository
^_^ 参考书籍
《ZooKeeper-分布式过程协同技术详解》
Flavio Junqueira Benjamin Reed 著, 谢超 周贵卿 译
^_^ 学习视频(推荐观看学习)
极客学院ZooKeeper相关视频
^_^ 本文已经被收录进《程序员成长笔记(三)》,笔者JustryDeng
最后
以上就是糟糕便当最近收集整理的关于ZooKeeper实战之ZkClient客户端实现负载均衡相关概念:准备工作:在pom.xml中引入相关依赖ZooKeeper负载均衡实现示例:的全部内容,更多相关ZooKeeper实战之ZkClient客户端实现负载均衡相关概念:准备工作:在pom内容请搜索靠谱客的其他文章。
发表评论 取消回复