【译】RabbitMQ系列(二)-Work模式

栏目: Redis · 发布时间: 6年前

内容简介:在第一章中,我们写了通过一个queue来发送和接收message的简单程序。在这一章中,我们会创建一个workqueue,来将执行时间敏感的任务分发到多个worker中。

Work模式

原文地址

【译】RabbitMQ系列(二)-Work模式

在第一章中,我们写了通过一个queue来发送和接收message的简单程序。在这一章中,我们会创建一个workqueue,来将执行时间敏感的任务分发到多个worker中。

work模式主要的意图是要避免等待完成一个耗时的任务。取而代之地,我们延迟任务的执行,将任务封装成消息,将之发送到queue。一个运行着的worker进程会弹出这个任务并执行它。当运行多个worker进程时,任务会在它们之间分派。

这种模式在web应用中特别有用,因为在一个较短的HTTP请求窗口中不会去执行一个复杂的任务。

准备工作

在上一章中,我们发送了一个”Hello World!"的message。现在我们将发送一个代表了复杂任务的字符串。这不是一个实际的任务,比如像调整图片大小或是重新渲染pdf文档,我们通 Thead.sleep() 来模拟一个耗时的任务。message中的小圆点表示其复杂度,圆点越多则任务的执行越耗时。比如“Hello..."的message将耗时3秒。

我们简单的修改上一章的Send.java代码,允许在命令行发送任意message。新的类叫做NewTask.java

String message = String.join(" ", argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

同样的,我们修改上一章中的Recv.java,让它在处理message的时候根据小圆点进行睡眠。新的类叫Worker.java

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");
  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}

像在第一章一样编译这两个类

javac -cp $CP NewTask.java Worker.java

Round-robin分派

使用Task模式的一个明显的优势是让并行执行任务变得简单。我们只需要启动更多的worker就可以消减堆积的message,系统水平扩展简单。

首先,我们在同一时间启动两个worker。他们都会从queue获得message,来看一下具体细节。

打开了三个终端,两个是跑worker的。

java -cp $CP Worker
java -cp $CP Worker

第三个终端里来发布新的任务message。

java -cp $CP NewTask First message.
java -cp $CP NewTask Second message..
java -cp $CP NewTask Third message...
java -cp $CP NewTask Fourth message....
java -cp $CP NewTask Fifth message.....

让我们看看worker的处理message的情况.第一个worker收到了第1,3,5message,第二个worker收到了第2,4个message。

默认情况下,RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到平均数量的message。这种方式称之为round-robin(轮询).

Message 确认

执行任务需要一定的时间。你可能会好奇如果一个worker开始执行任务,但是中途异常退出,会是什么结果。在我们现在的代码中,一旦RabbitMQ将消息发送出去了,它会立即将该message删除。这样的话,就可能丢失message。

在实际场景中,我们不想丢失任何一个task。如果一个worker异常中断了,我们希望这个task能分派给另一个worker。

为了确保不会丢失message,RabbitMQ采用message确认机制。RabbitMQ只有收到该message的Ack之后,才会删除该消息。

如果worker中断退出了( channel关闭了,connection关闭了,或是TCP连接丢失了)而没有发送Ack,RabbitMQ会认为该消息没有完整的执行,会将该消息重新入队。该消息会被发送给其他的worker。这样就不用message丢失,即使是在worker经常异常中断退出的场景下。

不会有任何message会timeout。当消费者中断退出,RabbitMQ会重新分派message。即使消息的执行会花费很长的时间。

默认情况下,message是需要人工确认的。在上面的例子中,我们通过autoAck=true来关闭了人工确认。像下面这样,我们将该标志设置为false,worker就需要在完成了任务之后,发送确认。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

上面的代码保证即使当worker还在处理一条消息,而强制它退出,也不会丢失message。然后不久,所有未被确认的消息都会被重新分派。

发送确认必须和接收相同的channel。使用不同的channel进行确认会导致channel-level protocol 异常。

忘记确认消息是一个比较常见的错误,但是其后果是很严重的。当client退出时,message会被重新分派,但是RabbitMQ会占用越来越多的内存,因它无法释放那些未被确认的message。

##linux
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 
##windows
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Message 持久化

我们学习了在消费者出现问题的时候不丢失message。但是如果RabbitMQ服务器宕机了,我们还是会丢失message。

当RabbitMQ宕机时,默认情况下,它会”忘记“所有的queue和message。为了确保message不丢失,我们需要确认两件事情:我们要使得queue和message都是持久的。

首先,我们要确保RabbitMQ不会丢失我们设置好的queue。所以,我们要把它声明成持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然代码没有任何问题,但是光这样是无效的。因为我们之前已经定义过名字为hello的queue。RabbitMQ不允许你使用不同的参数去重新定义一个已经存在的queue,而且这还不会反悔任何错误信息。但是我们还是有别的方法,让我们使用一个别的名字,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

声明queue的改变要在生产者和消费者的代码里都进行修改。

接着我们要设置message的持久性,我们通过设置MessageProperties为PERSISTENT_TEXT_PLAIN:

import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

将message标记成持久的不能100%保证message不会丢失,虽然这告诉RabbitMQ将message保存到磁盘,然而在RabbitMQ从接到message到保存之间,仍然有一小段时间。同时RabbitMQ不会给每一条message执行fsync(2) -- 可能只是保存到了cache而没有写到磁盘上去。所以持久的保证也不是非常强,然后对我们简单的task queue来说则足够了。如果需要一个非常强的保证,则可以使用 发布确认 的方式。

Fair 分派

你可能已经注意到分派的工作没有如我们所期望的来执行。比如在有2个worker的情况系,所有偶数的message耗时很长,而所有奇数的message则耗时很短,这样其中一个worker则一直被分派到偶数的message,而另一个则一直是奇数的message。RabbitMQ对此并不知晓,进而继续这样分派着message。

这样的原因是RabbitMQ是在message入queue的时候确定分派的。它不关心消费者ack的情况。

【译】RabbitMQ系列(二)-Work模式

我们可以通过basicQos方法和prefetchCount(1)来解决这个问题。这个设置是让RabbitMQ给worker一次一个message。或者这么说,直到worker处理完之前的message并发送ack,才给worker下一个message。否则,Rabbitmq会将message发送给其它不忙的worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意queue的大小。如果所有的worker都处于忙碌状态,queue可能会被装满。必须监控queue深度,可能要开启更多的worker,或者采取其他的措施。

开始执行

NewTask.java的最终版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

Worker.java的最终版本

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

使用message ack和prefetchCount,来设定work queue。持久化选项则在RabbitMQ重启后能让任务得以恢复。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Numerical Recipes 3rd Edition

Numerical Recipes 3rd Edition

William H. Press、Saul A. Teukolsky、William T. Vetterling、Brian P. Flannery / Cambridge University Press / 2007-9-6 / GBP 64.99

Do you want easy access to the latest methods in scientific computing? This greatly expanded third edition of Numerical Recipes has it, with wider coverage than ever before, many new, expanded and upd......一起来看看 《Numerical Recipes 3rd Edition》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

Base64 编码/解码
Base64 编码/解码

Base64 编码/解码