我是靠谱客的博主 快乐秋天,最近开发中收集的这篇文章主要介绍手写一个简化版的Dubbo框架,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在学习了Dubbo之后, 我发现自己好像了解了Dubbo的实现原理, 又好像不是很了解, 毕竟我只是背诵了下概念, 没有深入的去看源码. 这里我就来手写一个简化版的Dubbo框架, 通过动手实践来深入理解Dubbo的实现原理.

Dubbo的实现原理

RPC调用的过程

我们先来看下RPC调用的过程.

在这里插入图片描述

  • 服务容器负责启动,加载,运行服务提供者。
  • 服务提供者在启动时,向注册中心注册自己提供的服务。
  • 服务消费者在启动时,向注册中心订阅自己所需的服务。
  • 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  • 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行RPC调用,如果调用失败,再选另一台调用。
  • 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。

RPC调用的原理

RPC调用的原理是: 动态代理, 反射, 网络传输.

  • 消费者从注册中心获取到服务提供者的地址后, 与服务提供者建立TCP连接.
  • 消费者将服务的全限定类名(String), 方法名(String), 方法参数类型(Class[]), 方法参数(Object[]), 通过TCP传输给服务提供者.
  • 服务提供者获取到这些数据后, 通过反射调用对应服务的方法, 然后将执行结果通过TCP返回给服务消费者.
  • 整个RPC调用过程被封装到动态代理中, 对用户来说是透明的.

Dubbo架构

Dubbo框架设计分为十层:

在这里插入图片描述

  • service 服务层, 为服务提供者和服务消费者提供接口.
  • config 配置层, 提供dubbo的各种配置.
  • proxy 服务接口透明代理, 生成动态代理.
  • registry 注册中心层, 负责服务的注册与发现.
  • cluster 路由层, 封装多个提供者的路由及负载均衡.
  • monitor 监控层, RPC调用次数和调用时间监控.
  • protocol 远程调用层, 封装 RPC 调用.
  • exchange 信息交换层, 封装请求响应模式, 同步转异步.
  • transport 网络传输层, 抽象 mina 和 netty 为统一接口.
  • serialize 数据序列化层, 提供数据序列化的接口.

手写简化版的Bubbo框架

我们根据Dubbo的框架设计来手写一个简化版的Dubbo, 其中序列化协议使用Java原生的Serializable, 网络传输协议使用原生的TCP, 负载均衡使用随机算法, 注册中心使用ZooKeeper, 动态代理使用JDK Proxy.

github地址: 手写一个简化版的Dubbo框架

服务提供者

(1) ZooKeeper常量

定义了ZooKeeper的地址和Dubbo注册中心的根节点路径.

/**
 * @author litianxiang
 * @date 2020/3/17 11:45
 */
public class ZooKeeperConst {
	/**
	 * ZooKeeper的地址
	 */
	public static String host = "xxx.xx.xx.xxx:2181";
	/**
	 * Dubbo在ZooKeeper上的根节点
	 */
	public static String rootNode = "/dubbo";
}

(2) 注册中心

这里使用ZooKeeper来实现注册中心, 将服务及服务提供者地址注册到注册中心.

/**
 * @author litianxiang
 * @date 2020/3/17 11:28
 */
public class RegisterCenter {
	private static Logger logger = LoggerFactory.getLogger(RegisterCenter.class);
	private ZooKeeper zk;


	/**
	 * 连接ZooKeeper, 创建dubbo根节点
	 */
	public RegisterCenter() {
		try {
			CountDownLatch connectedSignal = new CountDownLatch(1);
			zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if (event.getState() == Event.KeeperState.SyncConnected) {
						connectedSignal.countDown();
					}
				}
			});
			//因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例.
			connectedSignal.await();

			//创建dubbo注册中心的根节点(持久节点)
			if (zk.exists(ZooKeeperConst.rootNode, false) == null) {
				zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			logger.error("connect zookeeper server error.", e);
		}
	}

	/**
	 * 将服务和服务提供者URL注册到注册中心
	 * @param serviceName 服务名称
	 * @param serviceProviderAddr 服务所在TCP地址
	 */
	public void register(String serviceName, String serviceProviderAddr) {
		try {
			//创建服务节点
			String servicePath = ZooKeeperConst.rootNode + "/" + serviceName;
			if (zk.exists(servicePath, false) == null) {
				zk.create(servicePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}

			//创建服务提供者节点
			String serviceProviderPath = servicePath + "/" + serviceProviderAddr;
			if (zk.exists(serviceProviderPath, false) == null) {
				zk.create(serviceProviderPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
			}
			logger.info("服务注册成功, 服务路径: " + serviceProviderPath);
		} catch (Exception e) {
			logger.error("注册中心-注册服务报错", e);
		}
	}
}

(3) 接口全限定类名, 方法名, 方法参数类型, 方法参数的包装类

这里为了简单, 使用Java自带的序列化协议.

/**
 * 封装接口名, 方法名, 参数字节码数组, 参数对象
 */
public class Invocation implements Serializable {
    private static final long serialVersionUID = -2798340582119604989L;

    /**
     * 接口名
     */
    private String interfaceName;
    /**
     * 方法名
     */
    private String methodName;
    /**
     * 参数字节码数组
     */
    private Class[] paramTypes;
    /**
     * 参数对象
     */
    private Object[] params;


    public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.paramTypes = paramTypes;
        this.params = params;
    }

    public String getInterfaceName() {
        return interfaceName;
    }

    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }
}

