ホームページ > Java > &#&チュートリアル > SpringBootスケジュールタスク機能の実装方法

SpringBootスケジュールタスク機能の実装方法

WBOY
リリース: 2023-05-10 16:16:13
転載
1425 人が閲覧しました

1 背景

プロジェクトにはスケジュールされたタスクを動的に追加できる機能が必要です。プロジェクトでは現在 xxl-job スケジュールされたタスクのスケジューリング システムを使用していますが、xxl-job 関数をある程度理解した後、次のことがわかりました。 xxl-job のプロジェクトへのスケジュールされたタスクの動的追加と動的削除に対するサポートはあまり優れていないため、スケジュールされたタスクの機能を手動で実装する必要があります

#二動的スケジュールされたタスクのスケジューリング

1 テクノロジーの選択

Timer または ScheduledExecutorService

どちらもスケジュールされたタスクのスケジューリングを実装できます。見てみましょう。最初のタイマーのスケジュールされたタスクのスケジューリング

  public class MyTimerTask extends TimerTask {
    private String name;
    public MyTimerTask(String name){
        this.name = name;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        //task
        Calendar instance = Calendar.getInstance();
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime()));
    }
}
Timer timer = new Timer();
MyTimerTask timerTask = new MyTimerTask("NO.1");
//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次
timer.schedule(timerTask,1000L,2000L);
ログイン後にコピー

ScheduledThreadPoolExecutorの実装を見てみましょう

//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
    new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
executorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        //do something
    }
},initialDelay,period, TimeUnit.HOURS);
ログイン後にコピー

どちらもスケジュールされたタスクを実装できますが、その違いは何ですか? Alibaba p3cを使用すると、提案と違いが得られます

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。
ログイン後にコピー

提案の観点からは、

ScheduledExecutorService を選択する必要があります。ソース コードを見て、問題がある場合に Timer が実行を終了する理由を確認してみましょう。

/**
 * The timer thread.
 */
private final TimerThread thread = new TimerThread(queue);
public Timer() {
    this("Timer-" + serialNumber());
}
public Timer(String name) {
    thread.setName(name);
    thread.start();
}
ログイン後にコピー

新しいオブジェクト、スレッドが開始されていることがわかります。では、このスレッドは何をしているのでしょうか?

class TimerThread extends Thread {
  boolean newTasksMayBeScheduled = true;
  /**
   * 每一件一个任务都是一个quene
   */
  private TaskQueue queue;
  TimerThread(TaskQueue queue) {
      this.queue = queue;
  }
  public void run() {
      try {
          mainLoop();
      } finally {
          // Someone killed this Thread, behave as if Timer cancelled
          synchronized(queue) {
              newTasksMayBeScheduled = false;
              queue.clear();  // 清除所有任务信息
          }
      }
  }
  /**
   * The main timer loop.  (See class comment.)
   */
  private void mainLoop() {
      while (true) {
          try {
              TimerTask task;
              boolean taskFired;
              synchronized(queue) {
                  // Wait for queue to become non-empty
                  while (queue.isEmpty() && newTasksMayBeScheduled)
                      queue.wait();
                  if (queue.isEmpty())
                      break; // Queue is empty and will forever remain; die
                  // Queue nonempty; look at first evt and do the right thing
                  long currentTime, executionTime;
                  task = queue.getMin();
                  synchronized(task.lock) {
                      if (task.state == TimerTask.CANCELLED) {
                          queue.removeMin();
                          continue;  // No action required, poll queue again
                      }
                      currentTime = System.currentTimeMillis();
                      executionTime = task.nextExecutionTime;
                      if (taskFired = (executionTime<=currentTime)) {
                          if (task.period == 0) { // Non-repeating, remove
                              queue.removeMin();
                              task.state = TimerTask.EXECUTED;
                          } else { // Repeating task, reschedule
                              queue.rescheduleMin(
                                task.period<0 ? currentTime   - task.period
                                              : executionTime + task.period);
                          }
                      }
                  }
                  if (!taskFired) // Task hasn&#39;t yet fired; wait
                      queue.wait(executionTime - currentTime);
              }
              if (taskFired)  // Task fired; run it, holding no locks
                  task.run();
          } catch(InterruptedException e) {
          }
      }
  }
}
ログイン後にコピー

を見てみましょう。

mainLoop() が実行され、その中に無限ループして時間を取得する while (true) メソッドがあることがわかります。プログラム内のタスクオブジェクトに現在時刻と比較し、同じであれば実行しますが、エラーが報告されると最後に入力し、タスク情報を全てクリアします。

現時点では、答えが見つかりました。タイマーがインスタンス化された後、スレッドが開始され、中断のないループ一致でタスクが実行されます。シングルスレッドです。エラーが報告されると、スレッドは終了します。したがって、後続のタスクは実行されず、ScheduledThreadPoolExecutor は複数のスレッドで実行され、いずれかのタスクがエラーを報告しても、他のスレッドの実行には影響しません。

