概述
github连接: https://github.com/mcl973/Netty_ZooKeeper_Rpc
项目脑图
主要实现的功能
1.Server端实现自动检测需要暴露的接口(使用注解),并实现自动注册
2.Cleint实现自动的拉取收数据,并构建一个完善的代理机制,使得用户可以直接使用接口中的方法,就像是使用本地的方法来实现远程的方法。
3.方法的参数不能包含没有实现序列化的可变对象或是服务器自定义的一些类,其他的均可。
4.在IPPort中不再使用Bio,使用Netty作为底层的连接,添加了tcp的粘包拆包的功能(新添)。
5.添加了一个MyDeEncoderProtocol,用于支持多种分隔器,如自定义分隔符分割器(如分隔符为#####),自适应长度分割器(每一次接受一个int长度,表示要接收的数据的长度,然后开始接收数据)
第三点证明实验
具体的可以通过一下的实现证明这一点:
//需要传输的类
public class Srialtest implements Serializable {
private long serialVersionUID = 1L;
private List<Integer> list;
public void printlist(List<Integer> list){
this.list = list;
}
public void printlist(){
for (Integer integer : this.list) {
System.out.println(integer);
}
}
}
//序列化类
public class MySerial {
public static Object ByteToObject(byte[] bytes) throws Exception{
ByteInputStream byteInputStream = new ByteInputStream(bytes,bytes.length);
ObjectInputStream objectInputStream=null;
objectInputStream = new ObjectInputStream(byteInputStream);
return objectInputStream.readObject();
}
public static byte[] ObjectToByte(Object object) throws IOException {
ByteOutputStream byteOutputStream = new ByteOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteOutputStream);
objectOutputStream.writeObject(object);
return byteOutputStream.getBytes();
}
}
//创建对象
Srialtest st = new Srialtest();
ArrayList<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
/将这个应用传递给st,其实就是付给了st中对的list实例变量。
st.printlist(list);
//序列化st,已保存数据
byte[] bytes = MySerial.ObjectToByte(st);
//删除list中的数据,防止和st中list中的数据混淆,不知道是哪一个实例的数据
list.remove(0);
list.remove(0);
list.remove(0);
list = null;
//清除list
System.gc();
//反序列化st
Srialtest o = (Srialtest)MySerial.ByteToObject(bytes);
//打印其中的数据
o.printlist();
结果:
1
2
3
由此证明在调用远程函数时将参数序列化并传递过去,在远程反序列化出来其实是可以提取出数据的,所以不用担心这个问题。
扯到底层原理上就是在序列化一个对象是会保存当前对象的值,以及对象中各种引用所指向的数据,这个从逻辑上是说的通的,其实保存的就是当前对象的一个快照,快照里当然需要将对象所需要的数据都要保存一边,不然恢复的时候就会出现问题。
首先请看技术架构:
ZooKeeper的主要内容
1.ZooKeeper的安装,配置
ZooKeeper的安装和配置请看如下链接:
https://www.cnblogs.com/zhaoshizi/p/12105143.html
2.将zk集群启动了后就可以开始工作了,首先就是有一个父节点就是:/ZK_Netty_Rpc,这是一个主节点,服务器需要在这个节点下创建节点,主要是以自己的ip:port为命名的方式来创建永久节点,如:127.0.0.1:8888,然后就可以在这个节点下书写需要暴露的接口的代码,主要以字符串的形式上传。
3.客户端运行后,第一步需要做的就是想zk集群的父节点下拉取所有的数据,并为之注册监听器,以防后续服务节点的数据的更新。
服务端(Server)的主要内容
1.构建起一个ioc容器,ioc容器里存放的是被自定义注解Export标注的需要暴露的接口的实现类(反射创建),key为接口名,value为实现类。
2.向zk集群在父节点下创建一个以自己的ip:port未命名方式的节点,并将ioc容器里的所有key所代表的接口以稳健的方式读取出来,以#####为分隔符拼接成一个大的文本然后将其上传到zk集群上去。
3.开启Netty的对外服务。
自定义一对编解码器,其中handler的反省使用的是自定义的消息类MessageForNetty,这里类里面有包含了两个自定义的消息类,分别是MessgaePrepare(client到server)、MessageResult(server到client)。
4.其中MyDecoder就是将字节流反序列化成MessageForNetty,MyEncoder是将MessageForNetty序列化成字节流。
5.handler收到了数据后会开启一个线程用来处理数据,主要的路程就是如下:
5.1 获取MesageForNetty中的消息体MessagePrepare。
5.2 取出其中的类名、函数名、参数列表、传过来的实际参数值。
5.3 通过类名在ioc中找到具体的实例。
5.4 通过函数名和参数列表找到具体的函数。
5.5 通过Method.invoke(Object,args[])的方式执行函数,并获取返回值,然后封装在MessageResult中。
5.6 将MessageResult通过建立的连接发送回client。
6.添加了一个tcp的粘包拆包的处理器,使用自定义的分隔符“#####”来作为一个数据包的结束符。
客户端(Client)的主要内容
1.先是通过zk集群获取父节点下的所有子节点。
2.通过遍历所有的自己节点拿取数据。
3.每一个节点的数据处理方式都是一样的
3.1按照#####的方式分给字符串,得到每一个接口的字符串,然后为其填充package,使其变得完整,最后将其变现成接口文件。
3.2 将之前的接口文件略做修改。
如package的修改,添加Impl,修改其名字添加Impl,在类名的后面添加implemants 接口名。最后按照每一个函数的返回值将每一个函数的分号变成{ return (返回值)0;},这里只是简单地实现下。
3.3 使用动态代理的方式代理刚才的实现类,在自实现的Invocationhandler中的invoke里实现具体的与server通信的功能,如下:
3.3.1 首先获取参数信息如:类名、函数名、参数列表,invoke里的args即从调用者实际传过来的参数。
3.3.2 将上诉信息封装成MessagePrepare并填充至MessageForNetty中并序列化后发送给Server。
3.3.3 收到了Server返回的数据后,先反序列化成MessageForNetty,然后获取MessageResult,并提取出结果。
3.3.4 将收到的结果通过return返回。
4.客户端采用的也是Netty来与服务器通信。
这里为每一个服务器都创建了一个连接,每一个服务器里的所有的暴露接口使用同一个连接。在客户端维持着一个ConnectToServerPool,这个类存放的主要是每一个连接对应者的Netty的客户端,客户端继承了继承了Thread,所以这里将其放置在了线程池中取处理,在IPPort中直接通过该类就可以拿取到其对应的Netty的客户端,由于Netty的客户端和IPPort是相互隔离的所以不会出现之前的卡顿情况。在Netty的客户端使用了一个:
ConcurrentHashMap<String,SynchronousQueue<Object>> mapqueue = new ConcurrentHashMap<>();
用来维持一个“接口名.函数名”–>SynchronousQueue映射,这样就可以在多线程调用下不会出现服务器返回的结果序列被多个等待的函数错误接收的情况。
5.在Netty的客户端使用一个tcp的粘包拆包的处理器,使用自定义的分隔符“#####”来作为一个数据包的结束符。
补充说明
其中的Info、ZK是Client和Server共有的,Extra、Service、Serrver、StartRun是Server独有的,Client是client独有的。
开启的顺序是:
开启ZK集群,开启三个(使用ubuntu的命令行版本)
StartRun/StartRun 开启服务端。
Client/Sevice/test 开启客户端。
细节讲解
Server
编解码器
ZK_Netty_Rpc.Server.DeEncoder
/**
* 自定义解码器
* 将字节数组解码成MessageForNetty类
*/
public class MyDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
//获取字节流
byte[] array = new byte[msg.readableBytes()];
msg.readBytes(array);
//反序列化
MessageForNetty messageForNetty = (MessageForNetty)SerializableAndUnSerializable.ByteToString(array);
out.add(messageForNetty);
}
}
/**
* 自定义编码器
* 将MessageForNetty序列化成字节数组
*/
public class MyEncoder extends MessageToByteEncoder<MessageForNetty> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageForNetty msg, ByteBuf out) throws Exception {
//类的序列化
byte[] bytes = SerializableAndUnSerializable.StringToByte(msg);
out.writeBytes(bytes);
out.writeBytes(argsInfo.FrameSplit.getBytes());
}
Server端的处理细节
ZK_Netty_Rpc.Server.HandlerRpcMethod.ReceiveMessageAndExcute
/**
* 将本地的字符串转换为method的形式
* 类名,函数名,返回值,参数具体类型,参数值这五项。
* 目前的参数仅支持八大基本类型和string及其数组形式
* @return ffahuie
*/
public void getMethodFromMessageAndExcute() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, IOException {
/**
* 获取参数
*/
String classname = Message.getInterfaceName();
String methodname = Message.getMethoedName();
Class[] paragrames = Message.getMethodParagrames();
Object[] realParagrames = Message.getRealMethodParagrames();
/**
* 这里先留下来,等ioc容器起来再说
*/
ConcurrentHashMap<String, Object> myIoc = AbstractBean.MyIoc;
String[] split = classname.split("\.");
String s = AbstractBean.ClassnameToReferenceName.get(split[split.length-1]);
/**
* 通过类名来找到具体的实现类的实例
*/
Object target = myIoc.get(s);
/**
* 先对参数类型做一波解析,得到函数和具体的实际的参数值,并执行
*/
Class<?> clazz = target.getClass();
Class[] paragrames1 = paragrames;
Method method = clazz.getMethod(methodname, paragrames1);
Object invoke = method.invoke(target, realParagrames);
// System.out.println(invoke);
/**
* 组装答案,并传递给远程的客户端。
*/
MessageResult mr = new MessageResult(classname,methodname,invoke);
MessageForNetty messageForNetty = new MessageForNetty();
messageForNetty.setMessageResult(mr);
/**
* 将结果返还给client
*/
channel.writeAndFlush(messageForNetty);
}
容器的处理
ZK_Netty_Rpc.Extras.MyAutoCollectWork.CreateBean
public void createBean(){
for (String s : FileName) {
if (s.contains(".class")){
String classanme = s.split(".class")[0];
try {
Class<?> aClass = Class.forName(classanme);
//只选择被Export注释的class来创建实例
if (aClass.isAnnotationPresent(Export.class)){
Class<?> aClass1 = Class.forName(classanme + argsInfo.aftername);
MyIoc.put(aClass.getName(),aClass1.newInstance());
String[] split = aClass.getName().split("\.");
//类的名字到类的权限定名的映射
ClassnameToReferenceName.put(split[split.length-1],aClass.getName());
}
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
e.printStackTrace();
}
}else continue;
}
}
Service函数接口编写规范
package ZK_Netty_Rpc.Service;
import ZK_Netty_Rpc.Extras.MyAnnonation.Export;
/**
* 一个接口对应一个实现类,不可多个对用,主要是用来实现rpc的远程调用的
* 上面的规范仅限于这个借口里的方法,其他的接口不做规范
*/
@Export
//下面的#####分隔符在argsInfo中有了明确的定义了,名为 FileSplit="#####"
//#####
public interface show {
public String showmore();
public int state();
public void setname(String name);
}
编写的时候一定要在@Export和具体的接口名之间加上一个注释//#####,如:
@Export
//#####
public interface test{
.......
}
这样是方便程序的切分,当然用户可以自定义分隔符,但是在这里改动的同时也需要在ZK_Netty_Rpc.Info.argsInfo文件中改动FileSplit参数的值为你所定义的分隔符。
如自定义的分隔符为abcde,那么就需要将ZK_Netty_Rpc.Info.argsInfo的FileSplit的值改为abcde。代码如下:
@Export
//abcde
public interface test{
.......
}
向ZK集群注册数据
ZK_Netty_Rpc.Extras.MyAutoCollectWork.ExportInterfaces
/**
* 将接口暴露给zk集群,也就是向zk集群注册
* @throws Exception
*/
public void export() throws Exception {
// 获取项目路径
String basepath = argsInfo.basepath;
String javaFileToString = "";
/**
* 获取接口的代码
*/
for(Map.Entry<String,Object> maps:MyIoc.entrySet()){
String s = maps.getKey().replaceAll("\.", "/");
String replace = s.replace(argsInfo.Projectname, "");
javaFileToString = getJavaFileToString(basepath+replace+".java");
javaFileToString+="#####";
}
distributeZK.CreateNodeAndGetPersistentNodeName(argsInfo.path+"/"+argsInfo.hostname+":"+argsInfo.port,javaFileToString);
}
Cleint的细节
来取数据并创建接口文件、接口的简单实现类的文件和生成代理类并放入rpc容器中
ZK_Netty_Rpc.Client.GetInterfaces.GetInterfaces
/**
* 获取节点下的所有的子节点,没有孙辈的节点
* 然后获取所有子节点的信息
* 1.创建接口文件
* 2.创建java实现类文件
* 3.jdk代理java实现类文件
* 4.将其装载到容器中方便使用
* @throws IOException
*/
public GetInterfaces() throws Exception {
// 设置监听器
distributeZK.setChildrenWatch(basepath);
// 获取节点的子节点列表
List<String> nodeChildern = distributeZK.getNodeChildern(basepath);
// 挨个的获取每一个子节点下的数据,并将其变现为java文件,并加载到内存中
for (String s : nodeChildern) {
String data = distributeZK.SelectNodeDataByName(basepath + "/" + s);
// 分割数据,获取到最有用的数据
String[] split = data.split("#####");
// 开始获取java文件的url地址
String javafileurl = argsInfo.basepath+argsInfo.ClientRpcMethodPath+"/";
// 没有就创建路径
File file = new File(javafileurl);
if (!file.exists())
file.mkdirs();
for (int i = 0; i < split.length; i++) {
String str = split[i];
/**
* 获取文件名
*/
String[] s1 = str.split("\{")[0].split(" ");
String classname = s1[s1.length-1];
// 添加包
String packages = argsInfo.ClientRpcMethodPath.replaceAll("/", "\.");
String javafile = "package "+argsInfo.Projectname+packages+";n"+str;
/**
* 创建接口及其实现类
*/
cjf.CreateJavaFile(javafileurl+classname+".java",javafile,classname,javafileurl);
/**
* 1.拿取实现类的权限定类名
* 2.通过类名反射创建对象
* 3.jdk动态代理这个类
* 4.将ip,port,代理类封装带一个IPPort中,再将这个加入到一个容器当中。
* 现在java文件和接口文件都已经建立好了,那么现在就可以开始进行动态代理了,隐藏底层的具体的业务逻辑了
*/
// 1
String javaimplname = argsInfo.Projectname+packages+"."+argsInfo.aftername+"."+classname+argsInfo.aftername;
// 2
Class<?> aClass = Class.forName(javaimplname);
Object o = aClass.newInstance();
// 3
MyInvokeHandler myInvokeHandler = new MyInvokeHandler(o);
Object newinstance = GetNewProxyInstance.newinstance(o, myInvokeHandler);
/**
* 下面就是将这个装载到容器中其,方便进一步的使用。
* 这个容器不能和服务器端的容器混用。
*/
// 4
channelManager.SetAndGetSocket(s,classname,newinstance);
}
}
}
IPPort
/**
* port 端口号
* ip ip地址
* Object 远程对象的动态代理的实例
* BioSocket 一个用于连接远程的socket,在执行函数时才会具体的执行业务
*/
public class IPPort{
private int port;
private String ip;
private BioSocket socket;
private Object proxyinstance;
public IPPort(int port,String ip,Object proxyinstance){
this.port = port;
this.ip = ip;
this.proxyinstance = proxyinstance;
// this.socket = new BioSocket(ip,port);
ConnectToServerPool.addConnextLinkForIPAndPort(ip,port);
}
public int getPort() {
return port;
}
public String getIp() {
return ip;
}
public BioSocket getBioSocket() {
return socket;
}
public Client getClient() {
return ConnectToServerPool.getConnectForIPAndPort(ip,port);
}
public Object getProxyinstance() {
return proxyinstance;
}
}
ZK的实现细节(主要是针对curator的一些方法的封装)
clindren节点的监听器
ZK_Netty_Rpc.ZK.DistributeZK
/**
* 设置子节点的监听器
* @param nodename
*/
public void setChildrenWatch(final String nodename){
if (client == null){
reset();
}
System.out.println("设置");
PathChildrenCache pc = new PathChildrenCache(client,nodename,true);
pc.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
List<String> strings;
String string = "";
switch(event.getType()){
case CHILD_REMOVED:
string = nodename + ",child removed,and left children are ";
break;
case CHILD_ADDED:
string = nodename+",child add";
break;
case CHILD_UPDATED:
string = nodename+",child add";
/**
* 数据跟新,那么就需要对应的去修改本地的文件
* 这里就需要进行区分了,如果实在服务器上的话,那么这里什么都不需要些
* 但是在client上就需要编写相应的代码了,如更新也饿无逻辑代码
* 1.拿到节点名称,更新接口和实现类文件
* 2.更新ioc容器
*/
ChildData data = event.getData();
System.out.println(data.getPath()+"下的数据发生了改变,改变的内容是:"+new String(data.getData()));
// 以下是client才需要编写的代码,由于我将client和server卸载了同一个项目中,
// 所以就有些尴尬了,所以以下代码将会被注释掉,在后面分开始在将其恢复
// new GetInterfaces();
break;
default:
string = nodename+"other things happened";
break;
}
strings = client.getChildren().forPath(nodename);
string = getString(strings.iterator(), string);
}
});
try {
pc.start();
} catch (Exception e) {
e.printStackTrace();
}
}
简单的测试
Server端
ZK_Netty_Rpc.StartRun
public class StartRun {
public static void main(String[] args) throws Exception {
ThreadPoolExecutor threadPoolExecutor = MyThreadPool.getDefaultThreadPoolExcutor();
// threadPoolExecutor.submit(new Server());
new ContainerInit();
new Server();
}
}
ZK_Netty_Rpc.Service
public class showImpl implements show{
static ArrayList<String> list = new ArrayList<>();
static {
for (int i = 0; i < 1000; i++) {
list.add("今天天气正好加"+i);
}
}
@Override
public String showmore() {
StringBuilder stringBuilder = new StringBuilder();
for (String s : list) {
stringBuilder.append(s+"n");
}
return stringBuilder.toString();
}
@Override
public int state() {
return list.size();
}
@Override
public void setname(String name) {
System.out.println(name);
}
}
Client端
ZK_Netty_Rpc.Client.Service
public class test {
static String getclassname(String name){
String[] split = name.split("\.");
return split[split.length-1];
}
public static void main(String[] args) throws Exception {
/**
* 如果本地无任何的文件,那么先拉取文件,在执行对应的代码。即将下面的代码注释掉。
*/
int n = 1;
try {
new GetInterfaces();
} catch (Exception e) {
e.printStackTrace();
}
for (int i = 0; i < n; i++) {
new Thread(new Runnable() {
@Override
public void run() {
ConcurrentHashMap<String, IPPort> clientRpcIoc = ChannelManager.ClientRpcIoc;
IPPort ipPort = clientRpcIoc.get(getclassname(show.class.getName()));
show sj = (show) ipPort.getProxyinstance();
String showmore = sj.showmore();
System.out.println(showmore);
}
}).start();
}
// Scanner scanner = new Scanner(System.in);
// scanner.nextInt();
// ipPort.getClient().getChannel().closeFuture().sync();
// ipPort.getClient().getWorkers().shutdownGracefully();
}
}
效果
服务端:
Client
压测实验
1个服务器,3个客户端,每一个客户端上开启100个线程,每一个线程都使用同一个接口的三个方法。
经检测,服务端和客户端能够正常的工作,并能够正常的显示结果。
最后
以上就是健康导师为你收集整理的基于ZooKeeper为注册中心的Netty_Rpc的全部内容,希望文章能够帮你解决基于ZooKeeper为注册中心的Netty_Rpc所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复