一文看懂Rabbitmq,从安装到实战演练

栏目: 后端 · 发布时间: 5年前

内容简介:随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来。在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc。还有另外一种实现就是使用mq来进行解耦。

随着微服务概念发展,大应用逐步拆分为小应用,提高开发效率,专门的人做专门的事情,逐渐的流行起来。

在微服务上实现通信的方式大部分是采用rpc方式,也有升级版本的grpc。

还有另外一种实现就是使用mq来进行解耦。

今天初识mq,快速入门先,准备一个环境实现案例,该文涉及以下内容:

  • 安装rabbitmq
  • mq能解决的问题
  • 实战演练

安装

rabbitmq的安装我们采用 docker 的方式,docker方便我们快速的实现rabbitmq的安装,不需要再对安装mq进行头疼。

docker 的两种方式

docker方式

//拉取mq镜像
docker pull rabbitmq
//启动mq
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9

复制代码

说明:

  1. -d 后台运行容器;
  2. --name 指定容器名;
  3. -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
  4. -v 映射目录或文件;
  5. --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
  6. -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

docker-compose 方式

version: "3"
services:
   rabbit:
      image: docker.infervision.com/library/rabbitmq:3-management
      ports:
        - "4369:4369"
        - "5671:5671"
        - "5672:5672"
        - "15671:15671"
        - "15672:15672"
      restart: always
      environment:
        - RABBITMQ_DEFAULT_USER=test
        - RABBITMQ_DEFAULT_PASS=test
      volumes:
        - /home/ruiqi/Desktop/disk/rabbitmq:/var/lib/rabbitmq
      container_name: rabbitmq

在该文件目录下执行:docker-compose up -d
复制代码

下载的rabbitmq内置管理界面,ip:15672 用户名与密码是我们在启动是写入的。

一文看懂Rabbitmq,从安装到实战演练

mq能解决什么?

通俗的来说,主要使用MQ来解决以下三个问题。

异步消息

在业务中,经常会遇到同时发送邮件,短信或者其他通知内容服务。业务初期,采用同步或者异步处理方式都需要等发送完毕后再返回给客户端。中间有一定的延迟

一文看懂Rabbitmq,从安装到实战演练

业务增长后,此方式系统性能就会造成很大的浪费。采用消息队列,将这几个服务进行解耦,只需将消息内容发送到消息队列中,降低用户的等待时间,体验效果比原先好很多。

一文看懂Rabbitmq,从安装到实战演练

应用间解耦

同一个服务中可能需要其他服务的配合才能完成一项业务操作.还是拿常见的购物案例来说明。

在京东下单支付后,消息要通知到商家,邮件通知用户已经购买某商品。

如果这两种操作都采用同步执行,用户等待时间会变长。

采用mq方式之后,订单系统将消息持久化到mq上,返回给用户下单成功。

  • 商家接收到用户的下单信息,进行处理,如果有库存管理那么需要进行库存处理。
  • 邮件通知用户,告知用户下单成功。

mq保证消息的可靠投递,不会导致消息丢失,保证消息的高可靠性。如果库存出现失败也不会导致用户下单失败的情况,可以重新进行投递。

流量削峰

流量削峰,一般是同一时间涌进来很多请求,后台处理不过来。那么需要采用削峰方式来处理。

简单来说是通过一个队列承接瞬时过来流量洪峰,在消费端平滑的将消息推送出去,如果消费者消费不及时可以将消息内容持久化在队列中,消息不存在丢失。

  1. 消费端不及时进行消费,还可以动态的扩增消费者数量,提高消费速度。
  2. 设定相关的阀值,多余的消息直接丢弃,告知用户秒杀失败等业务消息内容。
一文看懂Rabbitmq,从安装到实战演练

实战案例

本文是按照 Java 语言进行,使用Spring boot搭建,包管理工具Gradle。

导入rabbitmq jar包

compile("org.springframework.boot:spring-boot-starter-amqp:1.5.10.RELEASE")
复制代码

配置mq

yaml 文件配置

spring:
  rabbitmq:
    host: 192.168.110.5
    port: 5672
    username: tuixiang
    password: tuixiang
