【译】RabbitMQ系列(六)-RPC模式

栏目: Redis · 发布时间: 5年前

内容简介:在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote Procedure Call)了。本章我们使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务端。我们让RPC服务返回一个斐波那契数组。我们创建一个简单的客户端类来演示如何使用RPC服务。call方法发送RPC请求,并阻塞知道结果返回。

RPC模式

在第二章中我们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种情况是不涉及到返回值的,worker执行任务就好。如果涉及返回值,就要用到本章提到的RPC(Remote Procedure Call)了。

本章我们使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务端。我们让RPC服务返回一个斐波那契数组。

Client interface

我们创建一个简单的客户端类来演示如何使用RPC服务。call方法发送RPC请求,并阻塞知道结果返回。

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC贴士

虽然RPC的使用在计算机领域非常普遍,但是却经常受到批评。主要问题是编码人如果不注意使用的方法是本地还是远程时,往往会造成问题。往往让系统变得不可预知,增加不必要的复杂性和调试的难度。对此我们有如下几点建议:

  • 是本地方法还是远程方法要一目了然
  • 把系统的依赖写进文档
  • 系统要处理好超时的问题

如果可以尽量使用异步的pipeline来替代像RPC这种阻塞的操作。

Callback queue

在RabbitMQ上实现RPC是非常简单的。客户端发送一个request message,服务端回应一个response message。为了接受response message我们需要在发送request message的时候附带上'callback' queue的地址。我们可以使用默认的queue。

callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

Message的属性

AMQP 0-9-1协议预定义了14个消息属性,其中大部分很少使用,下面的属性较为常用

  • deliverMode: 标记message为持久(设置为2)或其他值。
  • contentType:message的编码类型,我们经常使用JSON编码,则设置为application/json
  • replyTo: 命名回调queue
  • correlationId:将RPC的请求和回应关联起来

需要引入新的类

import com.rabbitmq.client.AMQP.BasicProperties;

Correlaton Id

在上面的代码中,每次RPC请求都会创建一个用于回调的临时queue,我们有更好的方法,我们为每一个client创建一个回调queue。

但是这样有新的问题,从回调queue中收到response无法和相应的request关联起来。这时候就是correlationId属性发挥作用的时候了。为每个request中设置唯一的值,在稍后的回调queue中收到的response里也有这个属性,基于此,我们就可以关联之前的request了。如果我们遇到一个匹配不到的correlationId,那么丢弃的行为是安全的。

你可能会问,为什么我们忽略这些无法匹配的message,而不是当做一个错误处理呢?主要是考虑服务端的竞态条件,如果RPC服务器在发送response之后就宕机了,但是却没有发送ack消息。那么当RPC Server重启之后,会继续执行这个request。这就是为什么client需要幂等处理response。

Summary

【译】RabbitMQ系列(六)-RPC模式

我们的RPC向下面这样进行工作:

  • 对于一个RPC request,客户端发送message时设置两个属性:replyTo设置成一个没有名字的request独有的queue;为每个request设置一个唯一的correlationId。
  • request发送到rpc_queue
  • RPC worker监听rpc_queue。当有消息时,进行计算并通过replyTo指定的queue发送message给客户端。
  • 客户端监听回调queue。当接收到message,则检查correlationId。如果和之前的request匹配,则将消息返回给应用进行处理。

开始执行

斐波那契处理函数

private static int fib(int n) {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}

这是一个简易的实现,如果传入一个较大的值,将会是个灾难。

RPC服务器的代码为RPCServer.java, 代码是很简单明确的

  • 先是建立connection,channel和声明queue.
  • 设置prefetchCount,我们基于请求频繁程度,会启动多个RPC Server
  • 使用basicConsume来接收,该方法提供回调参数设置(DeliverCallback).

RPC客户端的代码为RPCClient.java,代码略微有点复杂

  • 建立connection和channel。
  • call方法来发送RPC请求
  • 生成correlationId
  • 生成默认名字的queue用于reply,并订阅它
  • 发送request message,设置参数replyTo和correlationId.
  • 然后返回并开始等待response到达
  • 因为消费者发送response是在另一个线程中,我们需要让main线程阻塞,在这里我们使用BlockingQueue。
  • 消费者进行简单的处理,为每一个response message检查其correlationId,如果是,则将response添加进阻塞队列
  • main函数阻塞在BlockingQueue返回
  • 将response返回给用户

RPCClient.java完整代码

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient implements AutoCloseable {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();
    }

    public static void main(String[] argv) {
        try (RPCClient fibonacciRpc = new RPCClient()) {
            for (int i = 0; i < 32; i++) {
                String i_str = Integer.toString(i);
                System.out.println(" [x] Requesting fib(" + i_str + ")");
                String response = fibonacciRpc.call(i_str);
                System.out.println(" [.] Got '" + response + "'");
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String call(String message) throws IOException, InterruptedException {
        final String corrId = UUID.randomUUID().toString();

        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

        String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response.offer(new String(delivery.getBody(), "UTF-8"));
            }
        }, consumerTag -> {
        });

        String result = response.take();
        channel.basicCancel(ctag);
        return result;
    }

    public void close() throws IOException {
        connection.close();
    }
}

RPCServer.java完整代码

import com.rabbitmq.client.*;

public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.queuePurge(RPC_QUEUE_NAME);

            channel.basicQos(1);

            System.out.println(" [x] Awaiting RPC requests");

            Object monitor = new Object();
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(delivery.getProperties().getCorrelationId())
                        .build();

                String response = "";

                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    int n = Integer.parseInt(message);

                    System.out.println(" [.] fib(" + message + ")");
                    response += fib(n);
                } catch (RuntimeException e) {
                    System.out.println(" [.] " + e.toString());
                } finally {
                    channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    // RabbitMq consumer worker thread notifies the RPC server owner thread
                    synchronized (monitor) {
                        monitor.notify();
                    }
                }
            };

            channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (monitor) {
                    try {
                        monitor.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Pro JavaScript Design Patterns

Pro JavaScript Design Patterns

Dustin Diaz、Ross Harmes / Apress / 2007-12-16 / USD 44.99

As a web developer, you’ll already know that JavaScript™ is a powerful language, allowing you to add an impressive array of dynamic functionality to otherwise static web sites. But there is more power......一起来看看 《Pro JavaScript Design Patterns》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具