概述
最近在b站看到一个视频,可以用来入门rpc(remote procedure call),记录一下学习的过程,rpc即是一个计算机通信协议,该协议允许运行于一台计算机的程序调用另一台计算机的子程序,程序员无需额外地为这个交互作用编程,如果设计的软件采用面向对象编程,远程调用亦可作为远程方法调用
大概的流程是消费方以本地方式调用服务,将方法、参数等信息封装成请求体,并且找到服务地址,将消息发送到服务端,服务端对请求信息进行解码,调用本地服务,并将结果返回给消费方法,消费方进行解码并得到最后结果。而透明化远程服务调用在java方式上面就是通过jdk动态代理,或者字节码生成如cglib,asm等等,但字节码生成的代码编写难度可能相对较高以及不易维护。
首先明确两个角色,消费方和服务方,消费方如何拿到服务方的地址?这时候需要引入一个注册中心的角色,用来注册服务方的地址,消费方可以从注册中心里拿到服务方地址,而且也需要确定通信的格式,比如定义什么格式的消息头、消息体,采用什么样的序列化和反序列化(用于编码解码)方式,这里通信方式选择http和nettynio支持的tcp进行远程调用。
下面开始编码环节:
分为五个模块,这里不作子模块划分了,consumer就是服务调用方,framework是这个简单的远程调用框架所需的一些类, protocol是协议的具体实现,provider是服务提供方,register则起到注册中心的作用
简单的框架部分
invocation对象
用来封装所请求的方法信息,而返回结果其实也可以定义一个结果对象,这里只是string类型
/**
* @author: lele
* @date: 2019/11/15 下午7:01
* 封装调用方所想调用的远程方法信息
*/
@Data
@AllArgsConstructor
public class Invocation implements Serializable {
private static final long serialVersionUID = -7789662944864163267L;
private String interfaceName;
private String methodName;
private Object[] params;
//防止重载
private Class[] paramsTypes;
}
目标地址类
/**
* @author: lele
* @date: 2019/11/15 下午6:56
* 自定义的URL类,方便管理
*/
@Data
@AllArgsConstructor
public class URL implements Serializable {
private static final long serialVersionUID = 7947426935048871471L;
private String hostname;
private Integer port;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
URL url = (URL) o;
return Objects.equals(hostname, url.hostname) &&
Objects.equals(port, url.port);
}
@Override
public int hashCode() {
return Objects.hash(hostname, port);
}
}
协议接口
把不同协议抽象
public interface Protocol {
//服务提供方启动的方法
void start(URL url);
//发送请求
String send(URL url, Invocation invocation);
}
协议工厂
public class ProtocolFactory {
public static HttpProtocol http() {
return new HttpProtocol();
}
public static NettyProtocol netty(){
return new NettyProtocol();
}
}
动态代理工厂
负责对服务接口下的方法进行代理,这个是核心
/**
* @author: lele
* @date: 2019/11/15 下午7:48
* 代理工厂,对传入的类进行代理对具体执行的方法进行封装然后发送给服务端进行执行
*/
public class ProxyFactory {
public static <T> T getProxy(Class interfaceClass) {
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//指定所用协议
Protocol protocol=ProtocolFactory.http();
//通过注册中心获取可用链接
URL url= MapRegister.random(interfaceClass.getName());
//封装方法参数
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), args, method.getParameterTypes());
//发送请求
String res = protocol.send(url,invocation);
return res;
}
});
}
}
http
通过提供一个内嵌的tomcat服务器作为服务提供者,并且添加一个servlet,这个servlet对请求进行解码后,通过反射执行相关的逻辑并返回给调用方
协议具体实现
public class HttpProtocol implements Protocol {
@Override
public void start(URL url) {
HttpServer server=new HttpServer();
server.start(url.getHostname(),url.getPort());
}
@Override
public String send(URL url, Invocation invocation) {
HttpClient client=new HttpClient();
String res = client.post(url.getHostname(), url.getPort(), invocation);
return res;
}
}
服务端
tomcat实例
/**
* @author: lele
* @date: 2019/11/15 下午6:44
* 构造嵌入式tomcat服务器
*/
public class HttpServer {
public void start(String hostname, Integer port) {
Tomcat tomcat = new Tomcat();
//获取server实例
Server server = tomcat.getServer();
Service service = server.findService("Tomcat");
//连接器
Connector connector = new Connector();
connector.setPort(port);
Engine engine = new StandardEngine();
engine.setDefaultHost(hostname);
//host
Host host = new StandardHost();
host.setName(hostname);
String contextPath = "";
//上下文
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
//添加servlet,匹配所有路径
tomcat.addServlet(contextPath, "dispathcer", new DispatcherServlet());
context.addServletMappingDecoded("/*","dispathcer");
try {
tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}
处理servlet的逻辑,而自定义servlet添加该逻辑并配置到内嵌tomcat当中即可完成服务端处理
public class DispatcherServlet extends HttpServlet {
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
new HttpServerHandler().handle(req,resp);
}
}
public class HttpServerHandler {
public void handle(HttpServletRequest req, HttpServletResponse resp){
try {
//获取输入流
ServletInputStream inputStream = req.getInputStream();
//包装成对象输入流
ObjectInputStream ois=new ObjectInputStream(inputStream);
//转换成方法调用参数
Invocation invocation= (Invocation) ois.readObject();
String hostAddress = InetAddress.getLocalHost().getHostName();
URL url=new URL(hostAddress,8080);
Class implClass=MapRegister.get(invocation.getInterfaceName(),url);
Method method = implClass.getMethod(invocation.getMethodName(), invocation.getParamsTypes());
String result = (String) method.invoke(implClass.newInstance(), invocation.getParams());
//写回结果
IOUtils.write(result,resp.getOutputStream());
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}
客户端
发起请求
public class HttpClient {
public String post(String hostname, Integer port, Invocation invocation) {
try {
URL url = new URL("http", hostname, port, "/");
HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
urlConnection.setRequestMethod("POST");
urlConnection.setDoOutput(true);
OutputStream outputStream = urlConnection.getOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(outputStream);
oos.writeObject(invocation);
oos.flush();
oos.close();
InputStream inputStream = urlConnection.getInputStream();
String result = IOUtils.toString(inputStream);
return result;
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
}
return null;
}
}
netty
服务端通过继承simpleInboundHandler和复写数据流入的处理方法,同样的经过对请求参数进行处理通过反射执行相关逻辑返回给调用方,调用方编解码后保存结果即可。
协议实现
public class NettyProtocol implements Protocol {
@Override
public void start(URL url) {
NettyServer nettyServer=new NettyServer();
try {
nettyServer.start(url.getHostname(),url.getPort());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String send(URL url, Invocation invocation) {
NettyClient nettyClient=new NettyClient();
String res = nettyClient.send(url, invocation);
return res;
}
}
客户端
@Data
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
private String result;
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
this.result = s;
}
}
public class NettyClient {
public String send(URL url, Invocation invocation) {
//用来保存调用结果的handler
NettyClientHandler res = new NettyClientHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
//true保证实时性,默认为false会累积到一定的数据量才发送
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//编码器
ch.pipeline().addLast(new ObjectEncoder());
//反序列化(解码)对象时指定类解析器,null表示使用默认的类加载器
ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(res);
}
});
//connect是异步的,但调用其future的sync则是同步等待连接成功
ChannelFuture future = bootstrap.connect(url.getHostname(), url.getPort()).sync();
System.out.println("链接成功!" + "host:" + url.getHostname() + " port:" + url.getPort());
//同步等待调用信息发送成功
future.channel().writeAndFlush(invocation).sync();
//同步等待NettyClientHandler的channelRead0被触发后(意味着收到了调用结果)关闭连接
future.channel().closeFuture().sync();
return res.getResult();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return null;
}
}
服务端
启动服务:
public class NettyServer {
public void start(String hostName,int port) throws InterruptedException {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
//存放已完成三次握手的请求的队列的最大长度
.option(ChannelOption.SO_BACKLOG, 128)
//启用心跳保活
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//自带的对象编码器
ch.pipeline().addLast(new ObjectEncoder());
//解码器
ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyServerHandler());
}
});
//bind初始化端口是异步的,但调用sync则会同步阻塞等待端口绑定成功
ChannelFuture future = bootstrap.bind(hostName,port).sync();
System.out.println("绑定成功!"+"host:"+hostName+" port:"+port);
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
处理入站消息(消息即invocation类)
public class NettyServerHandler extends SimpleChannelInboundHandler<Invocation> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) throws Exception {
String hostAddress = InetAddress.getLocalHost().getHostName();
//这里的port按照本地的端口,可以用其他变量指示
Class serviceImpl= MapRegister.get(invocation.getInterfaceName(),new URL(hostAddress,8080));
Method method=serviceImpl.getMethod(invocation.getMethodName(),invocation.getParamsTypes());
Object result=method.invoke(serviceImpl.newInstance(),invocation.getParams());
System.out.println("结果-------"+result);
//由于操作异步,确保发送消息后才关闭连接
ctx.writeAndFlush(result).addListener(ChannelFutureListener.CLOSE);
// ReferenceCountUtil.release(invocation); 默认实现,如果只是普通的adapter则需要释放对象
}
}
注册中心
这里的实现只是把存放服务信息的类存入文本中,因为服务端和客户端是两个jvm进程,所以对象内存地址也不一样,需要持久化再取出,而比较通用流行的方式的是使用redis,zookeeper作为注册中心,这里的格式是{接口名:{URL:实现类}}一个接口对应多个map,每个map只有一个key/value,key为可用的url(服务地址),value(具体的实现类)
public class MapRegister {
//{服务名:{URL:实现类}}
private static Map<String, Map<URL, Class>> REGISTER = new HashMap<>();
//一个接口对应多个map,每个map只有一个key/value,key为可用的url(服务地址),value(具体的实现类)
public static void register(String interfaceName, URL url, Class implClass) {
Map<URL, Class> map = new HashMap<>();
map.put(url,implClass);
REGISTER.put(interfaceName,map);
saveFile();
}
//这里直接拿第一个
public static URL random(String interfaceName){
REGISTER=getFile();
return REGISTER.get(interfaceName).keySet().iterator().next();
}
//通过接口名和url寻找具体的实现类,服务端使用
public static Class get(String interfaceName,URL url){
REGISTER=getFile();
return REGISTER.get(interfaceName).get(url);
}
public static Map<String,Map<URL,Class>> getFile(){
FileInputStream fileInputStream= null;
try {
fileInputStream = new FileInputStream(System.getProperty("user.dir")+"/"+"temp.txt");
ObjectInputStream in=new ObjectInputStream(fileInputStream);
Object o = in.readObject();
return (Map<String, Map<URL, Class>>) o;
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}
private static void saveFile(){
FileOutputStream fileOutputStream= null;
try {
fileOutputStream = new FileOutputStream(System.getProperty("user.dir")+"/"+"temp.txt");
ObjectOutputStream objectOutputStream=new ObjectOutputStream(fileOutputStream);
objectOutputStream.writeObject(REGISTER);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//消费端
public class Comsumer {
public static void main(String[] args) {
HelloService helloService = ProxyFactory.getProxy(HelloService.class);
String result = helloService.qq();
System.out.println(result);
}
}
//服务提供方
public class Provider {
public static void main(String[] args) throws UnknownHostException {
String hostAddress = InetAddress.getLocalHost().getHostName();
URL url=new URL(hostAddress,8080);
//这里多个接口的话,都要注册上去
MapRegister.register(HelloService.class.getName(),url,HelloServiceImpl.class);
Protocol server= ProtocolFactory.http();
server.start(url);
}
}
先启动provider,再启动comsumer,可以看到访问的结果。
主要核心还是通过动态代理代理具体调用的方法,通过tcp或者http等远程调用其它服务的接口,但这个只能简单的作为入门例子,还有很多改进的地方,如结合多线程处理消息,接入spring,更详细的协议定义,比如返回结果不仅仅是string,注册中心改为zookeeper实现等等,项目具体地址——https://github.com/97lele/rpcstudy/tree/master
最后
以上就是洁净小鸭子为你收集整理的基于netty、zookeeper手写RPC框架之一——基本模块搭建与编写的全部内容,希望文章能够帮你解决基于netty、zookeeper手写RPC框架之一——基本模块搭建与编写所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复