2 ScheduledThreadPoolExecutor の使用

上記から、

ScheduledThreadPoolExecutor の使用は比較的簡単ですが、より洗練されたものを実現したいので、# # を選択します。 #TaskScheduler実現するには <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Component public class CronTaskRegistrar implements DisposableBean { private final Map&lt;Runnable, ScheduledTask&gt; scheduledTasks = new ConcurrentHashMap&lt;&gt;(16); @Autowired private TaskScheduler taskScheduler; public TaskScheduler getScheduler() { return this.taskScheduler; } public void addCronTask(Runnable task, String cronExpression) { addCronTask(new CronTask(task, cronExpression)); } private void addCronTask(CronTask cronTask) { if (cronTask != null) { Runnable task = cronTask.getRunnable(); if (this.scheduledTasks.containsKey(task)) { removeCronTask(task); } this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } } public void removeCronTask(Runnable task) { Set&lt;Runnable&gt; runnables = this.scheduledTasks.keySet(); Iterator it1 = runnables.iterator(); while (it1.hasNext()) { SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next(); Long taskId = schedulingRunnable.getTaskId(); SchedulingRunnable cancelRunnable = (SchedulingRunnable) task; if (taskId.equals(cancelRunnable.getTaskId())) { ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable); if (scheduledTask != null){ scheduledTask.cancel(); } } } } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } @Override public void destroy() throws Exception { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } }</pre><div class="contentsignin">ログイン後にコピー</div></div>

TaskScheduler

はこの関数実装のコア クラスですが、インターフェイス <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public interface TaskScheduler { /** * Schedule the given {@link Runnable}, invoking it whenever the trigger * indicates a next execution time. * &lt;p&gt;Execution will end once the scheduler shuts down or the returned * {@link ScheduledFuture} gets cancelled. * @param task the Runnable to execute whenever the trigger fires * @param trigger an implementation of the {@link Trigger} interface, * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object * wrapping a cron expression * @return a {@link ScheduledFuture} representing pending completion of the task, * or {@code null} if the given Trigger object never fires (i.e. returns * {@code null} from {@link Trigger#nextExecutionTime}) * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress) * @see org.springframework.scheduling.support.CronTrigger */ @Nullable ScheduledFuture&lt;?&gt; schedule(Runnable task, Trigger trigger);</pre><div class="contentsignin">ログイン後にコピー</div></div> 前のコードからわかるように、このクラスをクラスに挿入しましたが、これはインターフェイスです。どの実装クラスであるかをどうやって知ることができますか? 以前は、このようなことが起こった場合、実装されたクラスを実行するためにクラスに @Primony または @Quality を追加する必要がありました。しかし、別の方法で実装されているため、注入にはマークがありません

@Configuration
public class SchedulingConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 定时任务执行线程池核心线程数
        taskScheduler.setPoolSize(4);
        taskScheduler.setRemoveOnCancelPolicy(true);
        taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-");
        return taskScheduler;
    }
}
ログイン後にコピー

Spring の初期化時に Bean TaskScheduler が登録され、その実装が ThreadPoolTask​​Scheduler であることがわかります。オンライン情報 ThreadPoolTask​​Scheduler は、TaskScheduler のデフォルトの実装クラスです。実際には、そうではありません。まだ指定する必要があります。このように、実装を置き換える場合は、構成クラスを変更するだけで済み、非常に柔軟です。

これがよりエレガントな実装であると言われる理由は、そのコアも ScheduledThreadPoolExecutor を通じて実装されているためです

public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
   Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
   return this.scheduledExecutor;
}
ログイン後にコピー

マルチノード タスクの実行に関する 3 つの問題

今回は実装中にxxl-job は分散プログラム スケジューリング システムであり、アプリケーションがスケジュールされたタスクを実行するときに使用されます。アプリケーションにデプロイされているノードの数に関係なく、xxl-job はスケジュールされたタスクの実行ノードとしてノードの 1 つだけを選択するため、スケジュールされたタスクが異なるノードで同時に実行されず、繰り返し実行の問題が発生することはありません。 TaskScheduler を実装するには、複数のノードで繰り返し実行される問題を考慮する必要があります。もちろん、問題があるので解決策はあります。

· 最初のオプションは、スケジュールされたタスク関数を分離して個別にデプロイし、1 つのノードのみをデプロイすることです。2 番目のオプションは、redis setNx を使用して、同時に実行されるタスクは 1 つだけです。実行

私は 2 番目のオプションを選択して実行しました。もちろん、タスクが繰り返し実行されないようにするには、いくつかの方法があります。ここでは詳しく説明しません。これが私の実装です

public void executeTask(Long taskId) {
    if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) {
        log.info("已有执行中定时发送短信任务,本次不执行!");
        return;
    }
ログイン後にコピー

以上がSpringBootスケジュールタスク機能の実装方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:yisu.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート