概述
zookeeper
介绍
分布式是指多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务,由于远程调用会出现网络故障等问题,如何保持数据一致性和可用性则成为要解决的问题。而zookeeper是一个分布式服务协调组件,是一个高性能的分布式数据一致性的解决方案。
特性
一致性:数据一致性,数据按照顺序分批入库。
原子性:事务要么成功要么失败,不会局部化
单一视图:客户端连接集群中的任一zk节点,数据都是一致的
可靠性:每次对zk的操作状态都会保存在服务端
实时性:客户端可以读取到zk服务端的最新数据
安装及简单运行
去官网下载压缩包到本地解压,然后把zoo_sample.cfg改为zoo.cfg并按实际情况填写配置文件,在bin目录下运行启动脚本
zookeeper主要目录结构
bin:主要的一些运行命令
conf:存放配置文件
contrib:附加的一些功能
dist-maven:mvn编译后的目录
docs:文档
lib:需要依赖的jar包
recipes:案例demo代码
src:源码
zoo.cfg配置
tickTime:用于计算的时间单位。比如session超时:N*tickTime
initLimit:用于集群,允许从节点连接并同步到master节点的初始化连接时间,以tickTime的倍数表示
syncLimit:用于集群,master主节点与从节点之间发送消息,请求和应答时间长度。(心跳机制)
dataDir:必须配置,数据目录
dataLogDir:日志目录,如果不配置会和dataDir同一个公用目录
clientPort:连接服务器的端口,默认2181
zookeeper基本数据模型
是一个树形结构,类似linux的文件目录结构
每个节点称为znode,它可以有子节点,也可以有数据
节点分为临时节点和永久节点,临时节点在客户端断开后失效
每个zk节点都有各自的版本号,可以通过命令行来显示节点信息
每当节点数据发生变化,那么该节点的版本会累加(乐观锁)
删除/修改过时节点,版本号不匹配会报错
每个zk节点存储的数据不宜过大,几k即可
节点可以设置权限acl,可以通过权限来限制用户的访问、区分环境等等,acl这里就不做介绍了
master节点选举,主节点挂了以后,从节点就会接受工作,并且保证这个节点是唯一的,这也是所谓首脑模式,从而保证我们的集群是高可用的
常见运用
统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器,此操作在云计算中用的特别多。
发布与订阅,类似于消息队列,dubbo发布者把数据存在znode上,订阅者读取这个数据
提供分布式锁,分布式环境中不同进程之间争夺资源,类似于多线程中的锁
集群管理,保证集群中数据的强一致性
命令
ls 子节点
ls2 ls+stat
stat 状态信息
czxid:节点被创建的事务ID
ctime: 节点创建时间
mzxid: 最后一次被更新的事务ID
mtime: 节点修改时间
pzxid:子节点列表最后一次被更新的事务ID
cversion:子节点的版本号
dataversion:数据版本号
aclversion:权限版本号
ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0
dataLength:节点存储的数据的长度 numChildren:当前节点的子节点个数
get 数据+stat
session基本原理
客户端与服务端之间的链接存在会话
每个会话都会可以设置一个超时时间
心跳结束,session则过期
session过期,则临时节点znode会被抛弃
心跳机制:客户端向服务端的ping包请求
create:创建 -e临时节点 -s 有序节点
set:修改
delete:删除
watcher机制:
针对每个节点的操作,都有要给监督者->watcher
当监控的某个对象(znode)发生了变换,则触发了watcher事件
zk中的watcher是一次性的,触发后立即销毁
父节点,子节点 增删改都能够触发其watcher
针对不同类型的操作,触发的watcher事件也不同:
节点创建、删除、数据变化事件
注意watcher只可以使用一次,stat、get、ls、ls2后面均可以加watcher进行监听。ls为父节点设置watcher,创建、删除子节点触发:NodeChildrenChanged,修改不会触发
关于zookeeper更详细介绍可以看看这篇文章https://www.cnblogs.com/luxiaoxun/p/4887452.html
下面开始编码的环节
这里的格式也是像前一篇文章一样 {interface:{url:impl}},而在zookeeper上面的格式为 /interface/url:data(具体实现类的名字),
而心跳机制的实现,是通过服务提供者定时向注册中心发送本机地址(心跳数据包),而注册中心的监控则维持一个channelId和具体地址的map,并且通过IdleHandler监听空闲事件,到达一定的空闲次数则认为不活跃,当不活跃时(这里的不活跃条件是5分钟内3次以上没有发送心跳包),zookeeper删除相应的url节点,但后续的逻辑没有继续做,比如:服务提供方在网络稳定后尝试重新发送心跳包,注册中心通过一定的计算(比如在一定时间内的心跳发送率达到一定的值)认为该ip可用了,就尝试重新向zookeeper注册该ip,而且也可以在本地维持一个map存放接口信息,并添加监听事件去更新可用列表,可以优化的点还很多,这里暂时先接入zookeeper并简单地演示通过心跳来移除不稳定服务
这里采用curator作为zookeeper的客户端
首先编写关于zookeeper的业务操作
/**
* @author lulu
* @Date 2019/11/18 21:17
* 负责实现注册中心具体的业务功能
*/
public class ZkRegister {
//{接口:{URL:实现类名}},这里可以为每个接口建立子节点,节点名为url地址,值为className
private static CuratorFramework client = null;
//通过静态代码块初始化
static{
init();
}
//初始化链接客户端
private static void init() {
RetryPolicy retryPolicy = new RetryNTimes(ZKConsts.RETRYTIME, ZKConsts.SLEEP_MS_BEWTEENR_RETRY);
client = CuratorFrameworkFactory.builder()
.connectString(ZKConsts.ZK_SERVER_PATH)
.sessionTimeoutMs(ZKConsts.SESSION_TIMEOUT_MS).retryPolicy(retryPolicy)
.namespace(ZKConsts.WORK_SPACE).build();
client.start();
}
//注册接口、对应服务ip及其实现类
public static void register(String interfaceName, URL url, Class implClass) {
try {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(getPath(interfaceName, url.toString()), implClass.getCanonicalName().getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
//hostname:port,遍历所有interface节点,把对应的url节点去掉
public static void remove(String url) {
try {
List<String> interfaces = client.getChildren().forPath("/");
for (String anInterface : interfaces) {
List<String> urlList = client.getChildren().forPath(getPath(anInterface));
for (String s : urlList) {
if (s.equals(url)) {
client.delete().forPath(getPath(anInterface, url));
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//获取具体实现类的类名,这里还可以添加一个内部缓存,不用每次都去访问,
public static String get(String interfaceName, URL url) {
String res = null;
try {
byte[] bytes = client.getData().forPath(getPath(interfaceName, url.toString()));
res = new String(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
//通过接口名获取具体的实现类
public static URL random(String interfaceName) {
try {
List<String> urlList = client.getChildren().forPath(getPath(interfaceName));
String[] url = urlList.get(0).split(":");
return new URL(url[0], Integer.valueOf(url[1]));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//生成节点路径
private static String getPath(String... args) {
StringBuilder builder = new StringBuilder();
for (String arg : args) {
builder.append("/").append(arg);
}
return builder.toString();
}
public static void closeZKClient() {
if (client != null) {
client.close();
}
}
}
心跳监听,在提供服务方使用
package com.gdut.rpcstudy.demo.register.zk.heartbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author lulu
* @Date 2019/11/18 23:30
*/
public class BeatDataSender {
private BeatDataSender() {
}
public static void send(String url, String hostName, Integer port) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelFuture connect = bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder())
.addLast(new StringEncoder())
.addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("由于不活跃次数在5分钟内超过2次,链接被关闭");
}
});
}
})
.connect(hostName, port).sync();
System.out.println("心跳客户端绑定" + "hostname:" + hostName + "port:" + port);
//这里只是演示心跳机制不活跃的情况下重连,普通的做法只需要定时发送本机地址即可
service.scheduleAtFixedRate(() -> {
if (connect.channel().isActive()) {
int time = new Random().nextInt(5);
System.out.println(time);
if(time >3){
System.out.println("发送本机地址:" + url);
connect.channel().writeAndFlush(url);
}
}
}, 60, 60, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
}
注册中心检查心跳
/**
* @author lulu
* @Date 2019/11/18 22:17
* 注册中心心跳检查服务器,通过查看心跳来查看各server是否存活
*/
public class ZkServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
ConcurrentHashMap<String,String> ChannalIdUrlMap=new ConcurrentHashMap();
try {
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
//存放已完成三次握手的请求的队列的最大长度
.option(ChannelOption.SO_BACKLOG, 128)
//启用心跳保活
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//string编码器
ch.pipeline().addLast(new StringEncoder())
//string解码器
.addLast(new StringDecoder())
//监听链接空闲时间
.addLast(new IdleStateHandler(0,0,60))
//hearbeat处理器
.addLast(new HeartbeatHandler(ChannalIdUrlMap));
}
});
//bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
ChannelFuture future = bootstrap.bind("127.0.0.1",8888).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
在nettyserver的start方法添加发送心跳
BeatDataSender.send(hostName + ":" + port, "127.0.0.1", 8888);
心跳监听处理逻辑,用于监听心跳服务器的处理,需要和监听链接空闲时间的IdleHandler一起使用,复写eventTriger方法当链接符合给的空闲条件时,对其进行逻辑处理
/**
* @author lulu
* @Date 2019/11/18 22:29
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
//维护channelId和具体地址的map,当发生变化时对其进行删除
private static ConcurrentHashMap<String, String> channelUrlMap;
//活跃次数
private int inActiveCount = 0;
//开始计数时间
private long start;
public HeartbeatHandler(ConcurrentHashMap<String, String> map) {
HeartbeatHandler.channelUrlMap = map;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String url = msg.toString();
String id = ctx.channel().id().asShortText();
System.out.println("收到channelId:" + id + "发来信息:" + url);
if (channelUrlMap.get(id) == null) {
channelUrlMap.put(id, url);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent state = (IdleStateEvent) evt;
if (state.state().equals(IdleState.READER_IDLE)) {
System.out.println("读空闲");
} else if (state.state().equals(IdleState.WRITER_IDLE)) {
System.out.println("写空闲");
}
//在一定时间内读写空闲才会关闭链接
else if (state.state().equals(IdleState.ALL_IDLE)) {
if (++inActiveCount == 1) {
start = System.currentTimeMillis();
}
int minute = (int) ((System.currentTimeMillis() - start) / (60 * 1000))+1;
System.out.printf("第%d次读写都空闲,计时分钟数%d%n", inActiveCount,minute);
//5分钟内出现2次以上不活跃现象,有的话就把它去掉
if (inActiveCount > 2 && minute <= 5) {
System.out.println("移除不活跃的ip");
removeAndClose(ctx);
} else {
//重新计算
if (minute >= 5) {
System.out.println("新周期开始");
start = 0;
inActiveCount = 0;
}
}
}
}
}
//通过ID获取地址,并删除zk上相关的
private void removeAndClose(ChannelHandlerContext ctx) {
String id = ctx.channel().id().asShortText();
String url = channelUrlMap.get(id);
//移除不活跃的节点
ZkRegister.remove(url);
channelUrlMap.remove(id);
ctx.channel().close();
}
//当出现异常时关闭链接
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
removeAndClose(ctx);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asShortText() + "注册");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().id().asShortText() + "注销");
}
}
由于存放的是classImplName,所以要在handler处理逻辑里加载该类,后面接入spring后可以从一个服务注册的类上获取相应的实现类
String serviceImplName= ZkRegister.get(invocation.getInterfaceName(),new URL(hostAddress,8080));
Class<?> serviceImpl = Class.forName(serviceImplName);
Method method=serviceImpl.getMethod(invocation.getMethodName(),invocation.getParamsTypes());
客户端获取url
URL url= ZkRegister.random(interfaceClass.getName());
此致更改完毕,地址为https://github.com/97lele/rpcstudy/tree/withzk,接下来是把服务端和客户端整合到spring
最后
以上就是狂野书包为你收集整理的基于netty、zookeeper手写RPC框架之二——接入zookeeper作为注册中心,添加心跳机制的全部内容,希望文章能够帮你解决基于netty、zookeeper手写RPC框架之二——接入zookeeper作为注册中心,添加心跳机制所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复