概述
Rpc中间件是目前互联网企业用的最多的中间件,是实现分布式系统最基础的中间件系统。在国内,用的最多的就是Dubbo以及Thrift。在国外,包括grpc,以及Finagle。Rpc的原理大同小异,都是利用TCP/IP协议将要本地要调用的类,方法,参数按照某种协议传输到远程主机,远程主机执行完毕以后再返回到本地主机。
当然其真正的实现非常复杂,涉及到IO,网络,多线程以及整个框架的架构设计。那么接下来的几篇文章(包括这篇文章)就来实现一个简单的基本的RPC框架,NIO框架使用的是Netty,原因你懂得。
定义一个简单的服务
假设有一个叫做IDemoService的简单服务。这个服务放在了contract包中间:
public interface IDemoService
{
public int sum(int a,int b);
}复制代码
本地机器只有接口,实现是在远端实现的。
那么在本地调用的时候肯定是通过代理走网络发送到远程主机。如果使用静态代理,那么每个接口都必须实现一个代理类,所以一般来说没有哪个RPC框架使用静态代理,都是使用动态代理:
public class JDKDynamicService<T> implements InvocationHandler {
private Class<T> clazz;
private RpcClient client = new RpcClient("127.0.0.1", 6666);
public void setClass(Class<T> clazz) {
this.clazz = clazz;
}
@SuppressWarnings("unchecked")
public T get() {
return (T) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[] { this.clazz }, this);
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return client.sendCommand(clazz, method, args);
}
}复制代码
使用动态代理,调用被代理类的每一个方法都会调用invoke方法。在invoke方法内部,调用RpcClient来传输协议和返回结果。
构建一个简单的传输协议
本地主机要调用远程接口,肯定要告诉远程主机调用哪个类的哪个接口,参数是什么。这里就简单的定一个传输的协议类:
public class MethodInvoker implements Serializable {
private static final long serialVersionUID = 6644047311439601478L;
private Class clazz;
private String method;
private Object[] args;
public MethodInvoker(String method, Object[] args, Class clazz) {
super();
this.method = method;
this.args = args;
this.clazz = clazz;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public Class getClazz() {
return clazz;
}
public void setClazz(Class clazz) {
this.clazz = clazz;
}
}复制代码
这个类就不具体分析了,原因你懂得,继承Serializable接口是为了使用JAVA自带的序列化协议。
使用RPCClient实际发送数据
public class RpcClient {
private String host;
private int port;
public RpcClient(String host, int port) {
super();
this.host = host;
this.port = port;
}
public Object sendCommand(Class clazz, Method method, Object[] args) {
MethodInvoker invoker = new MethodInvoker(method.getName(), args, clazz);
final ClientHandler clientHandler = new ClientHandler(invoker);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(clientHandler);
}
});
// Start the client.
ChannelFuture f = b.connect(new InetSocketAddress(host, port)).sync();
// Wait until the connection is closed. 当一个任务完成的时候会继续执行。
f.channel().closeFuture().sync();
return clientHandler.getResponse();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
return null;
}
}复制代码
ClientHandler 用来向服务端发送数据:
public class ClientHandler extends ChannelInboundHandlerAdapter {
private Object response;
private MethodInvoker methodInvoker;
public ClientHandler(MethodInvoker methodInvoker) {
this.methodInvoker = methodInvoker;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(this.methodInvoker); // 发送到下一个Handler。input处理完,就输出,然后output。
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead:"+msg);
response = msg;
ctx.close();
}
public Object getResponse() {
return response;
}
}复制代码
值得注意的是,被传输对象的编码和解码使用了Netty自带的编码与解码器。此外,一定要调用ctx.close()方法来关闭这个链接。
server端业务的实现
假设server端实现了IDemoService,并且使用Spring来管理bean对象:
public class DemoServiceImpl implements IDemoService
{
public int sum(int a, int b) {
return a+b;
}
}复制代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="demoService" class="com.slowlizard.rpc.server.business.DemoServiceImpl"></bean>
</beans>复制代码
server端接受传过来的数据
public class ServerHandler extends ChannelInboundHandlerAdapter {
private static ApplicationContext springApplicationContext;
static {
springApplicationContext = new ClassPathXmlApplicationContext("context.xml");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server come here:channelRead");
MethodInvoker methodInvoker = (MethodInvoker) msg;
Object service = springApplicationContext.getBean(methodInvoker.getClazz());
Method[] methods = service.getClass().getDeclaredMethods();
for (Method method : methods) {
if (method.getName().equals(methodInvoker.getMethod())) {
Object result = method.invoke(service, methodInvoker.getArgs());
ctx.writeAndFlush(result);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("server come here:channelActive");
}
}复制代码
启动server
public class Server {
private static int port = 6666;
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
arg0.pipeline().addLast(new ObjectDecoder(1024*1024,
ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())) );
arg0.pipeline().addLast(new ObjectEncoder());
arg0.pipeline().addLast(new ServerHandler());
}
}
).option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true); // 保持长连接状态
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();// 子线程开始监听
} catch (Exception e) {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}复制代码
测试
public class App {
public static void main(String[] args) {
JDKDynamicService<IDemoService> proxy = new JDKDynamicService<IDemoService>();
proxy.setClass(IDemoService.class);
IDemoService service = proxy.get();
System.out.println("result" + service.sum(1, 2));
}
}复制代码
很快,我们就是实现了一个简单的RPC远程调用。但是它只是一个原理性的示范,离真正的RPC框架还非常远。
首先,它的性能怎么样?
RpcClient每次发送协议到服务端,都会建立一个新的连接,能否优化它?
Server端能更快的查找到Bean并更快执行吗?
客户端能否与Spring集成?
Server端 Netty传输能否与Spring分离?
在下一章,我们将着手优化这个“框架”,来解决目前遇到的问题。
差点忘了附上Github地址:
github.com/slowlizard/…
最后
以上就是会撒娇往事为你收集整理的使用Netty构建Rpc中间件(一)定义一个简单的服务ClientHandler 用来向服务端发送数据:server端业务的实现启动server测试的全部内容,希望文章能够帮你解决使用Netty构建Rpc中间件(一)定义一个简单的服务ClientHandler 用来向服务端发送数据:server端业务的实现启动server测试所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复