(4) RPC监听服务

用来监听Consumer远程调用的TCP连接, 接收到Consumer传输过来的数据后, 通过反射调用对应的方法, 然后将结果返回给Consumer. Dubbo使用的是Netty框架, 这里为了简单, 我们使用原生的TCP连接.

/**
 * RPC监听服务, 监听consumer远程调用的tcp连接
 * @author litianxiang
 * @date 2020/3/17 18:01
 */
public class RpcServer {
	private static Logger logger = LoggerFactory.getLogger(RpcServer.class);
	private Map<String, Class> serviceMap;

	public RpcServer(Map<String, Class> serviceMap) {
		this.serviceMap = serviceMap;
	}

	/**
	 * 启动RPC监听服务
	 */
	public void start() {
		//监听端口, 处理rpc请求
		ServerSocket serverSocket = null;
		try {
			serverSocket = new ServerSocket(12000);
			logger.info("RPC监听服务启动...");
			while (true) {
				Socket socket = serverSocket.accept();
				new Thread(new ServerHandler(socket, serviceMap)).start();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (serverSocket != null) {
				try {
					serverSocket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}


/**
 * 处理RPC, 通过反射执行方法
 * @author litianxiang
 * @date 2020/3/6 17:52
 */
public class ServerHandler implements Runnable {
	private Socket socket;
	private Map<String, Class> serviceMap;

	public ServerHandler(Socket socket, Map<String, Class> serviceMap) {
		this.socket = socket;
		this.serviceMap = serviceMap;
	}

	@Override
	public void run() {
		ObjectInputStream in = null;
		ObjectOutputStream out = null;
		try {
			in = new ObjectInputStream(socket.getInputStream());
			out = new ObjectOutputStream(socket.getOutputStream());

			//获取Invocation对象
			Invocation invocation = (Invocation) in.readObject();

			//执行对应方法
			Class clazz = serviceMap.get(invocation.getInterfaceName());
			Method method = clazz.getMethod(invocation.getMethodName(), invocation.getParamTypes());
			Object invoke = method.invoke(clazz.newInstance(), invocation.getParams());

			//返回方法执行结果
			out.writeObject(invoke);
			out.flush();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if (out != null) {
				try {
					out.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			if (socket != null) {
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			socket = null;
		}
	}
}

(5) Provider启动类

这里会模拟dubbo的service配置, 将接口名及其对应的实现类储存到serviceMap中, 然后将服务和服务提供者地址注册到注册中心, 最后再启动对Consumer远程调用的监听.

/**
 * @author litianxiang
 * @date 2020/3/6 15:32
 */
public class Provider {
	private static Logger logger = LoggerFactory.getLogger(Provider.class);
	private static Map<String, Class> serviceMap = new HashMap<>();
	private static String tcpHost = "127.0.0.1:12000";

	static {
		/**
		 * 模拟service配置处理逻辑
		 * <dubbo:service interface="com.client.service.IBookService" ref="bookService" />
		 * <bean id="bookService" class="com.provider.service.BookServiceImpl" />
		 */
		serviceMap.put(IBookService.class.getName(), BookServiceImpl.class);
	}

	public static void main(String[] args) {
		//将服务和服务提供者URL注册到注册中心
		RegisterCenter registerCenter = new RegisterCenter();
		for (Map.Entry<String, Class> entry : serviceMap.entrySet()) {
			registerCenter.register(entry.getKey(), tcpHost);
		}

		//监听Consumer的远程调用(为了简化代码, 这里使用TCP代替Netty)
		RpcServer rpcServer = new RpcServer(serviceMap);
		rpcServer.start();
	}
}

服务消费者

(1) 负载均衡

为了简单, 这里直接使用的是随机算法.

public class RandomLoadBalance {

	/**
	 * 随机一个provider
	 * @param providerList provider列表
	 * @return provider
	 */
	public String doSelect(List<String> providerList) {
		int size = providerList.size();
		Random random = new Random();
		return providerList.get(random.nextInt(size));
	}
}

(2) 服务订阅类

服务订阅类提供向注册中心订阅服务的功能, 涉及服务发现与负载均衡.

public class ServiceSubscribe {
	private static Logger logger = LoggerFactory.getLogger(ServiceSubscribe.class);
	private ZooKeeper zk;
	private List<String> providerList;

	/**
	 * 连接ZooKeeper, 创建dubbo根节点
	 */
	public ServiceSubscribe() {
		try {
			CountDownLatch connectedSignal = new CountDownLatch(1);
			zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if (event.getState() == Event.KeeperState.SyncConnected) {
						connectedSignal.countDown();
					}
				}
			});
			//因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例.
			connectedSignal.await();

			//创建dubbo注册中心的根节点(持久节点)
			if (zk.exists(ZooKeeperConst.rootNode, false) == null) {
				zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			logger.error("connect zookeeper server error.", e);
		}
	}

	/**
	 * 在注册中心订阅服务, 返回对应的服务url
	 * 只要第一次获取到了服务的RPC地址, 后面注册中心挂掉之后, 仍然可以继续通信.
	 * @param serviceName 服务名称
	 * @return 服务host
	 */
	public String subscribe(String serviceName) {
		//服务节点路径
		String servicePath = ZooKeeperConst.rootNode + "/" + serviceName;
		try {
			//获取服务节点下的所有子节点, 即服务的RPC地址
			providerList = zk.getChildren(servicePath, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					if (event.getType() == Event.EventType.NodeChildrenChanged) {
						try {
							//循环监听
							providerList = zk.getChildren(servicePath, true);
						} catch (KeeperException | InterruptedException e) {
							logger.error("Consumer在ZooKeeper订阅服务-注册监听器报错", e);
						}
					}
				}
			});
		} catch (Exception e) {
			logger.error("从注册中心获取服务报错.", e);
		}
		logger.info(serviceName + "的服务提供者列表: " + providerList);

		//负载均衡
		RandomLoadBalance randomLoadBalance = new RandomLoadBalance();
		return randomLoadBalance.doSelect(providerList);
	}

}

(3) RPC代理类

根据JDK Proxy生成一个代理对象, 封装RPC调用的过程.

public class RpcServiceProxy {
	private ServiceSubscribe serviceSubscribe;

	public RpcServiceProxy(ServiceSubscribe serviceSubscribe) {
		this.serviceSubscribe = serviceSubscribe;
	}

	/**
	 * 获取RPC代理
	 * @param clazz
	 * @return
	 */
	public Object getProxy(final Class clazz) {
		return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				//在注册中心订阅服务, 返回对应的服务url
				String rpcHost = serviceSubscribe.subscribe(clazz.getName());
				String[] split = rpcHost.split(":");
				//与远程服务建立连接
				Socket socket = new Socket(split[0], Integer.parseInt(split[1]));
				ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
				ObjectInputStream in = new ObjectInputStream(socket.getInputStream());

				//向RPC服务传输Invocation对象
				String className = clazz.getName();
				String methodName = method.getName();
				Class[] paramTypes = method.getParameterTypes();
				Invocation invocation = new Invocation(className, methodName, paramTypes, args);
				out.writeObject(invocation);
				out.flush();

				//接收方法执行结果
				Object object = in.readObject();
				in.close();
				out.close();
				socket.close();

				return object;
			}
		});
	}
}

(4) Consumer启动类

消费者启动后, 会向注册中心订阅服务, 经过负载均衡获取到对应的服务后, 再进行RPC调用.

public class Consumer {
	private static Logger logger = LoggerFactory.getLogger(Consumer.class);

