概述
上一篇使用自定义xsd定义了发布、订阅的标签
这一篇就是用标签的解析并想zk上注册服务
废话不多说,直接上注册中心的代码
package com.kaer.rpc.netty.register.zk;
import java.util.Iterator;
import java.util.concurrent.ConcurrentMap;
import org.apache.curator.framework.CuratorFramework;
...
import org.slf4j.LoggerFactory;
import com.kaer.rpc.demo.config.ProviderConfig;
/**
* Zookeeper注册中心处理类
*
* @author kaer
*
*/
public class ZkRegistryCenter {
public static final Logger logger = LoggerFactory.getLogger(ZkRegistryCenter.class);日志打印
private static String basePath = "/kaer-rpc";//zk是以目录的形式创建节点
private static CuratorFramework client;//Curator操作zk的客户端
private final ConcurrentMap<String, ProviderConfig> services = Maps.newConcurrentMap();//服务的集合,暂时只发布一个服务,后面可扩展
private static final ConnectionStateListener connectionStateListener;//监听器对zk的状态实时处理,暂不实现
static {
connectionStateListener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
// 状态的变化处理
logger.info("zkClient state change to " + newState);
}
};
}
public static void start(String zookeeperUrl) {
//序列化工具
//初始化连接客户端,即连接到了本地zk的127.0.0.1:2181
client = CuratorFrameworkFactory.newClient(zookeeperUrl, new ExponentialBackoffRetry(1000, 3));
client.getConnectionStateListenable().addListener(ZkRegistryCenter.connectionStateListener);
client.start();//zk客户端启动
try {
//zk是以目录的形式存储数据,而且顶向下一节一节实现的
if((Stat) client.checkExists().forPath(basePath) == null) {
((ACLBackgroundPathAndBytesable<?>) client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)).forPath(basePath);
}
} catch (Exception e) {
e.printStackTrace();
}
logger.info("start service register......");
}
//注册服务,这个发生的时间是在容器的bean加载,还记得之前自定义的标签么,有对provider标签的解析类ProviderBean
public static void registerService(String serviceId, String alias, byte[] info) {
//创建目录节点,类似kaer-rpc/{api},数据目录为kaer-rpc/{api}/data
String nodePath = ZKPaths.makePath(basePath, serviceId);
String fullPath = ZKPaths.makePath(nodePath, "data");
//存放目录数据
try {
Stat stat = (Stat) client.checkExists().forPath(nodePath);
if(stat == null) {
((ACLBackgroundPathAndBytesable<?>) client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)).forPath(nodePath);
((ACLBackgroundPathAndBytesable<?>) client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)).forPath(fullPath);
}else {
stat = (Stat) client.checkExists().forPath(fullPath);
if(stat == null) {
((ACLBackgroundPathAndBytesable<?>) client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)).forPath(fullPath);
}
}
client.setData().forPath(fullPath, info);//这里的info使用Json序列化的,具体要见ProviderConfig
} catch (Exception e) {
e.printStackTrace();
}
}
//删除服务,暂不实现
public void unRegisterService(String serviceId) {
String path = basePath + "serviceId";
try {
((ChildrenDeletable) client.delete().guaranteed()).forPath(path);
} catch (Exception e) {
e.printStackTrace();
logger.error("Could not unregister instance: s%", serviceId);
}
}
public void close() {
ExceptionAccumulator accumulator = new ExceptionAccumulator();
Iterator<ProviderConfig> arg2 = this.services.values().iterator();
while (arg2.hasNext()) {
ProviderConfig service = (ProviderConfig) arg2.next();
unRegisterService(service.getApi());
}
client.getConnectionStateListenable().removeListener(ZkRegistryCenter.connectionStateListener);
CloseableUtils.closeQuietly(client);
accumulator.propagate();
}
}
由上一篇中分别对三个自定义标签进行了解析处理
下面是注册服务的处理逻辑
package com.kaer.rpc.demo.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.kaer.rpc.netty.register.zk.ZkRegistryCenter;
/**
* 服务提供者配置
*
* @author kaer
*
*/
public class ProviderConfig {
public static Logger logger = LoggerFactory.getLogger(ProviderConfig.class);
private String api;
private String mapper;
private String alias;
//发布,假设模拟一个服务
protected void doExport() {
logger.info("生产者信息:端口[" + api + "],映射[" + mapper + "],别名[" + alias + "]。");
RpcProviderConfig rpcProviderConfig = new RpcProviderConfig();
rpcProviderConfig.setApi(api);
rpcProviderConfig.setMapper(mapper);
rpcProviderConfig.setAlias(alias);
rpcProviderConfig.setHost("127.0.0.1");
rpcProviderConfig.setPort(9999);
//注册生产者
// long count = RedisRegistryCenter.registryProvider(api, alias, JSON.toJSONString(rpcProviderConfig));
ZkRegistryCenter.registerService(api, alias, JSON.toJSONBytes(rpcProviderConfig));
logger.info("注册生产者:{} {} {}", api, alias, null);
}
public String getApi() {
return api;
}
public void setApi(String api) {
this.api = api;
}
public String getMapper() {
return mapper;
}
public void setMapper(String mapper) {
this.mapper = mapper;
}
public String getAlias() {
return alias;
}
public void setAlias(String alias) {
this.alias = alias;
}
}
消费者的配置(ConsumerConfig)类似提供者的这个配置,暂不实现。
/**
* 实现服务提供的依赖注入
* @author kaer
*
*/
public class ProviderBean extends ProviderConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 发布
doExport();
}
}
其中有需要依赖的包(Curator对ZK的支持):
<!-- Curator对ZK的支持 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
本地启动zookeeper
测试类如下:
public class ApiTest {
@SuppressWarnings("resource")
public static void main(String[] args) throws UnknownHostException, IOException {
String[] configs = { "rpc/test.xml" };
new ClassPathXmlApplicationContext(configs);
}
}
执行完了,通过可视化界面去看看是否注册成功(当然也可以用命令去操作,zkCli.exe)。
这里用到了zkUI的一个工具,需要的在附件中获取。
可以登陆进去可以看到有代码中创建的目录了
一层一层点进去,可以看到最终注册上去的服务的信息,在实际开发中,应获取应用本机的ip和端口,再附加服务的唯一性数据,订阅就是个获取并解析然后再进行netty通讯的步骤
本文结束,后面使用consul进行服务注册玩玩
附件:zkUI可视化工具
最后
以上就是大方画笔为你收集整理的【RPC系列】5、向Zookeeper上注册服务(用netty、zk手写RPC第二步)的全部内容,希望文章能够帮你解决【RPC系列】5、向Zookeeper上注册服务(用netty、zk手写RPC第二步)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复