简易RPC框架实现

栏目: Java · 发布时间: 7年前

内容简介:简易RPC框架实现

PRC(Remote Procedure Call) 远程过程调用。通俗的讲就是程序通过 RPC 框架调用远程主机的方法就如同调用本地方法一样。 Dubbo 就是这样一个 Rpc 框架,本文主要参考Dubbo的设计思路,简易实现了Rpc框架。 本文涉及到知识点包括:

  • Jdk 动态代理
  • serialization 序列化
  • Netty 相关
  • Zookeeper 使用

1、Rpc框架

Rpc 框架一般分为三个部分,Registry(注册中心)、Provider(提供者)、Consumer(消费者)。

简易RPC框架实现
  1. Registry 服务的注册中心,可以通过zookeeper、 redis 等实现。
  2. Provider 服务提供者被调用方,提供服务供消费者调用
  3. Consumer 消费者,通过订阅相应的服务获取需要调用服务的ip和端口号调用远程provider提供的服务。

2、代理

java中常见的代理有JDK动态代理、Cglib动态代理、静态代理(ASM等字节码技术)。

2.1、JDK 代理

举个例子

@Override
   @SuppressWarnings("unchecked")
   public <T> T createProxyBean(Class<T> rpcServiceInterface) {
       return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{rpcServiceInterface}, new AryaRpcInvocationHandler());
   }

JDK代理生成代理对象主要通过 java.lang.reflect.Proxy 类的 newProxyInstance 方法。JDK代理需要被代理对象必须实现接口。

2.2、Cglib

Cglib实际上是对ASM的易用性封装,Cglib不需要目标对象必须实现某一个接口,相对JDK动态代理更加灵活。

Enhancer en = new Enhancer();  
       en.setSuperclass(clazz);  
       en.setCallback(new MethodInterceptor() {  
           @Override  
           public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable {  
               Object o = method.invoke(object, args);  
               return o;  
           }  
       });  
       return en.create();

2.3、静态代理

通过字节码技术对class文件进行修改,使用和学习成本相对较高,需要对Class的文件结构以及各种符号引用有比较深的认识,才能较好的使用,因为是对字节码的修改所以相对的性能上也比动态代理要好一些。

3、序列化

我们知道数据在网络上传输都是通过二进制流的形式进行进行的。当Consumer调用Provider时传输的参数需要先进行序列化,provider接收到参数时需要进行反序列化才能拿到需要的参数数据,所以序列化的性能对RPC的调用性能有很大的影响。目前主流的序列化方式有很多包括:Kryo、Protostuff、hessian。等

Protostuff是google序列化Protosbuff的开源实现,项目中我们用到它的序列化方式

/**
* @author HODO
*/
public class ProtostuffSerializer implements Serializer {

   @Override
   public byte[] serialize(Object object) {
       Class targetClass = object.getClass();
       RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
       LinkedBuffer linkedBuffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
       return ProtostuffIOUtil.toByteArray(object, schema, linkedBuffer);
   }

   @SuppressWarnings("unchecked")
   @Override
   public <T> T deserialize(byte[] bytes, Class<T> targetClass) {
       RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
       T object = (T) schema.newMessage();
       ProtostuffIOUtil.mergeFrom(bytes, object, schema);
       return object;
   }
}

4、Netty

Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持。举个例子: Netty 服务端代码

public class NettyServer {

   private ApplicationContext applicationContext;

   public NettyServer(ApplicationContext applicationContext) {
       this.applicationContext = applicationContext;
   }

