DelayQueue系列(三):持久化方案

栏目: 后端 · 发布时间: 5年前

内容简介:原文发表于简书上一篇文章中提到了我们在项目中运用DelayQueue解决了一些需要延迟执行的任务,但是最近我们在生产环境上遇到了一个问题。重启服务器后,那些未执行的延迟任务就消失不见了。于是如何将延迟任务持久化就提上了日程。关于DelayQueue的具体实现方案,已经在上一篇文章

原文发表于简书 DelayQueue之持久化方案 ,本次更新主要是对processTask方法做了优化,以及优化了补偿执行的额外延迟时间设置,可对比阅读。

上一篇文章中提到了我们在项目中运用DelayQueue解决了一些需要延迟执行的任务,但是最近我们在生产环境上遇到了一个问题。重启服务器后,那些未执行的延迟任务就消失不见了。于是如何将延迟任务持久化就提上了日程。

关于DelayQueue的具体实现方案,已经在上一篇文章 DelayQueue系列(二):通用组件 中提到过了。本文就不再复述了。

本期的主题主要是探讨如何将延迟任务进行持久化。

何为延迟任务的持久化?顾名思义,就是将这些延迟任务的执行必要的数据,存储到数据库或 redis 等。

那么为何要进行持久化呢?很简单,因为延迟任务的数据是放在内存里,那么自己需要我们自己写持久化的备案以达到高可用,否则服务器故障宕机或新发布版本而导致服务器重启的时候,那么那些未执行的延迟任务数据将彻底丢失,这显然是我们不愿意见到的。

我目前采用的方案如下:

1、在需要使用到DelayQueue的地方,调用saveDelayTask方法,需要的参数有延迟任务函数策略工厂类的路由tag,执行方法所需的json格式的参数messageBody,延迟多久执行以秒为单位的delayTime。

2、任务调度每5秒去执行getNotCompletedMessageList方法。

大多数情况下,会在预计执行的时间点准时去执行processTask方法,那么异常状况下,如果服务器重启,那么定时任务调度会在一定时间后找到那些没有如期执行的延迟任务,通过定时任务调度的方式依次执行各自任务的processTask方法。

异常状态下,延迟任务执行会比预期执行时间有一定的延后,我设计的方案是目前我们可以允许的范围,这个大家可以酌情设置备选方案延后的时间。

期间会用到yb_delay_task_message这张表,如下是该表的表结构:

CREATE TABLE `yb_delay_task_message` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `tag` varchar(128) NOT NULL DEFAULT '' COMMENT '延迟队列执行函数的key',
  `message_body` longtext CHARACTER SET utf8mb4 COMMENT '消息体,以json格式存储',
  `status` tinyint(3) unsigned DEFAULT '0' COMMENT '状态;0:未完成,1:已完成,2:已失败 3:执行中',
  `error_stack` longtext COMMENT '失败堆栈',
  `version` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '版本号',
  `ip_address` bigint(20) DEFAULT NULL COMMENT '执行ip地址',
  `delay_time` bigint(20) NOT NULL COMMENT '延迟执行的时间长度',
  `expected_time` datetime DEFAULT NULL COMMENT '预计执行时间',
  `execution_time` datetime DEFAULT NULL COMMENT '实际执行时间',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`id`),
  KEY `idx_expectedTime_status` (`expected_time`,`status`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COMMENT='延迟队列消息表';
复制代码

核心代码大致如下,其他代码很简单就不一一公布了。

public void saveDelayTask(String tag, String messageBody, Long delayTime) {
    DelayTaskMessage delayTaskMessage = new DelayTaskMessage();
    delayTaskMessage.setTag(tag);
    LocalDateTime now = LocalDateTime.now();
    delayTaskMessage.setCreateTime(now);
    delayTaskMessage.setUpdateTime(now);
    delayTaskMessage.setDelayTime(delayTime);
    delayTaskMessage.setExpectedTime(now.plusSeconds(delayTime));
    delayTaskMessage.setMessageBody(messageBody);
    delayTaskMessage.setStatus(KafkaMessageStatusEnum.NOT_COMPLETE.getCode());
    int res = delayTaskMessageMapper.insertDelayTaskMessage(delayTaskMessage);
    if (res <= 0) {
        log.error("ybBrokerApp|insertDelayTaskMessage error, res<=0");
        throw new RuntimeException("insertDelayTaskMessage error, res<=0");
    }
    TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
            function -> this.processTask(delayTaskMessage));
    DelayQueue<TaskMessage> queue = taskManager.getQueue();
    queue.offer(taskMessage);
}
复制代码

首先来分析一下,用来保存延迟任务的saveDelayTask方法。

tag是指延迟任务的标记,用于指定对应的策略类。

messageBody主要用于存储执行延迟任务的一些必要的数据,以json方法存储。

delayTime是延迟时间,默认以s为单位,主要是便于使用。

这个方法的主要功能是首先保存还未执行的延迟任务,自动根据延迟时间计算该延迟任务的预期执行时间,以便于后续的补偿算法跟踪,然后运用DelayQueue的特性,将这个延迟任务提交给延迟队列执行。

public int processTask(DelayTaskMessage param) {
    DelayTaskMessage delayTaskMessage = delayTaskMessageMapper.getDelayTaskMessageById(param.getId());
    try {
        if (null != delayTaskMessage) {
            if (!Objects.equals(delayTaskMessage.getStatus(), DelayTaskMessageStatusEnum.NOT_COMPLETE.getCode())) {
                log.info("processTask executed already");
                return 1;
            }
            try {
                delayTaskMessage.setIpAddress(InetAddress.getLocalHost().getHostAddress());
            } catch (UnknownHostException ex) {
                log.error("Address.getLocalHost error", ex);
            }
            int res = delayTaskMessageMapper.delayTaskStartProcess(delayTaskMessage);
            if (res <= 0) {
                log.info("delayTaskStartProcess error,maybe processTask executed already");
                return 1;
            }
            //处理逻辑
            DelayTaskExecuteProcessor delayTaskExecuteProcessor = delayTaskExecuteProcessorFactory.getExecuteProcessor(delayTaskMessage.getTag());
            if (delayTaskExecuteProcessor != null) {
                delayTaskExecuteProcessor.execute(delayTaskMessage);
            } else {
                throw new RuntimeException("no such processor,tag=" + delayTaskMessage.getTag());
            }
            delayTaskMessage.setExecutionTime(LocalDateTime.now());
            res = delayTaskMessageMapper.delayTaskProcessSuccess(delayTaskMessage);
            if (res <= 0) {
                log.error("delayTaskProcessSuccess error");
                return 1;
            }
            return 1;
        } else {
            log.error("ybBrokerApp processTask error, delayTaskMessage is null delayTaskMessageId=", param.getId());
            return 0;
        }
    } catch (Exception e) {
        log.error("ybBrokerApp processTask error , param = " + param.toString() + "|", e);
        if (null != delayTaskMessage) {
            delayTaskMessage.setErrorStack(e.getMessage());
            delayTaskMessageMapper.delayTaskProcessFail(delayTaskMessage);
        }
        return 0;
    }
}
复制代码

然后是核心的处理延迟任务的processTask方法。

1、根据id,在数据库寻找到对应需要执行的延迟任务的持久化数据。

2、如果这条持久化数据非空且状态不是未执行的状态,那么提示该任务无需再被执行,防止重复执行。这里的status,一共有四种状态,未执行,执行中,执行成功和执行失败。

3、如果这条持久化数据非空,且是未执行的状态,那么将这条数据的执行状态改为执行中,同时将执行方法的ip地址记录下来,便于后续分析,然后通过version来控制并发问题。只有当version和数据库中的version一致,且数据库中的status为未执行,才允许将状态改为执行中。

4、如果上一步执行成功,则找到tag对应的策略类执行对应的execute方法。

5、接下来,将这条持久化数据的状态改为已执行成功的状态,这里就不需要受version和status的限制了,直接改为执行成功即可。

6、如果执行过程中出现其他异常,那么就将数据的状态改为执行失败。

public List<DelayTaskMessage> getNotCompletedMessageList(int total, int index) {
    LocalDateTime expectedTime = LocalDateTime.now().plusSeconds(1L);
    List<DelayTaskMessage> delayTaskMessageList = delayTaskMessageMapper.getNotCompletedMessageList(expectedTime,total, index);
    if (CollectionUtils.isEmpty(delayTaskMessageList)) {
        return Lists.newArrayList();
    }
    return delayTaskMessageList;
}
复制代码

最后是补偿方案的落实,我是在定时任务中去保证延迟任务一定会被执行至少一次的。至于会不会被重复执行,我是通过在processTask这个方法中去控制的。

我的设计是每隔5s去遍历一下那些过了预期执行时间+1s依然未执行的的延迟任务。然后将这些列表中的延迟任务重新调用processTask方法。

如果最终是通过补偿方案执行的延迟任务会比预期执行时间还要晚执行1到6s。目前在我们的项目中,这个额外延迟是可以被接收的。大家还是要根据实际情况酌情修改这个额外延迟的时间。

我在yb_delay_task_message表中记录了expectedTime(预计执行时间)和executionTime(实际执行时间),所以具体的执行性能可以通过这两个字段去对比。

以上是我针对DelayQueue设计的的持久化方案,如果大家有更好的意见,可以一起讨论哦。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

引爆用户增长

引爆用户增长

黄天文 / 机械工业出版社 / 2017-11-1 / 79.00

本书是用户增长领域的开创性著作,是作者在去哪儿、360、百度等知名企业多年用户增长工作的经验总结。宏观层面,从战略高度构建了一套系统的、科学的用户增长方法论;微观层面,从战术执行细节上针对用户增长体系搭建、用户全生命周期运营等总结了大量能引爆用户增长的实操方法和技巧。 不仅有方法论和技巧,而且非常注重实操。对电商、团购、共享经济、互联网金融等4大行业的50余家企业(360、美团、滴滴等)的1......一起来看看 《引爆用户增长》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具