灵感来袭,基于Redis的分布式延迟队列

栏目: IT技术 · 发布时间: 4年前

内容简介:延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

延迟队列

延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费。比如1分钟之后发送短信,发送邮件,检测数据状态等。

Redisson Delayed Queue

如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单。

灵感来袭,基于 <a href='https://www.codercto.com/topics/18994.html'>Redis</a> 的分布式延迟队列

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的 RDelayedQueue Java对象在实现了 RQueue 接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

Java DelayQueue

DelayQueue它本质上是一个队列,而这个队列里也只有存放Delayed的子类才有意义。

灵感来袭,基于Redis的分布式延迟队列

延迟队列demo

public class DelayTask implements Delayed {
    private long startDate;
    public DelayTask(Long delayMillions) {
        this.startDate = System.currentTimeMillis() + delayMillions;
    }


    @Override
    public int compareTo(Delayed o) {
        Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return this.startDate - System.currentTimeMillis();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<DelayTask> queue = new DelayQueue<>();
        DelayTask delayTask = new DelayTask(1000 * 5L);
        queue.put(delayTask);
        while (queue.size()>0){
            queue.take();
        }
    }
}

延迟队列消费原理

灵感来袭,基于Redis的分布式延迟队列

源码中出现了三次await字眼:

  • 第一次是当队列为空时,等待;

  • 第二次等待是因为,发现有任务,没有到执行时间,并且有准备执行的线程(leader),那不好意思,还得接续等待直到下一个可执行的任务。

  • 第三次是真正延时的地方了,available.awaitNanos(delay),此时也没有别的线程要执行,也就是我将要执行,等待剩下的延迟时间即可。

延迟队列生产原理

灵感来袭,基于Redis的分布式延迟队列

为保证消费者正常消费,如果优先队列头元素和当前放入元素相等,则说明当前元素消费的优先级高,重置准备消费的线程(leader)为null,唤醒消费者线程重新执行take方法逻辑。

手写一个Redis延迟队列

Redis延迟队列设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体设计

灵感来袭,基于Redis的分布式延迟队列

延迟消息体Message实现了Delayed接口,这样Java DelayQueue就知道什么时候取出消息体。

Redis延迟队列实现

灵感来袭,基于Redis的分布式延迟队列

RedisDelayQueue构造函数依赖 redis操作缓存服务对象目标队列名称 (redis key)。

offer方法传入member(具体消息),delay(延迟时间),timeUnit(时间单位),然后封装成延迟消息体Message对象,放入Java DelayQueue中。

run方法是一个循环体,不断的从Java DelayQueue对象中获取消息体,然后放入redis对应的目标队列里。

延迟队列测试demo

灵感来袭,基于Redis的分布式延迟队列

控制台打印效果

灵感来袭,基于Redis的分布式延迟队列

思考

这种方案实现比较简单,使用的时候一定要谨慎,应用于延迟小,消息量不大的场景是没问题的,毕竟Java DelayQueue是占用内存的。另外也可以考虑利用Redis的sorted set 结构实现延迟队列,使用TimeStamp作为score,比如你的任务是要延迟5分钟,那么就在当前时间上加5分钟作为 score ,轮询任务每秒只轮询 score 大于当前时间的 key即可,如果任务支持有误差,那么当没有扫描到有效数据的时候可以休眠对应时间再继续轮询。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

算法概论

算法概论

Sanjoy Dasgupta、Christos Papadimitriou、Umesh Vazirani / 王沛、唐扬斌、刘齐军 / 清华大学出版社 / 2008-7 / 39.99元

《国外经典教材·算法概论》涵盖了绝大多数算法设计中的常用技术。在表达每一种技术时,阐述它的应用背景,强调每个算法运转背后的简洁数学思想,注意运用与其他技术类比的方法来说明它的特征,并提供了大量相应实际问题的例子。《国外经典教材·算法概论》同时也注重了对每一种算法的复杂性分析。全书共10章,从基本的数字算法人手,先后介绍了分治、图的遍历、贪心算法、动态规划、线性规划等技术,对NP完全问题进行厂基本而......一起来看看 《算法概论》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

HEX HSV 转换工具
HEX HSV 转换工具

HEX HSV 互换工具