   public void init(int port) {
       EventLoopGroup boss = new NioEventLoopGroup();
       EventLoopGroup worker = new NioEventLoopGroup();

       try {
           ServerBootstrap bootstrap = new ServerBootstrap();
           bootstrap.group(boss, worker);
           bootstrap.channel(NioServerSocketChannel.class);
           bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
           bootstrap.option(ChannelOption.TCP_NODELAY, true);
           bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
           bootstrap.localAddress(port);
           bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
               @Override
               public void initChannel(SocketChannel socketChannel) throws Exception {
                   ChannelPipeline channelPipeline = socketChannel.pipeline();
                   channelPipeline.addLast(new NettyServerHandler(applicationContext));
               }
           });
           ChannelFuture f = bootstrap.bind().sync();
           if (f.isSuccess()) {
               System.out.println("Netty端口号:" + port);
           }
           f.channel().closeFuture().sync();
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           boss.shutdownGracefully();
           worker.shutdownGracefully();
       }
   }

}

Netty 客服端代码

public class NettyClient {

    private int port;
    private String host;

    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    SerializerFactory serializerFactory = new SerializerFactory();
    Serializer serializer = serializerFactory.getSerialize(ProtostuffSerializer.class);


    public NettyClient(String host, int port) {
        this.port = port;
        this.host = host;
    }

    public NettyClient(String inetAddress) {
        if (inetAddress != null && inetAddress.length() != 0) {
            String[] strings = inetAddress.split(":");
            this.host = strings[0];
            this.port = Integer.valueOf(strings[1]);
        }
    }

    public RpcResponse invoker(RpcRequest rpcRequest) throws InterruptedException {

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            final NettyClientHandler clientHandler = new NettyClientHandler();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(clientHandler);
                }});
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();

            serializer.serialize(rpcRequest);
            future.channel().writeAndFlush(Unpooled.buffer().writeBytes(serializer.serialize(rpcRequest)));
            countDownLatch.await();
            // 等待链接关闭
            //future.channel().closeFuture().sync();
            return clientHandler.getRpcResponse();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {


        private RpcResponse rpcResponse;

        /**
         * 接收 Rpc 调用结果
         *
         * @param ctx netty 容器
         * @param msg 服务端答复消息
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            rpcResponse = serializer.deserialize(req, RpcResponse.class);
            countDownLatch.countDown();
        }

        RpcResponse getRpcResponse() {
            return rpcResponse;
        }
    }
}

5、注册中心 zookeeper

选用了 zookeeper 作为注册中心,在建议Rpc框架中提供了注册中心的扩展。只要实现 RegistryManager 接口即可。 zookeeper 常用的命令行:

1、客服端脚本连接zookeeper服务器不指定 -server 默认连接本地服务

./zkCli -service ip:port

2、创建

create [-s] [-e] path data acl

创建一个节点 -s -e 分别指定节点的类型和特性:顺序和临时节点默认创建的是临时节点,acl用于权限控制

3、读取

ls path 只能看指定节点下的一级节点

get path 查看指定节点的数据和属性信息

4、更新

set path data [version]

可以指定更新操作是基于哪一个版本当更新的 path 不存在时报 Node does not exist

5、删除

`delete path [version]``

6、Spring 支持

在框架中还提供了两个注解 @RpcConsumerRpcProvider 在项目中只要引入

<dependency>
			<groupId>com.yoku.arya</groupId>
			<artifactId>arya</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>

在provider端容器注入

@Bean
	public RpcProviderProcessor rpcProviderProcessor() {
		return new RpcProviderProcessor();
	}

在comsumer端容器注入

@Bean
	public RpcConsumerProcessor rpcConsumerProcessor() {
		return new RpcConsumerProcessor();
	}

项目完整的代码 arya github.com/hoodoly/ary…

框架使用Demo github.com/hoodoly/ary…

欢迎 star


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Computer Age Statistical Inference

Computer Age Statistical Inference

Bradley Efron、Trevor Hastie / Cambridge University Press / 2016-7-21 / USD 74.99

The twenty-first century has seen a breathtaking expansion of statistical methodology, both in scope and in influence. 'Big data', 'data science', and 'machine learning' have become familiar terms in ......一起来看看 《Computer Age Statistical Inference》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具