SpringBoot中并发定时任务的实现、动态定时任务的实现(看这一篇就够了)

果子爸聊技术 2019-04-08 14:24:01 ⋅ 803 阅读

原文链接:https://www.cnblogs.com/baixianlong/p/10659045.html

一、在JAVA开发领域,目前可以通过以下几种方式进行定时任务

1、单机部署模式

  • Timer:jdk中自带的一个定时调度类,可以简单的实现按某一频度进行任务执行。提供的功能比较单一,无法实现复杂的调度任务。

  • ScheduledExecutorService:也是jdk自带的一个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的一个线程执行,所以其任务是并发执行的,互不影响。

  • Spring Task:Spring提供的一个任务调度工具,支持注解和配置文件形式,支持Cron表达式,使用简单但功能强大。

  • Quartz:一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度,就是配置稍显复杂。

2、分布式集群模式(不多介绍,简单提一下)

问题:

I、如何解决定时任务的多次执行?
II、如何解决任务的单点问题,实现任务的故障转移?

问题I的简单思考:

1、固定执行定时任务的机器(可以有效避免多次执行的情况 ,缺点就是单点故障问题)。
2、借助Redis的过期机制和分布式锁。
3、借助mysql的锁机制等。

成熟的解决方案:

1、Quartz:可以去看看这篇文章[Quartz分布式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。
2、elastic-job:(https://github.com/elasticjob/elastic-job-lite)当当开发的弹性分布式任务调度系统,采用zookeeper实现分布式协调,实现任务高可用以及分片。
3、xxl-job:(https://github.com/xuxueli/xxl-job)是大众点评员发布的分布式任务调度平台,是一个轻量级分布式任务调度框架。
4、saturn:(https://github.com/vipshop/Saturn) 是唯品会提供一个分布式、容错和高可用的作业调度服务框架。

二、SpringTask实现定时任务(这里是基于springboot)

1、简单的定时任务实现

使用方式:

使用@EnableScheduling注解开启对定时任务的支持。
使用@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等一些定时策略来实现定时任务。

使用缺点:

1、多个定时任务使用的是同一个调度线程,所以任务是阻塞执行的,执行效率不高。
2、其次如果出现任务阻塞,导致一些场景的定时计算没有实际意义,比如每天12点的一个计算任务被阻塞到1点去执行,会导致结果并非我们想要的。

使用优点:

1、配置简单
2、适用于单个后台线程执行周期任务,并且保证顺序一致执行的场景

源码分析:

//默认使用的调度器
if(this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//可以看到SingleThreadScheduledExecutor指定的核心线程为1,说白了就是单线程执行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延时队列作为任务的存放队列,这样便可以实现任务延迟执行或者定时执行
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

  

2、实现并发的定时任务

使用方式:

  • 方式一:由1中我们知道之所以定时任务是阻塞执行,是配置的线程池决定的,那就好办了,换一个不就行了!直接上代码:

      @Configuration
    public class ScheduledConfig implements SchedulingConfigurer {

    @Autowired
    private TaskScheduler myThreadPoolTaskScheduler;

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
    //简单粗暴的方式直接指定
    //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
    //也可以自定义的线程池,方便线程的使用与维护,这里不多说了
    scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
    }
    }

    @Bean(name = "myThreadPoolTaskScheduler")
    public TaskScheduler getMyThreadPoolTaskScheduler() {
    ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
    taskScheduler.setPoolSize(10);
    taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
    taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //调度器shutdown被调用时等待当前被调度的任务完成
    taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
    //等待时长
    taskScheduler.setAwaitTerminationSeconds(60);
    return taskScheduler;
    }
  • 方式二:方式一的本质改变了任务调度器默认使用的线程池,接下来这种是不改变调度器的默认线程池,而是把当前任务交给一个异步线程池去执行

    废话太多,直接上代码:

      @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
    @Async("myThreadPoolTaskExecutor")
    //@Async
    public void scheduledTest02(){
    System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
    }

    //自定义线程池
    @Bean(name = "myThreadPoolTaskExecutor")
    public TaskExecutor getMyThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(20);
    taskExecutor.setMaxPoolSize(200);
    taskExecutor.setQueueCapacity(25);
    taskExecutor.setKeepAliveSeconds(200);
    taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
    // 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
    taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //调度器shutdown被调用时等待当前被调度的任务完成
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    //等待时长
    taskExecutor.setAwaitTerminationSeconds(60);
    taskExecutor.initialize();
    return taskExecutor;
    }
    • 首先使用@EnableAsync 启用异步任务

    • 然后在定时任务的方法加上@Async即可,默认使用的线程池为SimpleAsyncTaskExecutor(该线程池默认来一个任务创建一个线程,就会不断创建大量线程,极有可能压爆服务器内存。当然它有自己的限流机制,这里就不多说了,有兴趣的自己翻翻源码~)

    • 项目中为了更好的控制线程的使用,我们可以自定义我们自己的线程池,使用方式@Async("myThreadPool")

  • 线程池的使用心得(后续有专门文章来探讨)

    • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使用和控制。

    • 使用自定义的线程池能够避免一些默认线程池造成的内存溢出、阻塞等等问题,更贴合自己的服务特性

    • 使用自定义的线程池便于对项目中线程的管理、维护以及监控。

    • 即便在非spring环境下也不要使用java默认提供的那几种线程池,坑很多,阿里代码规约不说了吗,得相信大厂!!

三、动态定时任务的实现

问题:

  • 使用@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做一些改变,或者要动态的启停一个定时任务,那么这个时候使用此注解就不太方便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停止服务,重新部署。

    解决办法:

  • 方式一:实现SchedulingConfigurer接口,重写configureTasks方法,重新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种方式修改了配置值后,需要在下一次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。

      @Configuration
    public class ScheduledConfig implements SchedulingConfigurer {

    @Autowired
    private TaskScheduler myThreadPoolTaskScheduler;

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
    //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
    scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
    //可以实现动态调整定时任务的执行频率
    scheduledTaskRegistrar.addTriggerTask(
    //1.添加任务内容(Runnable)
    () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
    //2.设置执行周期(Trigger)
    triggerContext -> {
    //2.1 从数据库动态获取执行周期
    String cron = "0/2 * * * * ? ";
    //2.2 合法性校验.
    // if (StringUtils.isEmpty(cron)) {
    // // Omitted Code ..
    // }
    //2.3 返回执行周期(Date)
    return new CronTrigger(cron).nextExecutionTime(triggerContext);
    }
    );
    }
    }
  • 方式二:使用threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执行频率的调整

      首先,我们要认识下这个调度类,它其实是对javaScheduledThreadPoolExecutor的一个封装改进后的产物,主要改进有以下几点:
    1、提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数。
    2、支持自定义任务,通过传入Trigger参数。
    3、对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。
    顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:
    1、提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactoryRejectedExecutionHandler其他没有默认配置
    2、实现AsyncListenableTaskExecutor接口,支持对FutureTask添加successfail的回调,任务成功或失败的时候回执行对应回调方法。
    3、因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)
    4、提供默认ThreadFactory实现,直接通过参数重载配置

    扯了这么多,还是直接上代码

      @Component
    public class DynamicTimedTask {

    private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);

    //利用创建好的调度类统一管理
    //@Autowired
    //@Qualifier("myThreadPoolTaskScheduler")
    //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;


    //接受任务的返回结果
    private ScheduledFuture<?> future;

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    //实例化一个线程池任务调度类,可以使用自定义的ThreadPoolTaskScheduler
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
    return new ThreadPoolTaskScheduler();
    }


    /**
    * 启动定时任务
    * @return
    */

    public boolean startCron() {
    boolean flag = false;
    //从数据库动态获取执行周期
    String cron = "0/2 * * * * ? ";
    future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
    if (future!=null){
    flag = true;
    logger.info("定时check训练模型文件,任务启动成功!!!");
    }else {
    logger.info("定时check训练模型文件,任务启动失败!!!");
    }
    return flag;
    }

    /**
    * 停止定时任务
    * @return
    */

    public boolean stopCron() {
    boolean flag = false;
    if (future != null) {
    boolean cancel = future.cancel(true);
    if (cancel){
    flag = true;
    logger.info("定时check训练模型文件,任务停止成功!!!");
    }else {
    logger.info("定时check训练模型文件,任务停止失败!!!");
    }
    }else {
    flag = true;
    logger.info("定时check训练模型文件,任务已经停止!!!");
    }
    return flag;
    }


    class CheckModelFile implements Runnable{

    @Override
    public void run() {
    //编写你自己的业务逻辑
    System.out.print("模型文件检查完毕!!!")
    }
    }

    }