	public static void main(String[] args) {
		//在注册中心订阅服务, 获取服务所在的url, 然后通过代理远程调用服务
		ServiceSubscribe serviceSubscribe = new ServiceSubscribe();
		RpcServiceProxy rpcServiceProxy = new RpcServiceProxy(serviceSubscribe);
		//获取RPC代理
		IBookService bookService = (IBookService) rpcServiceProxy.getProxy(IBookService.class);
		BookDTO bookInfo = bookService.getBookInfo(1);
		System.out.println(bookInfo);
	}
}

测试

(1) 先修改注册中心的地址

public static String host = "xxx.xx.xx.xxx:2181";

(2) Service

public class BookDTO implements Serializable{

	private static final long serialVersionUID = 1934175717377394706L;

	private int id;
	private String name;
	private String desc;
	private String author;

	public int getId() {
		return id;
	}

	public void setId(int id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getDesc() {
		return desc;
	}

	public void setDesc(String desc) {
		this.desc = desc;
	}

	public String getAuthor() {
		return author;
	}

	public void setAuthor(String author) {
		this.author = author;
	}

	@Override
	public String toString() {
		return "BookDTO{" +
				"id=" + id +
				", name='" + name + ''' +
				", desc='" + desc + ''' +
				", author='" + author + ''' +
				'}';
	}
}

public interface IBookService {
	BookDTO getBookInfo(int id);
}

public class BookServiceImpl implements IBookService {
	@Override
	public BookDTO getBookInfo(int id) {
		if (id == 1) {
			BookDTO bookDTO = new BookDTO();
			bookDTO.setId(1);
			bookDTO.setName("仙逆");
			bookDTO.setDesc("顺为凡, 逆为仙, 只在心中一念间.");
			bookDTO.setAuthor("耳根");
			return bookDTO;
		} else {
			return new BookDTO();
		}
	}
}

(3) 启动Provider

在这里插入图片描述

(4) 启动Consumer

在这里插入图片描述

最后

以上就是快乐秋天为你收集整理的手写一个简化版的Dubbo框架的全部内容,希望文章能够帮你解决手写一个简化版的Dubbo框架所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(77)

评论列表共有 0 条评论

立即
投稿
返回
顶部