复制代码

准备好模板类,供后面直接使用

package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: fruiqi
 * @date: 19-2-18 下午2:42
 * @version:1.0 rabbit配置
 **/
@Configuration
public class RabbitConfig {

    /**
     * 日志
     **/
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);


    @Value("${spring.rabbitmq.username}")
    String userName;

    @Value("${spring.rabbitmq.password}")
    String userPassword;

    @Value("${spring.rabbitmq.host}")
    String host;

    @Value("${spring.rabbitmq.port}")
    Integer port;

    /**
     * 注入
     *
     * @param
     * @return com.rabbitmq.client.Connection
     * @author fruiqi
     * @date 19-1-22 下午5:41
     **/
    @Bean
    public ConnectionFactory getConnection() throws Exception {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(userName);
        factory.setPassword(userPassword);
        factory.setHost(host);
        factory.setPort(port);
        return factory;
    }


    /**
     * 创建制定的 监听容器
     *
     * @param queueName  监听的队列名字
     * @param listenerChannel 设置是否将监听的频道 公开给已注册的
     * @param PrefetchCount  告诉代理一次请求多少条消息过来
     * @param ConcurrentConsumers  制定创建多少个并发的消费者数量
     * @param acknowledgeMode  消息确认模式
     * @param listener 监听器
     * @return org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
     **/
    public SimpleMessageListenerContainer setSimpleMessageListenerContainer(String queueName, boolean listenerChannel,
                                                                            int PrefetchCount, int ConcurrentConsumers,
                                                                            AcknowledgeMode acknowledgeMode,
                                                                            ChannelAwareMessageListener listener) throws Exception {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(getConnection());
        container.setQueueNames(queueName);
        container.setExposeListenerChannel(listenerChannel);
        container.setPrefetchCount(PrefetchCount);
        container.setConcurrentConsumers(ConcurrentConsumers);
        container.setAcknowledgeMode(acknowledgeMode);
        container.setMessageListener(listener);
        return container;
    }
}


package com.infervision.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: fruiqi
 * @date: 19-2-18 下午2:51
 * @version:1.0
 **/
@Component
public class MsgSender {


    private static final Logger logger = LoggerFactory.getLogger(MsgSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param exchange 交换机名称
     * @param routingKey 路由名称
     * @param message 消息内容
     * @return void
     * @description //TODO 发送消息到消息队列中
     **/
    public void sendMsg(String exchange, String routingKey, Object message) {
        try {
            rabbitTemplate.convertAndSend(exchange,routingKey,message);
        }catch (Exception e){
            logger.error("[ERROR] send statistic message error ",e);
        }
    }

}
复制代码

实例链接mq

在使用rabbitmq 有的时候需要自己客户端创建queue,但有的时候并不是自己创建,在rabbitmq页面上进行创建queue,其他消费者直接引用。

客户端创建mq

//初始化队列,如果队列已存在,则不作任何处理 如果有权限控制如下操作并不能实现
    @Bean
    public Queue dicomQueue() {
        return new Queue(getMacPreStr(DICOM_QUEUE_NAME));
    }

