プロジェクトにはスケジュールされたタスクを動的に追加できる機能が必要です。プロジェクトでは現在 xxl-job スケジュールされたタスクのスケジューリング システムを使用していますが、xxl-job 関数をある程度理解した後、次のことがわかりました。 xxl-job のプロジェクトへのスケジュールされたタスクの動的追加と動的削除に対するサポートはあまり優れていないため、スケジュールされたタスクの機能を手動で実装する必要があります
#二動的スケジュールされたタスクのスケジューリング1 テクノロジーの選択
Timer または
ScheduledExecutorService
public class MyTimerTask extends TimerTask { private String name; public MyTimerTask(String name){ this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public void run() { //task Calendar instance = Calendar.getInstance(); System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(instance.getTime())); } } Timer timer = new Timer(); MyTimerTask timerTask = new MyTimerTask("NO.1"); //首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次 timer.schedule(timerTask,1000L,2000L);
//org.apache.commons.lang3.concurrent.BasicThreadFactory ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build()); executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { //do something } },initialDelay,period, TimeUnit.HOURS);
多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。
ScheduledExecutorService を選択する必要があります。ソース コードを見て、問題がある場合に
Timer が実行を終了する理由を確認してみましょう。
/** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); public Timer() { this("Timer-" + serialNumber()); } public Timer(String name) { thread.setName(name); thread.start(); }
class TimerThread extends Thread { boolean newTasksMayBeScheduled = true; /** * 每一件一个任务都是一个quene */ private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; } public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // 清除所有任务信息 } } } /** * The main timer loop. (See class comment.) */ private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } }
mainLoop() が実行され、その中に無限ループして時間を取得する
while (true) メソッドがあることがわかります。プログラム内のタスクオブジェクトに現在時刻と比較し、同じであれば実行しますが、エラーが報告されると最後に入力し、タスク情報を全てクリアします。
2 ScheduledThreadPoolExecutor の使用
上記から、ScheduledThreadPoolExecutor の使用は比較的簡単ですが、より洗練されたものを実現したいので、# # を選択します。 #TaskScheduler
実現するには <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">@Component
public class CronTaskRegistrar implements DisposableBean {
private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16);
@Autowired
private TaskScheduler taskScheduler;
public TaskScheduler getScheduler() {
return this.taskScheduler;
}
public void addCronTask(Runnable task, String cronExpression) {
addCronTask(new CronTask(task, cronExpression));
}
private void addCronTask(CronTask cronTask) {
if (cronTask != null) {
Runnable task = cronTask.getRunnable();
if (this.scheduledTasks.containsKey(task)) {
removeCronTask(task);
}
this.scheduledTasks.put(task, scheduleCronTask(cronTask));
}
}
public void removeCronTask(Runnable task) {
Set<Runnable> runnables = this.scheduledTasks.keySet();
Iterator it1 = runnables.iterator();
while (it1.hasNext()) {
SchedulingRunnable schedulingRunnable = (SchedulingRunnable) it1.next();
Long taskId = schedulingRunnable.getTaskId();
SchedulingRunnable cancelRunnable = (SchedulingRunnable) task;
if (taskId.equals(cancelRunnable.getTaskId())) {
ScheduledTask scheduledTask = this.scheduledTasks.remove(schedulingRunnable);
if (scheduledTask != null){
scheduledTask.cancel();
}
}
}
}
public ScheduledTask scheduleCronTask(CronTask cronTask) {
ScheduledTask scheduledTask = new ScheduledTask();
scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
return scheduledTask;
}
@Override
public void destroy() throws Exception {
for (ScheduledTask task : this.scheduledTasks.values()) {
task.cancel();
}
this.scheduledTasks.clear();
}
}</pre><div class="contentsignin">ログイン後にコピー</div></div>
はこの関数実装のコア クラスですが、インターフェイス <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:java;">public interface TaskScheduler {
/**
* Schedule the given {@link Runnable}, invoking it whenever the trigger
* indicates a next execution time.
* <p>Execution will end once the scheduler shuts down or the returned
* {@link ScheduledFuture} gets cancelled.
* @param task the Runnable to execute whenever the trigger fires
* @param trigger an implementation of the {@link Trigger} interface,
* e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
* wrapping a cron expression
* @return a {@link ScheduledFuture} representing pending completion of the task,
* or {@code null} if the given Trigger object never fires (i.e. returns
* {@code null} from {@link Trigger#nextExecutionTime})
* @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
* for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
* @see org.springframework.scheduling.support.CronTrigger
*/
@Nullable
ScheduledFuture<?> schedule(Runnable task, Trigger trigger);</pre><div class="contentsignin">ログイン後にコピー</div></div>
前のコードからわかるように、このクラスをクラスに挿入しましたが、これはインターフェイスです。どの実装クラスであるかをどうやって知ることができますか? 以前は、このようなことが起こった場合、実装されたクラスを実行するためにクラスに @Primony または @Quality を追加する必要がありました。しかし、別の方法で実装されているため、注入にはマークがありません
@Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定时任务执行线程池核心线程数 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
Spring の初期化時に Bean TaskScheduler が登録され、その実装が ThreadPoolTaskScheduler であることがわかります。オンライン情報 ThreadPoolTaskScheduler は、TaskScheduler のデフォルトの実装クラスです。実際には、そうではありません。まだ指定する必要があります。このように、実装を置き換える場合は、構成クラスを変更するだけで済み、非常に柔軟です。
これがよりエレガントな実装であると言われる理由は、そのコアも ScheduledThreadPoolExecutor を通じて実装されているためです
public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized"); return this.scheduledExecutor; }
マルチノード タスクの実行に関する 3 つの問題
· 最初のオプションは、スケジュールされたタスク関数を分離して個別にデプロイし、1 つのノードのみをデプロイすることです。2 番目のオプションは、redis setNx を使用して、同時に実行されるタスクは 1 つだけです。実行
私は 2 番目のオプションを選択して実行しました。もちろん、タスクが繰り返し実行されないようにするには、いくつかの方法があります。ここでは詳しく説明しません。これが私の実装です
public void executeTask(Long taskId) { if (!redisService.setIfAbsent(String.valueOf(taskId),"1",2L, TimeUnit.SECONDS)) { log.info("已有执行中定时发送短信任务,本次不执行!"); return; }
以上がSpringBootスケジュールタスク機能の実装方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。