使用Ice实现延迟队列
Yizzuide edited this page Feb 14, 2022
·
18 revisions
1.15.0+
模块的设计实现原理参考《有赞延迟队列设计》,详细架构图如下:
有赞的这个架构设计是基于Redis的,我们为什么不使用RabbitMQ?可能是用的不是这个MQ,可能项目的并发量不高,可能是项目中只用到了Redis,还可能是维护成本的增加...
目前能找到的Java版开源只有delay-queue,对已有的开源实现进行对比:
开源项目 | 支持Spring | 开发文档 | 集成方案 | 模块化 | 可扩展性 | 配置性 | 超时延迟增长 | 消费调度 | 消费个数 |
---|---|---|---|---|---|---|---|---|---|
delay-queue | 支持 | 无 | 微服务 | 无 | 无 | 无 | 有计划支持 | 无 | 不支持 |
Ice | 支持 | 有 | 微服务和单体应用 | 有 | 支持 | 强 | 有 | 有 | 支持 |
当前文档使用的版本为2.1.1,使用以下坐标安装:
<dependency>
<groupId>com.github.yizzuide</groupId>
<artifactId>milkomeda-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
# Ice模块的调度线程池
scheduling:
thread-name-prefix: ice-
pool:
size: 20
shutdown:
await-termination: true
await-termination-period: 60s
milkomeda:
ice:
# 延迟队列分桶数量(默认为3)
delay-bucket-count: 2
# 消费执行超时时间(默认5000ms)
ttr: 10000
# 重试时添加延迟增长因子(默认为1,计算公式:delay * (retryCount++) * factor)
retry-delay-multi-factor: 2
注意:本模块依赖SpringBoot data Redis,这里需要添加redis连接配置(下面的配置不再说明)
@EnableIceServer
@SpringBootApplication
public class IceServerApplication {
public static void main(String[] args) {
SpringApplication.run(IceServerApplication.class, args);
}
}
@RestController
@RequestMapping("job")
public class JobController {
@Autowired
private Ice ice;
/**
* 延迟一个Job
* @param job 必需设置属性:id(唯一id), topic(消费Topic标识), delay(延迟ms), ttr(消费执行超时ms), retryCount(重试次数), body(JSON业务数据)
*/
@RequestMapping("push")
public void push(@RequestBody Job job) {
ice.add(job);
}
/**
* 轮询可消费Job
* @param topic 消费Topic标识
* @param count 获取最多消费个数
* @return List
*/
@RequestMapping("pop")
public List<Job<Map>> pop(String topic, Integer count) {
List<Job<Map<String, Object>>> jobs = Collections.emptyList();
try {
if (count == null || count < 2) {
Job<Map<String, Object>> job = ice.pop(topic);
if (job != null) {
jobs = Collections.singletonList(job);
}
return jobs;
}
jobs = ice.pop(topic, count);
return jobs;
} finally {
// 标记完成,清除元数据
if (!CollectionUtils.isEmpty(jobs)) {
ice.finish(jobs);
}
}
}
}
spring:
# Ice模块的调度线程池
scheduling:
thread-name-prefix: ice-
pool:
size: 20
shutdown:
await-termination: true
await-termination-period: 60s
milkomeda:
ice:
# 最大消费个数(默认为10)
task-topic-pop-max-size: 5
# 消费轮询间隔(默认5000ms)
task-execute-rate: 2000
@EnableIceClient
@SpringBootApplication
public class IceClientApplication {
public static void main(String[] args) {
SpringApplication.run(IceClientApplication.class, args);
}
}
@Slf4j
// 注解为Topic接收组件
@IceHandler
public class ProductCheckHandler {
// 监听topic消息,topic_xxx需要根据在调用微服务端添加Job的topic来指定
@IceListener(topic = "topic_xxx")
public void handle(List<Job<Product>> jobs) {
log.info("job list: {}", jobs);
}
}
spring:
# Ice模块的调度线程池
scheduling:
thread-name-prefix: ice-
pool:
size: 20
shutdown:
await-termination: true
await-termination-period: 60s
milkomeda:
ice:
# 延迟队列分桶数量(默认为3)
delay-bucket-count: 2
# 消费执行超时时间(默认5000ms)
ttr: 10000
# 重试时添加延迟增长因子(默认为1,计算公式:delay * (retryCount++) * factor)
retry-delay-multi-factor: 2
# 开启Task功能(使用@EnableIce时消费端需要配置为true)
enable-task: true
# 最大消费个数(默认为10)
task-topic-pop-max-size: 5
# 消费轮询间隔(默认5000ms)
task-execute-rate: 2000
@EnableIce
@SpringBootApplication
public class MilkomedaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(MilkomedaDemoApplication.class, args);
}
}
@Slf4j
@RequestMapping("ice")
@RestController
public class ProductController {
@Autowired
private Ice ice;
@RequestMapping("product/publish")
public ResponseEntity publish(Product product) {
log.info("正在上传商品:{}", product.getId());
// 模拟审核商品。。
if (product.getPics() == null) {
log.info("当前商品没有上传图片,加入黑名单");
ice.add(product.getId(), "topic_product_check", product, 10000);
}
return ResponseEntity.ok("ok");
}
}
@Slf4j
// 注解为Topic接收组件
@IceHandler
public class ProductCheckHandler {
// 监听topic消息
// @IceListener(topic = "topic_product_check")
// 监听topic消息(支持SpEL)
@IceListener(topic = "#target.topicName()")
public void handle(List<Product> products) {
log.info("product list: {}", products);
}
// 该方法被上面SpEL调用
public String topicName() {
return "topic_product_check";
}
}