四、总结

  • 到此基于springtask下的定时任务的简单使用算是差不多了,其中不免有些错误的地方,或者理解有偏颇的地方欢迎大家提出来!

  • 基于分布式集群下的定时任务使用,后续有时间再继续!!!


---------------END----------------

后续的内容同样精彩

长按关注“IT实战联盟”哦




全部评论: 0

    我有话说:

    MySql实战:写一个简单存储过程,完成订单定时任务

    前言之前我们分享MySql性能优化、索引详解等内容,本文章主要是针对想要入门MySql存储过程读者,主要实现业务是订单库里面超过30分钟没有支付订单全部置为失效订单......

    精品推荐:全懂负载均衡

    随着互联网发展,业务流量越来越大并且业务逻辑也越来越复杂,单台机器性能问题以及单点问题凸显出来,因此需要多台机器来进行性能水平扩展以及避免单点故障。

    PowerJob —强大分布式任务调度与计算框架

    PowerJob让您轻松完成作业调度与繁杂任务分布式计算。

    JAVA实现附近范围内公交定位问题

    接上【前端实战:通过JS抓取城市所有站点与线路】获取附近定位信息

    微信小程序微商城(四):动态API实现商品详情页(上)

    1、实现商品详情页面布局(实现3个模块,头部商品图片轮播、商品价格和商品描述、商品详情展示) 2、根据用户点击不同商品请求API动态加载数据

    SpringBoot+zk+dubbo架构实践):本地部署zookeeper

    SpringBoot+zk+dubbo架构实践系列实现目标:自己动手搭建微服务架构

    【SpringCloud实战次开发使用Feign添加动态Header问题思考

    一个Spring Cloud Feign添加自定义Header坑,分享给大家

    「强烈推荐」是我过最接“地气”代码问题与重构实践

      写这个文章是因为前段时间确实因为公司业务开发太忙太紧,所有开发都处在于加班赶项目,并且加入新人较多造成系列代码不可控质量问题。 文章针对段时间代码出现各种各样问题

    「转载」微服务分布式架构,如何实现日志链路跟踪?

    背景 开发排查系统问题用得最多手段是查看系统日志,在分布式环境一般使用ELK来统一收集日志,但是在并发大时使用日志定位问题还是比较麻烦,我们来下面图     上图

    精品推荐:缓存架构之实战演练Elastic Job定时实现redis缓存预热、缓存更新

    缓存预热是系统上线后,将相关缓存数据直接加载到缓存系统。这样可以避免在用户请求时候,先查询数据库,然后再将数据缓存问题!用户直接查询事先被预热缓存数据!

    DDDplus 1.0.2 发布,轻量级业务台开发框架

    DDDplus 简介 套轻量级业务台开发框架,以DDD思想为本,致力于业务资产可沉淀可传承,全方位解决复杂业务场景扩展问题,实现台核心要素,赋能台建设。 融合台复杂生态协作方法论

    Node rabbitmq 入门

      消息中间件 消息队列中间件(Message Queue Middleware, 简称为 MQ)是指利用高效可靠消息传递机制进行与平台无关数据交流, 并基于数据通信来进行分布式系统

    Node&RabbitMQ系列二 延迟|死信队列

      前提 目前项目采用ts+eggjs结合方式,针对定时任务,采用schedule,随着业务增多,觉得缺点啥,可能是缺消息队列吧。上文章,针对rabbitmq基本语法进行

    JavaScript作业队列和微任务

    JavaScript作业队列和微任务 当Promises在ES6首次引入时,它们使编写异步代码工作变得更加容易。回调地狱被更简单构造所取代,该构造使开发人员可以更轻松地处理异步任务。理解诺言

    并发核心技术-幂等实现方案

    幂等性应该是合格程序员一个基因,在设计系统时,是首要考虑问题,尤其是在像支付宝,银行,互联网金融公司等涉及都是钱系统,既要高效,数据也要准确,所以不能出现多扣款,多打款等问题......

    Nginx服务器高性能优化--轻松实现10万并发访问量

    作者:章为忠学架构https://www.toutiao.com/i6804346550882402828 前面讲如何配置Nginx虚拟主机,如何配置服务日志等很多基础内容,大家可以去这里看看

    微信小程序电商实战-商品详情(上)

    下今天要实现小程序商品详情页吧!

    数据生成工具 ZenData 发布 1.5 版本,新增 CSV 和 Excel 输出格式,字段定义支持表达式

    随着DevOps日益流行,越来越多团队开始关注持续集成和持续交付。在这种大背景下,自动化测试越来越重要。那么问题来,如何能够实现大规模、工程化自动化测试呢?里面会涉及到诸多问题,比如

    微信小程序微商城():https框架搭建并实现导航功能

    本文将带领大家搭建https小程序框架,并实现动态获取数据展示效果!