Skip to content

使用Ice实现延迟队列

Yizzuide edited this page Feb 14, 2022 · 18 revisions

依赖版本

1.15.0+

模块说明

模块的设计实现原理参考《有赞延迟队列设计》,详细架构图如下: 有赞延迟队列设计

有赞的这个架构设计是基于Redis的,我们为什么不使用RabbitMQ?可能是用的不是这个MQ,可能项目的并发量不高,可能是项目中只用到了Redis,还可能是维护成本的增加...

目前能找到的Java版开源只有delay-queue,对已有的开源实现进行对比:

开源项目 支持Spring 开发文档 集成方案 模块化 可扩展性 配置性 超时延迟增长 消费调度 消费个数
delay-queue 支持 微服务 有计划支持 不支持
Ice 支持 微服务和单体应用 支持 支持

安装

当前文档使用的版本为2.1.1,使用以下坐标安装:

<dependency>
  <groupId>com.github.yizzuide</groupId>
  <artifactId>milkomeda-spring-boot-starter</artifactId>
  <version>2.1.1</version>
</dependency>

<!-- redis -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

微服务布署使用方案

服务端

1.配置

spring:
  # Ice模块的调度线程池
  scheduling:
    thread-name-prefix: ice-
    pool:
      size: 20
    shutdown:
      await-termination: true
      await-termination-period: 60s
milkomeda:
  ice:
    # 延迟队列分桶数量(默认为3)
    delay-bucket-count: 2
    # 消费执行超时时间(默认5000ms)
    ttr: 10000
    # 重试时添加延迟增长因子(默认为1,计算公式:delay * (retryCount++) * factor)
    retry-delay-multi-factor: 2

注意:本模块依赖SpringBoot data Redis,这里需要添加redis连接配置(下面的配置不再说明)

2.启用模块

@EnableIceServer
@SpringBootApplication
public class IceServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(IceServerApplication.class, args);
    }
}

3.开发微服务API

@RestController
@RequestMapping("job")
public class JobController {
    
    @Autowired
    private Ice ice;

    /**
     * 延迟一个Job
     * @param job 必需设置属性:id(唯一id), topic(消费Topic标识), delay(延迟ms), ttr(消费执行超时ms), retryCount(重试次数), body(JSON业务数据)
     */
    @RequestMapping("push")
    public void push(@RequestBody Job job) {
        ice.add(job);
    }

    /**
     * 轮询可消费Job
     * @param topic 消费Topic标识
     * @param count 获取最多消费个数
     * @return List
     */
    @RequestMapping("pop")
    public List<Job<Map>> pop(String topic, Integer count) {
        List<Job<Map<String, Object>>> jobs = Collections.emptyList();
        try {
            if (count == null || count < 2) {
                Job<Map<String, Object>> job = ice.pop(topic);
                if (job != null) {
                    jobs = Collections.singletonList(job);
                }
                return jobs;
            }
            jobs = ice.pop(topic, count);
            return jobs;
        } finally {
            // 标记完成,清除元数据
            if (!CollectionUtils.isEmpty(jobs)) {
                ice.finish(jobs);
            }
        }
    }
}

消费端

1.配置

spring:
  # Ice模块的调度线程池
  scheduling:
    thread-name-prefix: ice-
    pool:
      size: 20
    shutdown:
      await-termination: true
      await-termination-period: 60s
milkomeda:
  ice:
    # 最大消费个数(默认为10)
    task-topic-pop-max-size: 5
    # 消费轮询间隔(默认5000ms)
    task-execute-rate: 2000

2.启用模块

@EnableIceClient
@SpringBootApplication
public class IceClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(IceClientApplication.class, args);
    }
}

3.使用声明式编程方式注册消费组件

@Slf4j
// 注解为Topic接收组件
@IceHandler
public class ProductCheckHandler {

    // 监听topic消息,topic_xxx需要根据在调用微服务端添加Job的topic来指定
    @IceListener(topic = "topic_xxx")
    public void handle(List<Job<Product>> jobs) {
      log.info("job list: {}", jobs);
    }
}

单体应用使用方案

1.配置

spring:
  # Ice模块的调度线程池
  scheduling:
    thread-name-prefix: ice-
    pool:
      size: 20
    shutdown:
      await-termination: true
      await-termination-period: 60s

milkomeda:
  ice:
    # 延迟队列分桶数量(默认为3)
    delay-bucket-count: 2
    # 消费执行超时时间(默认5000ms)
    ttr: 10000
    # 重试时添加延迟增长因子(默认为1,计算公式:delay * (retryCount++) * factor)
    retry-delay-multi-factor: 2
    # 开启Task功能(使用@EnableIce时消费端需要配置为true)
    enable-task: true
    # 最大消费个数(默认为10)
    task-topic-pop-max-size: 5
    # 消费轮询间隔(默认5000ms)
    task-execute-rate: 2000

2.启用模块

@EnableIce
@SpringBootApplication
public class MilkomedaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(MilkomedaDemoApplication.class, args);
    }
}

3.在业务接口里调用

@Slf4j
@RequestMapping("ice")
@RestController
public class ProductController {

    @Autowired
    private Ice ice;

    @RequestMapping("product/publish")
    public ResponseEntity publish(Product product) {
        log.info("正在上传商品:{}", product.getId());
        // 模拟审核商品。。
        if (product.getPics() == null) {
            log.info("当前商品没有上传图片,加入黑名单");
            ice.add(product.getId(), "topic_product_check",  product, 10000);
        }
        return ResponseEntity.ok("ok");
    }
}

4.使用声明式编程方式注册消费组件

@Slf4j
// 注解为Topic接收组件
@IceHandler
public class ProductCheckHandler {

    // 监听topic消息
    // @IceListener(topic = "topic_product_check")
    // 监听topic消息(支持SpEL)
    @IceListener(topic = "#target.topicName()")
    public void handle(List<Product> products) {
      log.info("product list: {}", products);
    }

    // 该方法被上面SpEL调用
    public String topicName() {
        return "topic_product_check";
    }
}