Table of Contents
1 Background
2 Dynamic scheduled task scheduling
3 Multi-node task execution issues

How to implement SpringBoot scheduled task function

May 10, 2023 pm 04:16 PM
springboot

1 Background

The project needs the ability to add scheduled tasks dynamically. It currently uses the xxl-job scheduling system, but after looking into xxl-job's features, I found that its support for dynamically adding and deleting scheduled tasks from within the project is limited, so I needed to implement the scheduled-task function myself.

2 Dynamic scheduled task scheduling

1 Technology Selection

Timer or ScheduledExecutorService

Both can implement scheduled task scheduling. Let's first look at scheduling with Timer:

public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
// First run 1 second from now, then run again every 2 seconds
timer.schedule(timerTask,1000L,2000L);

Let's take a look at the implementation of ScheduledThreadPoolExecutor

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);

Both can implement scheduled tasks, so how do they differ? The Alibaba p3c coding-guidelines check gives a recommendation that explains the difference:

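When scheduled tasks are processed in parallel by multiple threads and a Timer runs multiple TimerTasks, the other tasks automatically stop running as soon as any one of them fails to catch a thrown exception. ScheduledExecutorService does not have this problem.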

Following this recommendation, ScheduledExecutorService is the one to choose. Let's look at the source code to see why a Timer stops executing when a task fails.

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}

Constructing a Timer starts a thread. What does this thread do? Let's take a look:

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * The queue that holds this timer's scheduled tasks
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // clear all task information
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn't yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}

The thread runs mainLoop(), a while (true) loop that repeatedly takes the earliest task from the queue and compares its next execution time with the current time. Once the execution time has been reached (executionTime <= currentTime), the task is run. The loop only catches InterruptedException, so any other exception thrown by task.run() escapes mainLoop(), the finally block in run() clears all task information, and the thread ends.

This is the answer. A Timer starts a single thread when it is instantiated and runs every task on that thread in a continuous loop. Once a task throws an uncaught exception, the thread terminates and none of the remaining tasks run. ScheduledThreadPoolExecutor runs tasks on pool threads instead: if one task throws, only that task's further executions are suppressed, and the other tasks keep running.
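The difference described above can be reproduced with a small experiment. The following is a minimal sketch using only the JDK; the class name, method names, and delays are made up for illustration. In each case a healthy periodic task is scheduled together with a task that throws, and the method reports how often the healthy task managed to run.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TimerFailureDemo {

    /** Runs of the healthy task when a sibling task throws inside a java.util.Timer. */
    public static int healthyRunsWithTimer() {
        AtomicInteger runs = new AtomicInteger();
        Timer timer = new Timer("demo-timer", true); // daemon thread
        // Healthy periodic task: first run after 200 ms, then every 100 ms.
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                runs.incrementAndGet();
            }
        }, 200L, 100L);
        // Failing one-shot task: throws after 50 ms, before the healthy task is due.
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                throw new IllegalStateException("boom (Timer)");
            }
        }, 50L);
        sleep(600L);
        timer.cancel();
        return runs.get();
    }

    /** Same experiment with a ScheduledExecutorService that has a single worker thread. */
    public static int healthyRunsWithExecutor() {
        AtomicInteger runs = new AtomicInteger();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        Runnable healthy = runs::incrementAndGet;
        Runnable failing = () -> {
            throw new IllegalStateException("boom (executor)");
        };
        executor.scheduleAtFixedRate(healthy, 200L, 100L, TimeUnit.MILLISECONDS);
        executor.schedule(failing, 50L, TimeUnit.MILLISECONDS);
        sleep(600L);
        executor.shutdownNow();
        return runs.get();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("healthy runs with Timer:    " + healthyRunsWithTimer());
        System.out.println("healthy runs with executor: " + healthyRunsWithExecutor());
    }
}
```

With the Timer, the healthy task never runs because the timer thread has already died by the time it is due (the stack trace of the failing task is printed to stderr). With the executor, the exception is captured in the failing task's future and the healthy task keeps running, even though the pool has only one thread.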

2 Using ScheduledThreadPoolExecutor

As shown above, using ScheduledThreadPoolExecutor directly is fairly simple, but I wanted something more elegant, so I chose Spring's TaskScheduler for the implementation:

@Component
public class CronTaskRegistrar implements DisposableBean {
    private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
    @Autowired
    private TaskScheduler taskScheduler;
    public TaskScheduler getScheduler() {
        return this.taskScheduler;
    }
    public void addCronTask(Runnable task, String cronExpression) {
        addCronTask(new CronTask(task, cronExpression));
    }
    private void addCronTask(CronTask cronTask) {
        if (cronTask != null) {
            Runnable task = cronTask.getRunnable();
            if (this.scheduledTasks.containsKey(task)) {
                removeCronTask(task);
            }
            this.scheduledTasks.put(task, scheduleCronTask(cronTask));
        }
    }
    public void removeCronTask(Runnable task) {
        Set<Runnable> runnables = this.scheduledTasks.keySet();
        Iterator it1 = runnables.iterator();
        while (it1.hasNext()) {
            SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
            Long taskId = schedulingRunnable.getTaskId();
            SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
            if (taskId.equals(cancelRunnable.getTaskId())) {
                ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
                if (scheduledTask != null){
                    scheduledTask.cancel();
                }
            }
        }
    }
    public ScheduledTask scheduleCronTask(CronTask cronTask) {
        ScheduledTask scheduledTask = new ScheduledTask();
        scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
        return scheduledTask;
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask task : this.scheduledTasks.values()) {
            task.cancel();
        }
        this.scheduledTasks.clear();
    }
}
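The registrar relies on two helper types, ScheduledTask and SchedulingRunnable, whose source the article does not show. The sketch below is only a guess at their minimal shape, written against the plain JDK so it can run without Spring: the field and method names are taken from how the registrar uses them (future, cancel(), getTaskId()), while the constructor, the delegate field, the equality by task id, and the demo methods are assumptions.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class RegistrarHelpersSketch {

    /** Hypothetical wrapper around the future returned by the scheduler. */
    static final class ScheduledTask {
        volatile ScheduledFuture<?> future;

        /** Cancels the task; a run that is in progress is interrupted. */
        void cancel() {
            ScheduledFuture<?> f = this.future;
            if (f != null) {
                f.cancel(true);
            }
        }
    }

    /** Hypothetical Runnable that is identified by a task id. */
    static final class SchedulingRunnable implements Runnable {
        private final Long taskId;
        private final Runnable delegate;

        SchedulingRunnable(Long taskId, Runnable delegate) {
            this.taskId = Objects.requireNonNull(taskId);
            this.delegate = Objects.requireNonNull(delegate);
        }

        Long getTaskId() {
            return taskId;
        }

        @Override
        public void run() {
            delegate.run();
        }

        // Equality by task id lets a map lookup such as containsKey(task) find an existing entry.
        @Override
        public boolean equals(Object o) {
            return o instanceof SchedulingRunnable
                    && taskId.equals(((SchedulingRunnable) o).taskId);
        }

        @Override
        public int hashCode() {
            return taskId.hashCode();
        }
    }

    /** Schedules a task, cancels it, and reports whether the future ended up cancelled. */
    public static boolean scheduleThenCancel() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            ScheduledTask scheduledTask = new ScheduledTask();
            SchedulingRunnable job = new SchedulingRunnable(1L, () -> System.out.println("tick"));
            scheduledTask.future = executor.scheduleAtFixedRate(job, 1L, 1L, TimeUnit.HOURS);
            scheduledTask.cancel();
            return scheduledTask.future.isCancelled();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Two runnables with the same task id are treated as the same map key. */
    public static boolean sameIdMeansSameKey() {
        return new SchedulingRunnable(7L, () -> { }).equals(new SchedulingRunnable(7L, () -> { }));
    }

    public static void main(String[] args) {
        System.out.println("cancelled: " + scheduleThenCancel());
        System.out.println("same key:  " + sameIdMeansSameKey());
    }
}
```

If the real SchedulingRunnable does not define equals and hashCode, the containsKey check in addCronTask would only match the very same object instance, which may be why removeCronTask compares task ids explicitly.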

TaskScheduler is the core type in this implementation, but it is only an interface:

public interface TaskScheduler {
   /**
    * Schedule the given {@link Runnable}, invoking it whenever the trigger
    * indicates a next execution time.
    * <p>Execution will end once the scheduler shuts down or the returned
    * {@link ScheduledFuture} gets cancelled.
    * @param task the Runnable to execute whenever the trigger fires
    * @param trigger an implementation of the {@link Trigger} interface,
    * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
    * wrapping a cron expression
    * @return a {@link ScheduledFuture} representing pending completion of the task,
    * or {@code null} if the given Trigger object never fires (i.e. returns
    * {@code null} from {@link Trigger#nextExecutionTime})
    * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
    * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
    * @see org.springframework.scheduling.support.CronTrigger
    */
   @Nullable
   ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

In the earlier code, TaskScheduler is injected into the class even though it is an interface. How does Spring know which implementation to inject? Usually, when several implementations exist, we have to mark one with @Primary or select one with @Qualifier. There is no such annotation on my injection point, because the implementation is supplied another way, through a configuration class:

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // Core pool size of the thread pool that executes scheduled tasks
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}

The TaskScheduler bean is registered when Spring initializes, and we can see that its implementation is ThreadPoolTaskScheduler. Some online sources say that ThreadPoolTaskScheduler is the default implementation of TaskScheduler. That is not the case here: we specify it ourselves. The benefit is that replacing the implementation only requires changing the configuration class, which is very flexible.

Why call this a more elegant implementation? Because at its core ThreadPoolTaskScheduler is still backed by a ScheduledThreadPoolExecutor:

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}
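To make the relationship concrete, here is a simplified sketch of how a trigger-driven schedule can be layered on a plain ScheduledExecutorService: after each run, the task asks a trigger for the next delay and re-arms itself as a one-shot task. This is an illustration of the idea only, not Spring's actual code; ReschedulingTask, the Supplier-based trigger, and the delays are all invented for the example, and error handling and cancellation are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ReschedulingSketch {

    /** Runs the delegate once per trigger result and re-arms itself after every run. */
    static final class ReschedulingTask implements Runnable {
        private final Runnable delegate;
        private final Supplier<Long> nextDelayMillis; // a null result means "never fire again"
        private final ScheduledExecutorService executor;
        final CountDownLatch finished = new CountDownLatch(1);

        ReschedulingTask(Runnable delegate, Supplier<Long> nextDelayMillis,
                         ScheduledExecutorService executor) {
            this.delegate = delegate;
            this.nextDelayMillis = nextDelayMillis;
            this.executor = executor;
        }

        /** Asks the trigger for the next delay and schedules a single execution. */
        void scheduleNext() {
            Long delay = nextDelayMillis.get();
            if (delay == null) {
                finished.countDown(); // the trigger will not fire again
                return;
            }
            executor.schedule(this, delay, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            delegate.run();
            scheduleNext();
        }
    }

    /** Uses a stand-in trigger that fires three times, 20 ms apart, and then stops. */
    public static int runThreeTimes() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger remaining = new AtomicInteger(3);
        Supplier<Long> trigger =
                () -> remaining.getAndDecrement() > 0 ? Long.valueOf(20L) : null;
        ReschedulingTask task = new ReschedulingTask(runs::incrementAndGet, trigger, executor);
        task.scheduleNext();
        try {
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runThreeTimes());
    }
}
```

A cron expression plays the role of the trigger here: instead of a fixed 20 ms, the next delay would be the time remaining until the next matching instant.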

3 Multi-node task execution issues

In this implementation I used TaskScheduler rather than xxl-job, and that introduces a problem. xxl-job is a distributed task scheduling system: no matter how many nodes the application is deployed on, xxl-job selects only one node to execute a given scheduled task, so the task is never executed on several nodes at once. With TaskScheduler, every node schedules the task itself, so we have to deal with repeated execution across nodes. There are solutions to this problem:

· The first option is to split the scheduled task function into its own service and deploy it on a single node.
· The second option is to use Redis SETNX so that only one node executes a given task at a time.

I chose the second option. There are other ways to prevent repeated execution, which I won't go into here. Here is my implementation:

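public void executeTask(Long taskId) {
    // Only the first node to set the key may proceed; the key expires after 2 seconds.
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("A scheduled SMS-sending task is already running; skipping this execution!");
        return;
    }
    // ... the actual task logic follows (not shown in the original)
}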
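The guard above depends on the set-if-absent semantics: only the first caller creates the key, and the key disappears again after its time to live. The sketch below imitates that behaviour with an in-memory map so it can run without Redis. InMemoryLockStore is a made-up stand-in, not a Redis client, and the explicit nowMillis parameter exists only to make the timing reproducible.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SetIfAbsentSketch {

    /** In-memory stand-in for a Redis "set if absent with expiry"; not a real Redis client. */
    static final class InMemoryLockStore {
        private final ConcurrentHashMap<String, Long> expiryByKey = new ConcurrentHashMap<>();

        /** Returns true only for the caller that creates the key or replaces an expired one. */
        boolean setIfAbsent(String key, long ttl, TimeUnit unit, long nowMillis) {
            boolean[] acquired = {false};
            // compute() is atomic per key, so two concurrent callers cannot both win.
            expiryByKey.compute(key, (k, expiresAt) -> {
                if (expiresAt == null || expiresAt <= nowMillis) {
                    acquired[0] = true;
                    return nowMillis + unit.toMillis(ttl);
                }
                return expiresAt;
            });
            return acquired[0];
        }
    }

    /** Mirrors the guard in executeTask: true means this node may run the task. */
    public static boolean tryExecute(InMemoryLockStore store, Long taskId, long nowMillis) {
        if (!store.setIfAbsent(String.valueOf(taskId), 2L, TimeUnit.SECONDS, nowMillis)) {
            return false; // another node already claimed this run
        }
        return true;
    }

    /** Three attempts for the same task id: at 0 ms, at 10 ms, and at 2500 ms. */
    public static boolean[] demo() {
        InMemoryLockStore store = new InMemoryLockStore();
        return new boolean[] {
            tryExecute(store, 42L, 0L),     // first node wins
            tryExecute(store, 42L, 10L),    // second node is rejected
            tryExecute(store, 42L, 2_500L)  // the key has expired, so the next trigger wins again
        };
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println(results[0] + " " + results[1] + " " + results[2]);
    }
}
```

Because the key lives for only 2 seconds, this guard deduplicates triggers that fire on several nodes at almost the same moment. It does not hold the lock for the whole duration of a long-running task.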
