RabbitMQ RabbitMQ 学习笔记 -- 08 RabbitTemplate 及消息序列化

hector · 2020-10-24 16:20:33 · 热度: 89

生产者发送消息时可以为消息指定一些参数

  1. Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent
  2. Headers:头文件可以有任何名称。这里只能设置长字符串头。
  3. Properties: 设置消息属性(传递模式和头信息是最常见的情况)。无效的属性将被忽略. Valid properties are:

content_type : 消息内容的类型
content_encoding: 消息内容的编码格式
priority: 消息的优先级
correlation_id:关联id
reply_to: 用于指定回复的队列的名称
expiration: 消息的失效时间
message _id: 消息id
timestamp:消息的时间戳
type: 类型
user_id: 用户id
app_id: 应用程序id
cluster_id: 集群id

  1. Payload: 消息内容

在 web 控制台发布消息配置:

web 控制台

RabbitTemplate

RabbitTemplate中,发现不管是初始化还是默认的MessageConverter都是SimpleMessageConverter

// messageConverter 默认实例化 SimpleMessageConverter
private MessageConverter messageConverter = new SimpleMessageConverter();
...
/**
*  便捷的构造器与setter注入一起使用
*/
public RabbitTemplate() {
    initDefaultStrategies();
}
...
/**
*  设置默认策略。如果需要,子类可以重写。
*/
protected void initDefaultStrategies() {
    setMessageConverter(new SimpleMessageConverter());
}

我们进入SimpleMessageConverter类中看其默认的转换逻辑

/**
 * Converts from a AMQP Message to an Object.
 */
@Override
public Object fromMessage(Message message) throws MessageConversionException {
    Object content = null;
    MessageProperties properties = message.getMessageProperties();
    if (properties != null) {
        String contentType = properties.getContentType();
        // 如果 content_type 是以 text 为开头,则把消息转换成 String 类型
        if (contentType != null && contentType.startsWith("text")) {
            String encoding = properties.getContentEncoding();
            if (encoding == null) {
                encoding = this.defaultCharset;
            }
            try {
                content = new String(message.getBody(), encoding);
            }
            catch (UnsupportedEncodingException e) {
                throw new MessageConversionException("failed to convert text-based Message content", e);
            }
        }
        // 其他类型转换为 application/x-java-serialized-object
        else if (contentType != null && contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
            try {
                content = SerializationUtils.deserialize(
                        createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
            }
            catch (IOException | IllegalArgumentException | IllegalStateException e) {
                throw new MessageConversionException("failed to convert serialized Message content", e);
            }
        }
    }
    if (content == null) {
        content = message.getBody();
    }
    return content;
}

MessageConverter:

消息转换器 MessageConverter 重要的两个方法
// 将 java 对象和属性对象转换成 Message 对象。
Message toMessage(Object object, MessageProperties messageProperties);

// 将消息对象转换成 java 对象。
Object fromMessage(Message message) throws MessageConversionException;

RabbitTemplate 内部通过 MessageConverter 把 Message 转换成 java 对象,用于将对象参数解析为convertAndSend方法以及 receiveAndConvert 方法的对象结果。

默认转换器是 SimpleMessageConverter,它能够根据消息内容类型头处理字节数组、字符串和可序列化对象。

SimpleMessageConverter 处理的逻辑

  • 如果 content_type 是以 text 为开头,则把消息转换成 String 类型
  • 如果 content_type的 值是 application/x-java-serialized-object 则把消息序列化为 java 对象,否则,把消息转换成字节数组。

Message 内容对象序列化与反序列化

使用 Java 序列化与反序列化

  • 默认的 SimpleMessageConverter 在发送消息时会将对象序列化成字节数组,若要反序列化对象,需要自定义 MessageConverter
  • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

使用 JSON 序列化与反序列化

  • RabbitMQ 提供了 Jackson2JsonMessageConverter 来支持消息内容 JSON 序列化与反序列化
  • 消息发送者在发送消息时应设置 MessageConverter 为 Jackson2JsonMessageConverter

    @Test
    public void demo_06_Producer() {

      String routingKey = "hello";
      TestA a = new TestA();
      a.setFieldA("FBI WARNING");
      // 设置 MessageConverter 
      rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
      rabbitTemplate.convertAndSend(routingKey, a);
      System.out.println("发送成功");
    

    }

  • 消费者也应该配置 MessageConverter 为 Jackson2JsonMessageConverter,这样消费者反序列化就能匹配成功

    @Configuration
    public class RabbitMQConfig {

      @Bean
      public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
          SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
          factory.setConnectionFactory(connectionFactory);
          // 临时设置 MessageConverter 为 Jackson2JsonMessageConverter
          factory.setMessageConverter(new Jackson2JsonMessageConverter());
          return factory;
      }
    

    }

    注意:被序列化对象应提供一个无参的构造函数,否则会抛出异常

为您推荐与 rabbitmq 相关的帖子:

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册