概述
分布式网络通信框架Netty
基于Netty+Zookeeper纯手写RPC远程调用框架
2 Dubbo框架完整实现
学习总结
田超凡
2019年11月4日
1 创建生产者(Provider)和消费者(Consumer)
生产者:netty-zookeeper-dubbo-user-service-api 用户服务-接口
netty-zookeeper-dubbo-user-service-provider 用户服务-接口实现类
package com.tcf.netty.zookeeper.dubbo.user.service;
/***
* TODO TCF 用户接口-生产者
* @author Hasee
*
*/
public interface UserService {
//TODO TCF 根据用户id查询用户信息
public String getUserById(String id);
}
package com.tcf.netty.zookeeper.dubbo.user.service.impl;
import com.tcf.netty.zookeeper.dubbo.common.annotation.RpcRegistration;
import com.tcf.netty.zookeeper.dubbo.user.service.UserService;
/***
* TODO TCF 用户接口实现类-生产者
* @author Hasee
*
*/
@RpcRegistration(UserService.class)
public class UserServiceImpl implements UserService {
//TODO TCF 根据用户id查询用户信息
public String getUserById(String id)
{
return "张三";
}
}
消费者:netty-zookeeper-dubbo-order-service-api 订单服务-接口
netty-zookeeper-dubbo-order-service-consumer 订单服务-接口实现类
package com.tcf.netty.zookeeper.dubbo.order.service;
/***
* TODO TCF 订单业务接口-消费者
* @author Hasee
*
*/
public interface OrderService {
//TODO TCF 根据用户id查询用户订单
public String getOrderByUserId(String id);
}
package com.tcf.netty.zookeeper.dubbo.order.service.impl;
import com.tcf.netty.zookeeper.dubbo.order.service.OrderService;
import com.tcf.netty.zookeeper.dubbo.order.util.DubboClientUtil;
import com.tcf.netty.zookeeper.dubbo.user.service.UserService;
/***
* TODO TCF 订单服务-消费者
* @author Hasee
*
*/
public class OrderServiceImpl implements OrderService {
//TODO TCF 需要远程调用的用户服务接口
private UserService userService;
//TODO TCF 构造柱入
public OrderServiceImpl()
{
this.userService=DubboClientUtil.getDubboClient().createService(UserService.class);
}
//TODO TCF 根据用户id获取订单信息,远程调用用户服务接口
public String getOrderByUserId(String id)
{
//TODO TCF 根据用户id获取用户信息,远程服务调用
String userInfo=userService.getUserById(id);
return userInfo;
}
}
2 创建Zookeeper注册中心服务注册接口ServerRegistration和实现类ServerRegistry
package com.tcf.netty.zookeeper.dubbo.server.registry;
/***
* TODO TCF Zookeeper注册中心服务注册接口,定义服务注册类需要注册的规范
* @author Hasee
*
*/
public interface ServerRegistration {
//TODO TCF 注册服务到Zookeeper注册中心
public void registry(String serverName,String serverClassAddr);
}
package com.tcf.netty.zookeeper.dubbo.server.registry.impl;
import java.net.URLEncoder;
import org.I0Itec.zkclient.ZkClient;
import com.tcf.netty.zookeeper.dubbo.server.registry.ServerRegistration;
/***
* TODO TCF Zookeeper注册中心服务注册工具类
* @author Hasee
*
*/
public class ServerRegistry implements ServerRegistration {
//TODO TCF Zookeeper注册中心ip地址
private String host;
//TODO TCF Zookeeper注册中心客户端
private ZkClient zkClient;
//TODO TCF Zookeeper注册中心连接超时时长
private int connectTimeout=5000;
//TODO TCF 服务注册根目录
private String rootPath="/netty-zookeeper-dubbo-user-service-api-v1.1";
//TODO TCF 注册服务类型:生产者
private String serverSuffix="/providers";
//TODO TCF 构造柱入
public ServerRegistry(String host)
{
this.host=host;
this.zkClient=new ZkClient(host,connectTimeout);
}
//TODO TCF 注册服务到Zookeeper注册中心
public void registry(String serverName, String serverClassAddr)
{
try
{
//TODO TCF Zookeeper服务根目录/rootPath
if(!zkClient.exists(rootPath))
{
zkClient.createPersistent(rootPath);
}
//TODO TCF Zookeeper服务二级目录/rootPath/serverName
String secondLevelPath=rootPath+"/"+serverName;
if(!zkClient.exists(secondLevelPath))
{
zkClient.createPersistent(secondLevelPath);
}
//TODO TCF Zookeeper服务三级目录/rootPath/serverName/serverSuffix
String thirdLevelPath=secondLevelPath+serverSuffix;
if(!zkClient.exists(thirdLevelPath))
{
zkClient.createPersistent(thirdLevelPath);
}
//TODO TCF Zookeeper服务四级目录/rootPath/serverName/serverSuffix/serverClassPath
String nodePath=thirdLevelPath+"/"+URLEncoder.encode(serverClassAddr,"UTF-8");
if(zkClient.exists(nodePath))
{
//TODO TCF 服务节点已存在,先删除该服务节点
zkClient.delete(nodePath);
}
//TODO TCF 创建服务节点,注册服务
zkClient.createEphemeral(nodePath);
System.out.println("Zookeeper服务注册成功,服务地址===>"+nodePath);
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
3 创建Dubbo服务器:DubboServer
- .基于反射获取需要注册到Zookeeper注册中心的服务并实现服务注册,存入已注册服务容器
- .初始化Netty服务器、通道,绑定事件驱动处理器、MarShalling编码解码器
- .创建DubboServerEventHandle事件驱动处理器,监听通讯事件并处理
package com.tcf.netty.zookeeper.dubbo.server.core;
import java.util.HashMap;
import java.util.Map;
import com.tcf.netty.zookeeper.dubbo.common.annotation.RpcRegistration;
import com.tcf.netty.zookeeper.dubbo.common.coder.MarShallingCoderFactory;
import com.tcf.netty.zookeeper.dubbo.server.core.event.DubboServerEventHandle;
import com.tcf.netty.zookeeper.dubbo.server.registry.ServerRegistration;
import com.tcf.netty.zookeeper.dubbo.server.registry.impl.ServerRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/***
* TODO TCF Dubbo服务器,基于Netty服务器端实现
* @author Hasee
*
*/
public class DubboServer {
//TODO TCF 已经注册到Zookeeper注册中心的服务
private Map<String,Object> registryMap=new HashMap<String,Object>();
//TODO TCF Zookeeper注册中心和Dubbo服务器ip地址
private String host="";
//TODO TCF 服务启动端口号:20880(Dubbo默认)
private int port=20880;
//TODO TCF Zookeeper注册中心注册服务协议类型
private String protocol="tcf://";
//TODO TCF Zookeeper服务注册工具类
private ServerRegistration serverRegistration=null;
//TODO TCF 构造柱入
public DubboServer(String host)
{
this.host=host;
this.serverRegistration=new ServerRegistry(host);
}
//TODO TCF 初始化Dubbo服务器,注册服务到Zookeeper注册中心
public void start(Object service)
{
//TODO TCF 基于反射获取需要注册到Zookeeper注册中心的服务,注册到Zookeeper注册中心
serviceRegistry(service);
//TODO TCF 初始化Netty服务器
initServer();
}
//TODO TCF 基于反射获取需要注册到Zookeeper注册中心的服务并进行服务注册
public void serviceRegistry(Object service)
{
//TODO TCF 获取需要注册到Zookeeper注册中心的服务上方标注的RPC注解
if(service.getClass().isAnnotationPresent(RpcRegistration.class))
{
RpcRegistration rpcRegistration=service.getClass().getAnnotation(RpcRegistration.class);
if(rpcRegistration!=null)
{
//TODO TCF 需要注册到Zookeeper注册中心的服务接口
Class<?> interfaceClass=rpcRegistration.value();
if(interfaceClass!=null)
{
//TODO TCF 需要注册的服务名称
String serviceName=interfaceClass.getName().replace("interface ","");
//TODO TCF 拼装服务注册二级地址(唯一标识需要注册的服务)
String serviceClassAddr=protocol+host+":"+port+"//";
//TODO TCF 把服务注册到Zookeeper注册中心
serverRegistration.registry(serviceName,serviceClassAddr);
//TODO TCF 注册成功的服务
registryMap.put(serviceName,service);
}
}
}
}
//TODO TCF 初始化Dubbo服务器端,使用Netty服务器
public void initServer()
{
//TODO TCF Boss线程组和工作线程组
NioEventLoopGroup bossGroup=new NioEventLoopGroup();
NioEventLoopGroup workGroup=new NioEventLoopGroup();
//TODO TCF Netty服务器初始化,绑定事件驱动处理器、编码解码器,初始化Netty通道
ServerBootstrap serverBootstrap=new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
//TODO TCF Netty服务器加载通道时绑定编码解码器、事件驱动处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
{
//TODO TCF Netty-RPC请求编码解码器MarSharlling
socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingDecoder());
//TODO TCF 绑定事件驱动处理器-事件监听
socketChannel.pipeline().addLast(new DubboServerEventHandle(registryMap));
}
});
try
{
//TODO TCF 初始化Netty通道
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
System.out.println("====Dubbo 服务器启动成功===="+port);
//TODO TCF 当断开和客户端的连接时,关闭通道,释放资源,未断开连接时,此次调用会产生阻塞
channelFuture.channel().closeFuture().sync();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
//TODO TCF 关闭Netty线程组,释放资源
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
DubboSeverEventHandle事件驱动处理器
package com.tcf.netty.zookeeper.dubbo.server.core.event;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/***
* TODO TCF Dubbo服务器事件驱动处理器-实现事件监听和处理
* @author Hasee
*
*/
public class DubboServerEventHandle extends ChannelInboundHandlerAdapter {
//TODO TCF 已经注册到Zookeeper注册中心的服务
private Map<String,Object> registryMap=new HashMap<String,Object>();
//TODO TCF 构造柱入
public DubboServerEventHandle(Map<String,Object> registryMap)
{
this.registryMap=registryMap;
}
//TODO TCF 监听客户端发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception
{
//TODO TCF 将数据解析为客户端发送的RPCRequest请求参数模型
if(msg!=null)
{
if(msg instanceof RpcRequest)
{
RpcRequest rpcRequest=(RpcRequest)msg;
System.out.println("====接收到客户端发送的服务调用请求:"+rpcRequest.toString());
//TODO TCF 根据需要调用的服务名称获取对应的服务
Object serviceInstance=registryMap.get(rpcRequest.getServiceClass().getName());
if(serviceInstance!=null)
{
//TODO TCF 基于反射获取对应服务需要调用的方法
Method method=serviceInstance.getClass().getMethod(rpcRequest.getMethodName(),rpcRequest.getParameterTypes());
//TODO TCF 执行对应方法,实现服务调用
Object invokeResult=method.invoke(serviceInstance,rpcRequest.getParameterValues());
//TODO TCF 返回调用方法执行结果给客户端
ctx.writeAndFlush(invokeResult);
ctx.close();
}
}
}
}
}
4 创建Zookeeper注册中心服务发现接口和实现类ZkFoundService、ZkFoundServiceImpl
package com.tcf.netty.zookeeper.dubbo.client.service.handle;
import java.util.List;
/***
* TODO TCF Zookeeper注册中心服务发现接口
* @author Hasee
*
*/
public interface ZkFoundService {
//TODO TCF 根据服务名称获取注册到Zookeeper注册中心的服务访问地址
public List<String> getServiceAddressList(String serviceName);
}
package com.tcf.netty.zookeeper.dubbo.client.service.handle.impl;
import java.util.List;
import org.I0Itec.zkclient.ZkClient;
import com.tcf.netty.zookeeper.dubbo.client.service.handle.ZkFoundService;
/***
* TODO TCF 根据服务名称获取Zookeeper注册中心的服务访问地址
* @author Hasee
*
*/
public class ZkFoundServiceImpl implements ZkFoundService {
//TODO TCF Zookeeper注册中心地址
private String host="";
//TODO TCF Zookeeper注册中心连接超时时长
private int connectTimeout=5000;
//TODO TCF Zookeeper服务节点根目录
private String rootPath="/netty-zookeeper-dubbo-user-service-api-v1.1";
//TODO TCF Zookeeper服务注册角色:生产者
private String serviceSuffix="/providers";
//TODO TCF Zookeeper客户端
private ZkClient zkClient;
//TODO TCF 构造柱入,初始化Zookeeper客户端
public ZkFoundServiceImpl(String host)
{
this.host=host;
this.zkClient=new ZkClient(host,connectTimeout);
}
//TODO TCF 根据服务名称获取注册到Zookeeper注册中心的服务访问地址
public List<String> getServiceAddressList(String serviceName)
{
//TODO TCF 服务节点路由
String serviceNodePath=rootPath+"/"+serviceName+serviceSuffix;
List<String> addressList=zkClient.getChildren(serviceNodePath);
return addressList;
}
}
5 基于策略模式实现Dubbo负载均衡处理器DubboLoadBalance
- .提供随机负载均衡策略实现类RandomLoadBalanceService
- .提供索引负载均衡策略实现类
IndexLoadBalanceService
(3).注意考虑多线程并发的情况
package com.tcf.netty.zookeeper.dubbo.client.loadbalance;
import java.util.List;
/***
* TODO TCF 基于策略模式实现Dubbo负载均衡处理器业务接口
* @author Hasee
*
*/
public interface DubboLoadBalanceService {
//TODO TCF 根据访问服务地址进行负载均衡策略实现,返回最终需要调用的服务访问地址
public String loadBalance(List<String> serviceAddressList);
}
package com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl;
import java.util.List;
import java.util.Random;
import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;
/***
* TODO TCF 随机负载均衡策略实现类
* @author Hasee
*
*/
public class RandomLoadBalanceService implements DubboLoadBalanceService {
//TODO TCF 随机获取服务调用地址中的某一个服务地址,实现随机负载均衡策略
public String loadBalance(List<String> serviceAddressList)
{
String address=serviceAddressList.get(new Random().nextInt(serviceAddressList.size()));
return address;
}
}
package com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl;
import java.util.List;
import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;
/***
* TODO TCF 基于索引的负载均衡处理器
* @author Hasee
*
*/
public class IndexLoadBalanceService implements DubboLoadBalanceService {
//TODO TCF 当前加载到第几个服务访问地址
private int index=0;
//TODO TCF 实现索引负载均衡策略
public synchronized String loadBalance(List<String> serviceAddressList)
{
String resultAddress="";
if(serviceAddressList!=null && serviceAddressList.size()>0)
{
if(index>=serviceAddressList.size())
{
index=0;
}
resultAddress=serviceAddressList.get(index++);
System.out.println("负载均衡处理后,服务调用地址===>"+resultAddress);
}
return resultAddress;
}
}
6 实现Dubbo客户端DubboClient
- .根据需要调用的服务接口类型创建JDK动态代理实例,实现代理方法
- .在代理方法中实现服务发现和远程调用具体逻辑实现,满足开闭原则
- .创建DubboClientEventHandle事件驱动处理器实现Dubbo客户端网络通讯事件监听和处理
package com.tcf.netty.zookeeper.dubbo.client.core;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.List;
import com.tcf.netty.zookeeper.dubbo.client.core.event.DubboClientEventHandle;
import com.tcf.netty.zookeeper.dubbo.client.loadbalance.DubboLoadBalanceService;
import com.tcf.netty.zookeeper.dubbo.client.loadbalance.impl.IndexLoadBalanceService;
import com.tcf.netty.zookeeper.dubbo.client.service.handle.ZkFoundService;
import com.tcf.netty.zookeeper.dubbo.client.service.handle.impl.ZkFoundServiceImpl;
import com.tcf.netty.zookeeper.dubbo.common.coder.MarShallingCoderFactory;
import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/***
* TODO TCF Dubbo客户端,创建需要调用的服务接口的JDK动态代理实例并调用代理方法,具体的服务发现和调用实现逻辑在代理方法中
* @author Hasee
*
*/
public class DubboClient {
//TODO TCF Zookeeper注册服务发现类
private ZkFoundService zkFoundService;
//TODO TCF Dubbo负载均衡处理器
private DubboLoadBalanceService dubboLoadBalanceService;
//TODO TCF 构造注入
public DubboClient(String host)
{
this.zkFoundService=new ZkFoundServiceImpl(host);
//TODO TCF 默认采用索引负载均衡策略
this.dubboLoadBalanceService=new IndexLoadBalanceService();
}
//TODO TCF 创建需要调用的服务接口的JDK动态代理实例,实现Zookeeper服务发现和调用
@SuppressWarnings("unchecked")
public <T> T createService(final Class<T> interfaceClass)
{
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class[]{interfaceClass},
new InvocationHandler() {
public Object invoke(Object proxy,Method method,Object[] args) throws Throwable
{
//TODO TCF 远程服务调用后方法执行结果
Object invokeResult=null;
//TODO TCF 获取需要调用的服务名称
String serviceName=interfaceClass.getName();
//TODO TCF 根据服务名称获取已经注册到Zookeeper注册中心的服务访问地址
List<String> addressList=zkFoundService.getServiceAddressList(serviceName);
if(addressList!=null && addressList.size()>0)
{
//TODO TCF 基于负载均衡策略获取最终需要调用的服务地址
String address=dubboLoadBalanceService.loadBalance(addressList);
address=URLDecoder.decode(address,"UTF-8");
//TODO TCF 获取ip地址和端口号
address=address.replace("//","").replace("/","");
String[] arrays=address.split(":");
//TODO TCF tcf://host:port//serviceName
String host=arrays[1];
Integer port=Integer.parseInt(arrays[2]);
//TODO TCF 封装RPC远程调用请求参数模型
final RpcRequest rpcRequest=new RpcRequest(interfaceClass,method.getName(),method.getParameterTypes(),args);
//TODO TCF 初始化Netty客户端,连接远程Netty服务器,传递RPC远程调用请求参数,实现服务远程调用
//TODO TCF 工作线程组
NioEventLoopGroup workGroup=new NioEventLoopGroup();
//TODO TCF Netty客户端事件驱动处理器
final DubboClientEventHandle dubboClientEventHandle=new DubboClientEventHandle(rpcRequest);
//TODO TCF Netty客户端初始化,绑定事件驱动处理器、MarShalling编码解码器,初始化Netty通道
Bootstrap client=new Bootstrap();
client.group(workGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port.intValue()))
.handler(new ChannelInitializer<SocketChannel>() {
//TODO TCF Netty通道初始化
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception
{
//TODO TCF 绑定MarShalling编码解码器
socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast(MarShallingCoderFactory.buildMarshallingDecoder());
//TODO TCF 绑定事件驱动处理器,实现事件监听
socketChannel.pipeline().addLast(dubboClientEventHandle);
}
});
try
{
//TODO TCF 初始化Netty通道
ChannelFuture channelFuture=client.connect().sync();
//TODO TCF 断开客户端连接时关闭Netty通道,释放资源
channelFuture.channel().closeFuture().sync();
System.out.println(serviceName+"服务的"+method.getName()+"方法远程调用成功......");
//TODO TCF 服务调用之后,方法执行返回结果
invokeResult=dubboClientEventHandle.getResponseMessage();
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
//TODO TCF 关闭线程组,释放线程占用资源
workGroup.shutdownGracefully();
}
}
return invokeResult;
}
});
}
}
Dubbo客户端事件驱动处理器DubboClientEventHandle,发送RPC服务远程调用请求到对应的Netty服务器并接收服务器返回的方法执行响应结果
package com.tcf.netty.zookeeper.dubbo.client.core.event;
import com.tcf.netty.zookeeper.dubbo.common.model.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/***
* TODO TCF Dubbo客户端事件驱动处理器-监听事件
* @author Hasee
*
*/
public class DubboClientEventHandle extends ChannelInboundHandlerAdapter {
//TODO TCF 服务远程调用请求参数
private RpcRequest rpcRequest;
//TODO TCF Netty服务器返回的服务调用执行结果
private Object responseMessage;
//TODO TCF 构造柱入
public DubboClientEventHandle(RpcRequest rpcRequest)
{
this.rpcRequest=rpcRequest;
}
public Object getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(Object responseMessage) {
this.responseMessage = responseMessage;
}
//TODO TCF 接收到服务器返回的响应信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
if(msg!=null)
{
this.responseMessage=msg;
System.out.println("接收到服务器返回的响应信息(方法执行结果):"+msg);
ctx.close();
}
}
//TODO TCF 发送RPC服务远程调用请求到Netty服务器端
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
ctx.writeAndFlush(rpcRequest);
}
}
7 创建自定义RPC服务注册注解RpcRegistration
package com.tcf.netty.zookeeper.dubbo.common.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/***
* TODO TCF 标注需要注册到Zookeeper注册中心的服务接口实现类
* @author Hasee
*
*/
@Documented
@Inherited
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcRegistration {
//TODO TCF 需要注册到Zookeeper注册中心的服务接口类型
Class<?> value();
}
8 创建RPC远程服务调用-数据传输载体RpcRequest模型类
package com.tcf.netty.zookeeper.dubbo.common.model;
import java.io.Serializable;
/***
* TODO TCF RPC远程调用请求数据模型类
* @author Hasee
*
*/
public class RpcRequest implements Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
//TODO TCF 需要调用的服务接口
private Class<?> serviceClass;
//TODO TCF 需要调用的服务接口方法名
private String methodName;
//TODO TCF 需要调用的服务接口方法参数类型
private Class<?>[] parameterTypes;
//TODO TCF 需要调用的服务接口方法参数列表
private Object[] parameterValues;
//TODO TCF 构造柱入
public RpcRequest(Class<?> serviceClass,String methodName,Class<?>[] parameterTypes,Object[] parameterValues)
{
this.serviceClass=serviceClass;
this.methodName=methodName;
this.parameterTypes=parameterTypes;
this.parameterValues=parameterValues;
}
public Class<?> getServiceClass() {
return serviceClass;
}
public void setServiceClass(Class<?> serviceClass) {
this.serviceClass = serviceClass;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getParameterValues() {
return parameterValues;
}
public void setParameterValues(Object[] parameterValues) {
this.parameterValues = parameterValues;
}
}
9 创建Netty编码器和解码器工厂MarShallingCoderFactory,创建MarShalling编码器Encoder和解码器Decoder,实现RpcRequest请求数据传输时的序列化和反序列化
package com.tcf.netty.zookeeper.dubbo.common.coder;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
public class MarShallingCoderFactory {
/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder()
{
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
}
/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder()
{
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
10 创建启动类,查看运行效果
Dubbo客户端工具类:DubboClientUtil
生成并获取单例的Dubbo客户端实例
生产者ProviderApplication:
注册服务到Zookeeper注册中心,初始化并启动Dubbo服务器
package com.tcf.netty.zookeeper.dubbo.core;
import com.tcf.netty.zookeeper.dubbo.server.core.DubboServer;
import com.tcf.netty.zookeeper.dubbo.user.service.impl.UserServiceImpl;
/***
* TODO TCF 用户服务-生产者,应用启动类,实现服务注册
* @author Hasee
*
*/
public class ProviderApplication {
public static void main(String[] args)
{
//TODO TCF 初始化Dubbo服务器,基于反射获取需要注册到Zookeeper注册中心的服务并实现服务注册,初始化Netty服务器(Dubbo服务器)
DubboServer dubboServer=new DubboServer("127.0.0.1");
dubboServer.start(new UserServiceImpl());
}
}
消费者ConsumerApplication:
RPC远程调用指定服务的指定方法,获取方法执行返回结果
package com.tcf.netty.zookeeper.dubbo.order.core;
import com.tcf.netty.zookeeper.dubbo.order.service.OrderService;
import com.tcf.netty.zookeeper.dubbo.order.service.impl.OrderServiceImpl;
/***
* TODO TCF 订单服务-消费者,应用启动,发起远程调用服务请求
* @author Hasee
*
*/
public class ConsumerApplication {
public static void main(String[] args)
{
OrderService orderService=new OrderServiceImpl();
String userInfo=orderService.getOrderByUserId("1");
System.out.println("UserInfo ===> "+userInfo);
}
}
maven相关依赖
<dependencies>
<groupId>com.tcf.netty.zookeeper.dubbo.common</groupId>
|
转载请注明原作者
最后
以上就是糟糕大炮为你收集整理的分布式网络通信框架Netty_基于Netty+Zookeeper手写RPC远程调用框架_2 Dubbo框架完整实现 分布式网络通信框架Netty的全部内容,希望文章能够帮你解决分布式网络通信框架Netty_基于Netty+Zookeeper手写RPC远程调用框架_2 Dubbo框架完整实现 分布式网络通信框架Netty所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复