A Look at Redisson's DelayedQueue

Category: Lua · Published: 5 years ago

Summary: this article reads through redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java and redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java. The Maven dependency used is:
<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.8.1</version>
		</dependency>

Example

@Test
    public void testDelayedQueue() throws InterruptedException {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);
        RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
        RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
        delayedQueue.offer("demo", 10, TimeUnit.SECONDS);
        Assert.assertFalse(blockingQueue.contains("demo"));
        TimeUnit.SECONDS.sleep(15);
        Assert.assertTrue(blockingQueue.contains("demo"));
    }
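  • Two queues are involved here. An offer on delayedQueue first stores the element in the delayed queue itself; the delay controls when the element becomes visible in the destination queue, which in this example is the RBlockingQueue.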

Source code analysis

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue<V> extends RedissonExpirable implements RDelayedQueue<V> {

    private final QueueTransferService queueTransferService;
    private final String channelName;
    private final String queueName;
    private final String timeoutSetName;
    
    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getName());
        queueName = prefixName("redisson_delay_queue", getName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getName());
        
        //QueueTransferTask task = ......
        
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }

    public void offer(V e, long delay, TimeUnit timeUnit) {
        get(offerAsync(e, delay, timeUnit));
    }
    
    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = PlatformDependent.threadLocalRandom().nextLong();
        return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;"
                 ,
              Arrays.<Object>asList(getName(), timeoutSetName, queueName, channelName), 
              timeout, randomId, encode(e));
    }

    public ByteBuf encode(Object value) {
        if (commandExecutor.isRedissonReferenceSupportEnabled()) {
            RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value);
            if (reference != null) {
                value = reference;
            }
        }
        
        try {
            return codec.getValueEncoder().encode(value);
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    //......
}
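  • offerAsync runs a Lua script. Its KEYS array has four entries: KEYS[1] is getName(), KEYS[2] is timeoutSetName, KEYS[3] is queueName, and KEYS[4] is channelName.
  • It takes three arguments: ARGV[1] is timeout, ARGV[2] is randomId, and ARGV[3] is encode(e).
  • The script packs randomId and the encoded value into a struct, adds that struct to the timeoutSetName zset with the timeout as its score, and appends the same struct to the tail of the queueName list. It then checks whether the first element of the timeoutSetName zset is the struct just added; if so, it publishes the timeout on the channel.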
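The effect of the offer script can be illustrated without Redis. The following is only a minimal in-memory sketch, not Redisson code: OfferSketch, Entry, and the published list are hypothetical names, a TreeSet stands in for the zset, and breaking score ties by randomId is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

class OfferSketch {
    /** Stand-in for the packed struct: random id plus value, scored by timeout. */
    static final class Entry {
        final long timeout;
        final long randomId;
        final String value;

        Entry(long timeout, long randomId, String value) {
            this.timeout = timeout;
            this.randomId = randomId;
            this.value = value;
        }
    }

    // Stand-in for the zset at timeoutSetName (KEYS[2]), ordered by timeout score.
    // Tie-breaking on randomId is an assumption of this sketch.
    final TreeSet<Entry> timeoutSet = new TreeSet<>(
            Comparator.comparingLong((Entry e) -> e.timeout)
                      .thenComparingLong(e -> e.randomId));
    // Stand-in for the list at queueName (KEYS[3]).
    final List<Entry> queue = new ArrayList<>();
    // Records what would be published on channelName (KEYS[4]).
    final List<Long> published = new ArrayList<>();

    void offer(String value, long timeout, long randomId) {
        Entry entry = new Entry(timeout, randomId, value);
        timeoutSet.add(entry);               // zadd KEYS[2] ARGV[1] value
        queue.add(entry);                    // rpush KEYS[3] value
        if (timeoutSet.first() == entry) {   // zrange KEYS[2] 0 0
            published.add(timeout);          // publish KEYS[4] ARGV[1]
        }
    }

    public static void main(String[] args) {
        OfferSketch sketch = new OfferSketch();
        sketch.offer("a", 2000L, 1L); // first element becomes the head: publishes 2000
        sketch.offer("b", 3000L, 2L); // expires after the head: nothing published
        sketch.offer("c", 1000L, 3L); // new head: publishes 1000
        System.out.println(sketch.published); // [2000, 1000]
    }
}
```

Only an offer that changes the head of the timeout set notifies the channel, so listeners are told about a new earliest expiry and nothing else.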

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getName(), timeoutSetName, queueName), 
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic<Long> getTopic() {
                return new RedissonTopic<Long>(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        
        queueTransferService.schedule(queueName, task);
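  • The RedissonDelayedQueue constructor schedules a QueueTransferTask.
  • The scheduled work is the pushTaskAsync method, which moves expired elements from the element queue to the destination queue.
  • It runs a Lua script in which KEYS[1] is getName(), KEYS[2] is timeoutSetName, and KEYS[3] is queueName; ARGV[1] is the current timestamp and ARGV[2] is 100.
  • The script calls zrangebyscore on the timeoutSetName zset, which is ordered by the timeout score, and takes the elements whose score lies between 0 and the current timestamp, at most 100 of them (the limit passed as ARGV[2]).
  • Any element returned has expired and must be handed over: the script unpacks it, calls rpush to append the value to the destination queue, calls lrem to remove it from the element queue, and finally removes all processed elements from the timeoutSetName zset with zrem.
  • After the transfer, the script returns the score of the first element left in the timeoutSetName zset, or nil if the zset is empty.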
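The transfer step can be mirrored in memory as well. This is a rough sketch under stated assumptions, not the library's implementation: TransferSketch and its fields are hypothetical names, and sorted lists stand in for the Redis zset and lists.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class TransferSketch {
    static final class Entry {
        final long timeout;
        final String value;

        Entry(long timeout, String value) {
            this.timeout = timeout;
            this.value = value;
        }
    }

    final List<Entry> timeoutSet = new ArrayList<>(); // kept sorted by timeout, like the zset
    final List<Entry> queue = new ArrayList<>();      // element queue (queueName)
    final List<String> target = new ArrayList<>();    // destination queue (getName())

    void offer(String value, long timeout) {
        Entry entry = new Entry(timeout, value);
        timeoutSet.add(entry);
        timeoutSet.sort(Comparator.comparingLong((Entry e) -> e.timeout));
        queue.add(entry);
    }

    /** Moves at most limit expired entries; returns the next timeout, or null if none is left. */
    Long pushTask(long now, int limit) {
        List<Entry> expired = new ArrayList<>();
        for (Entry e : timeoutSet) {       // zrangebyscore KEYS[2] 0 now limit 0 <limit>
            if (e.timeout > now || expired.size() == limit) {
                break;
            }
            expired.add(e);
        }
        for (Entry e : expired) {
            target.add(e.value);           // rpush KEYS[1] value
            queue.remove(e);               // lrem KEYS[3] 1 v
        }
        timeoutSet.removeAll(expired);     // zrem KEYS[2] ...
        if (timeoutSet.isEmpty()) {
            return null;                   // the script returns nil
        }
        return timeoutSet.get(0).timeout;  // score of the new head
    }

    public static void main(String[] args) {
        TransferSketch sketch = new TransferSketch();
        sketch.offer("a", 1000L);
        sketch.offer("b", 2000L);
        sketch.offer("c", 5000L);
        System.out.println(sketch.pushTask(2500L, 100)); // 5000
        System.out.println(sketch.target);               // [a, b]
    }
}
```

The returned value is what lets the caller decide when to run the transfer again, instead of polling at a fixed interval.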

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {

    private final ConcurrentMap<String, QueueTransferTask> tasks = PlatformDependent.newConcurrentHashMap();
    
    public synchronized void schedule(String name, QueueTransferTask task) {
        QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();
        } else {
            oldTask.incUsage();
        }
    }
    
    public synchronized void remove(String name) {
        QueueTransferTask task = tasks.get(name);
        if (task != null) {
            if (task.decUsage() == 0) {
                tasks.remove(name, task);
                task.stop();
            }
        }
    }
}
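  • schedule first tries to add the task to the ConcurrentMap with putIfAbsent. If a task is already registered under that name, it calls oldTask.incUsage() on the existing task; otherwise it starts the new task.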
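The usage counting can be tried in isolation with a stub task. The sketch below is illustrative only: UsageCountSketch and StubTask are hypothetical names, and the initial usage count of 1 is an assumption of this sketch, since the counter's declaration is not part of the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class UsageCountSketch {
    /** Hypothetical stand-in for QueueTransferTask that records start and stop calls. */
    static final class StubTask {
        int usage = 1; // assumption: a task counts its creator as the first user
        boolean started;
        boolean stopped;

        void start() { started = true; }
        void stop() { stopped = true; }
        void incUsage() { usage++; }
        int decUsage() { return --usage; }
    }

    final ConcurrentMap<String, StubTask> tasks = new ConcurrentHashMap<>();

    synchronized void schedule(String name, StubTask task) {
        StubTask oldTask = tasks.putIfAbsent(name, task);
        if (oldTask == null) {
            task.start();       // first registration under this name
        } else {
            oldTask.incUsage(); // reuse the running task, drop the new one
        }
    }

    synchronized void remove(String name) {
        StubTask task = tasks.get(name);
        if (task != null && task.decUsage() == 0) {
            tasks.remove(name, task);
            task.stop();        // the last user is gone
        }
    }

    public static void main(String[] args) {
        UsageCountSketch service = new UsageCountSketch();
        StubTask first = new StubTask();
        StubTask second = new StubTask();
        service.schedule("q", first);
        service.schedule("q", second);     // same name: only first keeps running
        service.remove("q");
        System.out.println(first.stopped); // false
        service.remove("q");
        System.out.println(first.stopped); // true
    }
}
```

With this scheme, several RDelayedQueue instances created for the same destination queue share one transfer task, and the task stops only when the last of them is removed.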

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

public void start() {
        RTopic<Long> schedulerTopic = getTopic();
        statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
            @Override
            public void onSubscribe(String channel) {
                pushTask();
            }
        });
        
        messageListenerId = schedulerTopic.addListener(new MessageListener<Long>() {
            @Override
            public void onMessage(CharSequence channel, Long startTime) {
                scheduleTask(startTime);
            }
        });
    }

    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }

    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(io.netty.util.concurrent.Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    if (future.cause() instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error(future.cause().getMessage(), future.cause());
                    scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                    return;
                }
                
                if (future.getNow() != null) {
                    scheduleTask(future.getNow());
                }
            }
        });
    }
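  • start() obtains the topic and registers two listeners: a BaseStatusListener that calls pushTask() in onSubscribe, and a MessageListener that calls scheduleTask(startTime) in onMessage.
  • scheduleTask returns immediately when startTime is null. Otherwise it cancels the previous timeout if there is one and computes delay = startTime - System.currentTimeMillis(). If the delay is greater than 10 ms, it registers a timer through connectionManager.newTimeout that calls pushTask when it fires; otherwise it calls pushTask right away.
  • pushTask calls pushTaskAsync. On failure (other than a RedissonShutdownException) it logs the error and reschedules itself 5 seconds later. On success with a non-null result it calls scheduleTask with that result, which is the score of the first element of the timeoutSetName zset.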
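The timing decision inside scheduleTask and the retry in pushTask can be reduced to two small pure functions. This is a simplified sketch with hypothetical names (RescheduleSketch, nextDelay, retryStartTime); it passes the clock in as a parameter and leaves out the timer and the compare-and-set bookkeeping.

```java
class RescheduleSketch {
    /** Marker meaning "nothing to schedule" (startTime was null). */
    static final long NONE = -1L;

    /**
     * Returns NONE when there is no start time, 0 when pushTask should run
     * immediately (delay of 10 ms or less), otherwise the delay in milliseconds
     * after which a timer should call pushTask.
     */
    static long nextDelay(Long startTime, long now) {
        if (startTime == null) {
            return NONE;
        }
        long delay = startTime - now;
        return delay > 10 ? delay : 0L;
    }

    /** Start time used after a failed pushTaskAsync: retry 5 seconds later. */
    static long retryStartTime(long now) {
        return now + 5 * 1000L;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        System.out.println(nextDelay(now + 10_000L, now));         // 10000
        System.out.println(nextDelay(now + 5L, now));              // 0
        System.out.println(nextDelay(null, now));                  // -1
        System.out.println(nextDelay(retryStartTime(now), now));   // 5000
    }
}
```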