    //初始化交换机
    @Bean
    public Exchange topicExchange() {
        return ExchangeBuilder.topicExchange((DEFAULT_TOPIC_EXCHANGE).durable(true).build();
    }

    // 将队列与交换机按照路由规则进行绑定
    @Bean
    Binding bindingExchangeDicomQueue(Queue dicomQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(dicomQueue).to(topicExchange).with(DICOM_QUEUE_ROUTING_KEY);
    }

复制代码

使用

队列的使用:一个是发送,属于生产者;一个是监听,属于消费者.

生产者实现

在mq配置模板类中,专门实现了一个发送类,发送文件内容,直接调用发送接口即可。

@Autowired
    RabbitService rabbitService;

    /**
     * 练习 发送数据到 mq中
     * 1. 发送的数据会到 mq中
     * 2. 我们配置的 listener 是用来消费消息的
     * 3. 客户端配置 可以参考 RabbitClientConfig
     * @param name 名字编号
     * @param vo   实体内容
     * @return: com.infervision.model.NameVo
     */
    @ApiOperation(value = "增加name信息", notes = "实体信息")
    @PostMapping(value = "/{name}")
    @ApiImplicitParam(paramType = "query", name = "name", value = "用户名字", required = true, dataType = "string")
    public NameVo addNameVo(@RequestParam String name, @RequestBody NameVo vo) {
        rabbitService.sendMessage(DEFAULT_TOPIC_TEST_EXCHANGE, LABEL_FIEL_XML_QUEUE_ROUTING_KEY, JSON.toJSONString(vo));
        return vo;
    }


   @Service
public class RabbitServiceImpl implements RabbitService {

    @Autowired
    MsgSender msgSender;

    /**
     * 尝试发送 message 到mq中
     * @param message
     * @return: void
     */
    @Override
    public void sendMessage(String exchange, String routingKey,String message) {
        msgSender.sendMsg(exchange, routingKey, message);
    }
}

复制代码

消费者实现

消费者实现有两种方式,一种通过注解的方式监听,一种是实现ChannelAwareMessageListener类来实现消费。

注解实现监听

//在方法上进行注入。配置工厂帮助提高单个消费者一次性消费的消息数量,设置多少个消费者,用来提高程序的性能
@RabbitListener(queues = "dicom.queue",containerFactory = "multipleConsumerContainerFactory")
    public void processDicomMessage(Message message, Channel channel) {
            logger.info(message);
    }

// 工厂可以在配置模板类中中配置好。
@Bean("multipleConsumerContainerFactory")
    public SimpleRabbitListenerContainerFactory multipleConsumerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(50);
        factory.setConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        configurer.configure(factory, connectionFactory);
        return factory;
    }
复制代码

实现接口方式

/**
     * 创建监听器。
     * @author fruiqi
     * @date 19-2-11 下午4:18
     * @param labelStatisticsListener 监听器
     * 调用我们公用的方法
     **/
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(LabelStatisticsListener labelStatisticsListener) throws Exception {
        SimpleMessageListenerContainer container = rabbitConfig.setSimpleMessageListenerContainer(“queue_name”,
                true, rabbitProperties.getMaximumDelivery(),
                rabbitProperties.getConsumer(), AcknowledgeMode.MANUAL, labelStatisticsListener);
        return container;
    }


@Component
public class LabelStatisticsListener implements ChannelAwareMessageListener {


    private static final Logger logger = LoggerFactory.getLogger(LabelStatisticsListener.class);

    /**
     * 处理传输过来的数据
     * @param message 传送的消息内容
     * @param channel 实现通道
     * @return: void
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String mes = new String(message.getBody());
        logger.info("[INFO] message is {}",mes);

        // 手动应答 消息已消费
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}

复制代码

总结

以上内容就完成了rabbitmq 从搭建到使用全部的流程。当然里面还有更多的可以让我们去探讨,比如mq的队列模式,一个系统配置多个mq等等内容。敬请期待我们下一篇mq系列内容。

大家在系统中使用过mq吗?你们使用的mq是什么样的?可以在留言区我们一起探讨哦。

代码存放在: github中

·END·

路虽远,行则必至

本文原发于 同名微信公众号「胖琪的升级之路」,回复「1024」你懂得,给个赞呗。

微信ID:YoungRUIQ

一文看懂Rabbitmq,从安装到实战演练

以上所述就是小编给大家介绍的《一文看懂Rabbitmq,从安装到实战演练》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Out of their Minds

Out of their Minds

Dennis Shasha、Cathy Lazere / Springer / 1998-07-02 / USD 16.00

This best-selling book is now available in an inexpensive softcover format. Imagine living during the Renaissance and being able to interview that eras greatest scientists about their inspirations, di......一起来看看 《Out of their Minds》 这本书的介绍吧!

MD5 加密
MD5 加密

MD5 加密工具

html转js在线工具
html转js在线工具

html转js在线工具

UNIX 时间戳转换
UNIX 时间戳转换

UNIX 时间戳转换