There are two ways to manage scheduled tasks dynamically:
Method 1: Configure a Trigger (bound to a cron expression) from the web front end and create the scheduled tasks with the ThreadPoolTaskScheduler class, so that they can be managed at runtime.
Method 2: Work with schedules that have already been created, i.e. declare the schedule with the @Scheduled annotation on a component class so that it is initialized once when the program starts, e.g.:
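import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class TestTask {
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Fires every 2 seconds
    @Scheduled(cron = "0/2 * * * * ?")
    public void robReceiveExpireTask() {
        System.out.println(df.format(LocalDateTime.now()) + " test");
    }
}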
Drawback: with this approach, schedules cannot be added, stopped, started, or reset while the application is running.
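Runtime management comes down to keeping a handle on every scheduled task so that it can be cancelled or replaced later. Below is a minimal sketch of that idea. It uses only the JDK's ScheduledExecutorService as a stand-in for Spring's ThreadPoolTaskScheduler, and a fixed period in place of a cron trigger. The class TaskRegistry and its method names are hypothetical and do not come from the article.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical registry (an assumption for illustration): one ScheduledFuture handle per task id.
class TaskRegistry {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    /** Schedules the task at a fixed period, replacing any task already registered under the id. */
    void start(String id, Runnable task, long periodMillis) {
        ScheduledFuture<?> next =
                scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = futures.put(id, next);
        if (previous != null) {
            previous.cancel(false); // let a run that is in progress finish
        }
    }

    /** Cancels the task registered under the id; returns false if none was registered. */
    boolean cancel(String id) {
        ScheduledFuture<?> future = futures.remove(id);
        if (future == null) {
            return false;
        }
        future.cancel(false);
        return true;
    }

    /** Reports whether a non-cancelled task is registered under the id. */
    boolean isRunning(String id) {
        ScheduledFuture<?> future = futures.get(id);
        return future != null && !future.isCancelled();
    }

    /** Stops the scheduler threads so the JVM can exit. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Calling start again with the same id acts as a reset: the new schedule is registered first and the old handle is cancelled afterwards. In a Spring version of this sketch, the period would give way to a CronTrigger passed to ThreadPoolTaskScheduler.schedule, which also returns a ScheduledFuture.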
The stack is Spring Boot + Spring + MyBatis-Plus.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>merak-hyper-automation-boot</artifactId>
        <groupId>com.merak.automation</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>automation-quartz</artifactId>
    <packaging>jar</packaging>
    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun Repository</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <!-- Core Spring framework utilities -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <!-- Spring Web module -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>
        <!-- MySQL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- Druid connection pool -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>jakarta.validation</groupId>
            <artifactId>jakarta.validation-api</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <!-- Quartz scheduling framework -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
            <version>2.2.5.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Skip tests when packaging -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The file application.yml in the resources directory:
spring:
  profiles:
    active: dev
The file application-dev.yml in the resources directory:
server:
  port: 12105
  servlet:
    context-path: /automation-quartz
management:
  endpoints:
    web:
      exposure:
        include: '*'
# Spring configuration
spring:
  resources:
    static-locations: classpath:/static/,classpath:/templates/
  mvc:
    throw-exception-if-no-handler-found: true
    static-path-pattern: /**
  application:
    name: automation-workflow
  main:
    allow-bean-definition-overriding: true
  # File upload
  servlet:
    multipart:
      # Maximum size of a single file
      max-file-size: 2000MB
      # Maximum total size of an upload request
      max-request-size: 4000MB
  # Uniform JSON timestamp format
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  aop:
    proxy-target-class: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    dynamic:
      druid:
        # Global Druid parameters; most values match the defaults. (The supported parameters are listed here. Do not change them unless you know what they mean.)
        timeBetweenEvictionRunsMillis: 60000
        # ...
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # Configure the filters used for monitoring statistics; if they are removed, the monitoring page cannot collect SQL statistics.
# MyBatis-Plus settings
mybatis-plus:
  mapper-locations: classpath*:com/merak/hyper/automation/persist/**/xml/*Mapper.xml
  global-config:
    # Disable the banner that ships with MP 3.0
    banner: false
    db-config:
      id-type: ID_WORKER_STR
      # Use underscore naming for database tables by default
      table-underline: true
  configuration:
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
    # log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
  level:
    com.merak.hyper: debug
    com.merak.hyper.automation.persist.**.mapper: debug
    org.springframework: warn
The startup class MerakQuartzApplication:
package com.merak.hyper.automation;

import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author chenjun
 * @version 1.0
 * @ClassName: MerakQuartzApplication
 * @description: Work-order task scheduling
 * @date 2022/9/22 10:30
 */
@EnableScheduling
@EnableAsync
@MapperScan(basePackages = {"com.merak.hyper.automation.persist.**.mapper"})
@SpringBootApplication(scanBasePackages = {"com.merak.hyper.automation.**"}, exclude = {SecurityAutoConfiguration.class})
public class MerakQuartzApplication {
    public static final Logger log = LoggerFactory.getLogger(MerakQuartzApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(MerakQuartzApplication.class, args);
    }

    private int taskSchedulerCorePoolSize = 15;
    private int awaitTerminationSeconds = 60;
    private String threadNamePrefix = "taskExecutor-";

    /**
     * @description: Creates the ThreadPoolTaskScheduler used to obtain ScheduledFuture<?> handles
     */
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(taskSchedulerCorePoolSize);
        taskScheduler.setThreadNamePrefix(threadNamePrefix);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(false);
        taskScheduler.setAwaitTerminationSeconds(awaitTerminationSeconds);
        // The thread pool has to be initialized before use
        taskScheduler.initialize();
        // isinitialized = true;
        log.info("Initialized ThreadPoolTaskScheduler: threadNamePrefix={}, poolSize={}, awaitTerminationSeconds={}",
                threadNamePrefix, taskSchedulerCorePoolSize, awaitTerminationSeconds);
        return taskScheduler;
    }

    /**
     * @description: Creates the ThreadPoolTaskExecutor that runs the threads started by @Async tasks; used by ScheduledHelper
     */
    @Bean("asyncTaskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(200);
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("asyncTaskExecutor-");
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(60);
        // Rejection policy: run the rejected task on the calling thread
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the thread pool
        taskExecutor.initialize();
        return taskExecutor;
    }
}
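The asyncTaskExecutor bean uses CallerRunsPolicy, which means a task that arrives while both the pool and its queue are full is executed by the thread that submitted it. The sketch below demonstrates that behaviour with a plain JDK ThreadPoolExecutor. The class CallerRunsDemo and the tiny pool sizes are assumptions chosen to make saturation easy to reproduce; they are not the article's settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class (an assumption for illustration).
class CallerRunsDemo {
    /** Returns the name of the thread that ran the task submitted while the pool was saturated. */
    static String threadOfOverflowTask() {
        // One worker and one queue slot, so the third task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the single worker
            pool.execute(blocker); // fills the single queue slot
            // Rejected by the pool, so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + threadOfOverflowTask());
    }
}
```

The practical consequence is back-pressure: when the executor is overloaded, the thread that called the @Async method slows down instead of the task being dropped.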
1. When the project starts, load the triggers associated with the tasks and schedule all of them.
The initLineRunner class:
package com.merak.hyper.automation.Scheduling;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.entity.BusWorkflow;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.persist.service.IBusWorkflowService;
import com.merak.hyper.automation.util.CommonUtil;
import com.merak.hyper.automation.util.ScheduleUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;

/**
 * On project startup, loads the triggers associated with digital employees and schedules all of them
 * @Date: 2020/12/25:16:00
 **/
@Component
@Order(1)
public class initLineRunner implements CommandLineRunner {
    public static final Logger log = LoggerFactory.getLogger(initLineRunner.class);
    private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private TaskService taskService;
    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private IBusWorkflowService workflowService;

    @Override
    public void run(String... args) {
        log.info("Startup: loading the triggers associated with digital employees and scheduling all of them, {}",
                df.format(LocalDateTime.now()));
        QueryWrapper<BusWorkflow> wrapper = new QueryWrapper<>();
        wrapper.eq("wf_type", "3"); // 3: cloud-hosted
        wrapper.eq("wf_state", "1");
        List<BusWorkflow> busWorkflows = workflowService.list(wrapper);
        List<AutoTriggerInfo> triggerInfos = triggerInfoService.list();
        if (busWorkflows.isEmpty() || triggerInfos.isEmpty()) {
            log.info("Trigger information for digital employees is incomplete, employee records: {}, trigger records: {}",
                    busWorkflows.size(), triggerInfos.size());
        } else {
            // Trigger information keyed by the wfId of the digital employee it belongs to
            Map<String, AutoTriggerInfo> loadWfidAndTriggerInfo = CommonUtil.loadWfidAndTriggerInfo(busWorkflows, triggerInfos);
            for (Map.Entry<String, AutoTriggerInfo> entry : loadWfidAndTriggerInfo.entrySet()) {
                String wfId = entry.getKey();
                BusWorkflow workflow = busWorkflows.stream()
                        .filter(t -> wfId.equals(t.getWfId()))
                        .findAny()
                        .orElse(null);
                if (null != workflow) {
                    ScheduleUtil.start(new ScheduleTask(wfId, String.valueOf(workflow.getWfCreateuserId()), taskService), entry.getValue());
                }
            }
            log.info("All triggers associated with digital employees have been scheduled, number of schedules: {}, {}",
                    loadWfidAndTriggerInfo.size(), df.format(LocalDateTime.now()));
        }
    }
}

Core code:

```java
ScheduleUtil.start(new ScheduleTask(wfId, String.valueOf(workflow.getWfCreateuserId()), taskService), entry.getValue());
```
The scheduler management utility class: start, cancel, reset, and so on
package com.merak.hyper.automation.util;

import com.merak.hyper.automation.Scheduling.ScheduleTask;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

/**
 * @version 1.0
 * @ClassName: ScheduleUtil
 * @description: Scheduler management utility class: start, cancel, reset, and so on
 */
public class ScheduleUtil {
    public static final Logger log = LoggerFactory.getLogger(ScheduleUtil.class);

    private static ThreadPoolTaskScheduler threadPoolTaskScheduler = SpringContextUtils.getBean(ThreadPoolTaskScheduler.class);

    // Maps each digital employee wfId to its ScheduledFuture.
    // ConcurrentHashMap, because the map is touched from the startup thread and from @Async threads.
    private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * Start
     *
     * @param scheduleTask the scheduled task
     * @param triggerInfo  trigger whose logicConfig holds the cron expression
     */
    public static boolean start(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        String wfId = scheduleTask.getId();
        log.info("Starting the scheduled task for digital employee {}", wfId);
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(triggerInfo.getLogicConfig()));
        // Cancel a schedule already registered under this wfId so that it does not keep firing
        ScheduledFuture<?> previous = scheduledFutureMap.put(wfId, scheduledFuture);
        if (previous != null && !previous.isCancelled()) {
            previous.cancel(false);
        }
        return true;
    }

    /**
     * Cancel
     *
     * @param scheduleTask the scheduled task
     */
    public static boolean cancel(ScheduleTask scheduleTask) {
        log.info("Stopping the scheduled task, taskId {}", scheduleTask.getId());
        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(scheduleTask.getId());
        if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
            scheduledFuture.cancel(false);
        }
        return true;
    }

    /**
     * Reset
     *
     * @param scheduleTask the scheduled task
     * @param triggerInfo  trigger holding the new cron expression
     */
    public static boolean reset(ScheduleTask scheduleTask, AutoTriggerInfo triggerInfo) {
        // Cancel the existing scheduled task first
        cancel(scheduleTask);
        // Then start a new one
        start(scheduleTask, triggerInfo);
        return true;
    }
}
The ScheduleTask class: the task that gets scheduled
package com.merak.hyper.automation.Scheduling;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @version 1.0
 * @ClassName: ScheduleTask
 * @description: ScheduleTask ties together a task id, a user id, and the TaskService that does the work; implements Runnable
 */
public class ScheduleTask implements Runnable {
    private static final int TIMEOUT = 30000;
    private String id;
    private String userId;
    private TaskService service;
    public static final Logger log = LoggerFactory.getLogger(ScheduleTask.class);

    public String getId() {
        return id;
    }

    /**
     * @param id      task ID
     * @param userId  user ID
     * @param service business service
     */
    public ScheduleTask(String id, String userId, TaskService service) {
        this.id = id;
        this.userId = userId;
        this.service = service;
    }

    @Override
    public void run() {
        log.info("ScheduleTask: sending the digital employee's message, id: {}, user id: {}", this.id, userId);
        service.work(this.id, this.userId);
    }
}
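import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @version 1.0
 * @ClassName: TaskService
 * @description: TaskService
 */
public interface TaskService {
    /**
     * Business processing method
     * @param keyword key parameter
     * @param userId  user id
     */
    void work(String keyword, String userId);
}

/**
 * @description: TaskService implementation that performs the scheduled business logic
 */
@Service
public class TaskServiceImpl implements TaskService {
    public static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class);

    @Autowired
    private IAutoDeviceInfoService deviceInfoService;

    @Override
    public void work(String wfId, String userId) {
        try {
            log.info("Scheduled task: sending a message for digital employee wfId {}, user id {}...", wfId, userId);
            //sendRobotMsg(wfId,userId);
            Thread.sleep(200);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can react to it
            Thread.currentThread().interrupt();
            log.warn("Scheduled task for wfId {} was interrupted", wfId);
        }
    }
}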
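Because ScheduleTask is a plain Runnable that receives its service through the constructor, it can be exercised without a scheduler or a Spring context. The sketch below shows that design with hypothetical stand-ins: WorkService and DelegatingTask mirror the shapes of TaskService and ScheduleTask, and RecordingService is a test double. None of these three names come from the article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same shape as TaskService.
interface WorkService {
    void work(String keyword, String userId);
}

// Hypothetical stand-in with the same shape as ScheduleTask.
class DelegatingTask implements Runnable {
    private final String id;
    private final String userId;
    private final WorkService service;

    DelegatingTask(String id, String userId, WorkService service) {
        this.id = id;
        this.userId = userId;
        this.service = service;
    }

    String getId() {
        return id;
    }

    @Override
    public void run() {
        // All business logic lives in the service; the task only carries the parameters.
        service.work(id, userId);
    }
}

// Test double that records every call instead of doing real work.
class RecordingService implements WorkService {
    final List<String> calls = new ArrayList<>();

    @Override
    public void work(String keyword, String userId) {
        calls.add(keyword + ":" + userId);
    }
}
```

Calling run() directly on such a task executes one scheduled cycle synchronously, which keeps unit tests independent of cron timing.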
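2. Manage scheduled tasks dynamically through configuration changes made in the web UI
The ScheduledController class: the web layer for schedules, which starts, cancels, and resets them
Changes to schedule information (e.g. 1: the trigger's cron expression changes, 2: a task is stopped, 3: a new task is added)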
package com.merak.hyper.automation.controller;

import com.merak.hyper.automation.common.core.domain.AjaxResult;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import com.merak.hyper.automation.persist.service.IAutoTriggerInfoService;
import com.merak.hyper.automation.util.ScheduledHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * @version 1.0
 * @ClassName: ScheduledController
 * @description: Web layer for schedules: start, cancel, reset, and so on
 */
@RestController
@RequestMapping("/api/scheduled")
public class ScheduledController {
    public static final Logger log = LoggerFactory.getLogger(ScheduledController.class);

    @Autowired
    private IAutoTriggerInfoService triggerInfoService;
    @Autowired
    private ScheduledHelper scheduledHelper;

    @PostMapping("/add")
    public AjaxResult addScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        scheduledHelper.addScheduleds(scheduledApiVo, autoTriggerInfo);
        return AjaxResult.success();
    }

    @PostMapping("/reset")
    public AjaxResult resetScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        AutoTriggerInfo autoTriggerInfo = triggerInfoService.getById(scheduledApiVo.getTriggerId());
        scheduledHelper.resetScheduleds(scheduledApiVo, autoTriggerInfo);
        return AjaxResult.success();
    }

    @PostMapping("/stop")
    public AjaxResult stopScheduleds(@RequestBody ScheduledApiVo scheduledApiVo) {
        // Stopping needs only the wfId, so the trigger is not looked up here
        scheduledHelper.stopScheduleds(scheduledApiVo);
        return AjaxResult.success();
    }
}

The ScheduledHelper class: exposes schedule management to callers: create, change, stop

```java
package com.merak.hyper.automation.util;

import com.merak.hyper.automation.Scheduling.ScheduleTask;
import com.merak.hyper.automation.Scheduling.TaskService;
import com.merak.hyper.automation.common.core.vo.ScheduledApiVo;
import com.merak.hyper.automation.persist.entity.AutoTriggerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @version 1.0
 * @ClassName: ScheduledHelper
 * @description: Exposes schedule management to callers: create, change, stop
 */
@Component
public class ScheduledHelper {
    public static final Logger log = LoggerFactory.getLogger(ScheduledHelper.class);

    /**
     * @description: Asynchronous "add schedule" operation offered to the web layer
     */
    @Async("asyncTaskExecutor")
    public void addScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        // Add the Schedule task
        log.warn("Digital employee [{}] created; starting its Schedule task", scheduledApiVo.getWfId());
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.start(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo);
    }

    @Async("asyncTaskExecutor")
    public void resetScheduleds(ScheduledApiVo scheduledApiVo, AutoTriggerInfo triggerInfo) {
        // The cron value changed, so replace the Schedule task
        log.warn("Cron value of the trigger associated with digital employee [{}] changed; rescheduling its Schedule task", scheduledApiVo.getWfId());
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.reset(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService), triggerInfo);
    }

    @Async("asyncTaskExecutor")
    public void stopScheduleds(ScheduledApiVo scheduledApiVo) {
        // Remove the wfId and stop the existing Schedule task
        log.warn("Digital employee [{}] is no longer valid; stopping its Schedule task", scheduledApiVo.getWfId());
        TaskService taskService = SpringContextUtils.getBean(TaskService.class);
        ScheduleUtil.cancel(new ScheduleTask(scheduledApiVo.getWfId(), scheduledApiVo.getUserId(), taskService));
    }
}
```
The SpringContextUtils class:
package com.merak.hyper.automation.util;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @version 1.0
 * @ClassName: SpringContextUtils
 * @description: Looks up beans from the Spring application context
 */
@Component
public class SpringContextUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtils.applicationContext = applicationContext;
    }

    public static Object getBean(String name) {
        return applicationContext.getBean(name);
    }

    public static <T> T getBean(Class<T> requiredType) {
        return applicationContext.getBean(requiredType);
    }

    public static <T> T getBean(String name, Class<T> requiredType) {
        return applicationContext.getBean(name, requiredType);
    }

    public static boolean containsBean(String name) {
        return applicationContext.containsBean(name);
    }

    public static boolean isSingleton(String name) {
        return applicationContext.isSingleton(name);
    }

    public static Class<?> getType(String name) {
        return applicationContext.getType(name);
    }
}
The ScheduledApiVo class:
import java.io.Serializable;

/**
 * @version 1.0
 * @ClassName: ScheduledApiVo
 * @description: Parameter object passed to the schedule web API
 */
public class ScheduledApiVo implements Serializable {
    private String wfId;
    private String userId;
    private String triggerId;
    // getters and setters omitted
}
Finally, the web side sends HTTP requests that reach the ScheduledHelper management methods to create, change, and stop schedules:
log.info("3: cloud-hosted update, starting the digital employee");
ScheduledApiVo scheduledApiVo = new ScheduledApiVo();
scheduledApiVo.setWfId(wfId);
scheduledApiVo.setUserId(String.valueOf(updateUserId));
scheduledApiVo.setTriggerId(newTriggerInfo.getId());
String webHookBody = JSON.toJSONString(scheduledApiVo);
EmsApiUtil.SendQuartzMessage(url, "add", webHookBody);

******************** separator ********************

public static boolean SendQuartzMessage(String quartzUrl, String method, String webHookBody) {
    boolean result = false;
    try {
        // sendPost is built on org.apache.httpcomponents.httpclient; see the pom dependency that follows
        String resp = HttpClientUtil.sendPostByJson(quartzUrl + "/" + method, webHookBody, 0);
        if ("error".equals(resp) || resp.contains("405 Not Allowed")) {
            log.error("Failed to send the message to the task scheduling center, url: " + quartzUrl);
        } else {
            JSONObject jsonObject = JSON.parseObject(resp);
            if ("200".equals(String.valueOf(jsonObject.get("code")))) {
                result = true;
            } else {
                log.error("Call to the task scheduling center failed, msg: " + String.valueOf(jsonObject.get("msg")));
            }
        }
    } catch (Exception e) {
        log.error("Call to the task scheduling center failed, msg: " + e.getMessage());
    }
    return result;
}
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
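The article's HttpClientUtil is not shown, so here is a hedged alternative for the same POST using only the JDK's java.net.http API (Java 11 or later) instead of Apache HttpClient. The class QuartzRequests and its build method are hypothetical names. The URL in the usage note is an assumption put together from the port, context path, and controller mapping listed earlier; the host is a guess.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (an assumption for illustration): builds the JSON POST for a scheduler endpoint.
class QuartzRequests {
    /**
     * Builds a POST request to baseUrl + "/" + method with the given JSON body.
     * The request is only constructed here; nothing is sent over the network.
     */
    static HttpRequest build(String baseUrl, String method, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + method))
                .header("Content-Type", "application/json; charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8))
                .build();
    }
}
```

A call such as QuartzRequests.build("http://localhost:12105/automation-quartz/api/scheduled", "add", webHookBody) yields a request that can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). Checking the returned code field would then work the same way as in SendQuartzMessage.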