L'éditeur PHP Xigua présente un problème que l'on peut rencontrer avec Spring Batch : lorsque vous essayez de démarrer deux tâches Spring Batch ou plus en même temps, l'erreur suivante est générée : ORA-08177 : impossible de sérialiser l'accès pour cette transaction. Cette erreur peut prêter à confusion, mais elle est en réalité causée par un problème de verrouillage de la base de données. Avant de résoudre ce problème, nous devons rappeler quelques notions de base sur Spring Batch et sur les transactions de base de données.
J'ai rencontré une erreur en essayant d'utiliser CompletableFuture pour exécuter deux tâches Spring Batch en parallèle. Le message d'erreur est le suivant :
originalsql = insert into batch_job_instance(job_instance_id, job_name, job_key, version) values (?, ?, ?, ?) , error msg = ORA-08177: can't serialize access for this transaction
J'utilise Spring Batch 5.x, Spring Boot 3.1.6, JDK 17
Propriétés de l'application
spring.batch.repository.isolationlevelforcreate=isolation_read_committed
spring.batch.isolationlevel=read_committed
spring.batch.jdbc.table-prefix=batch_
spring.batch.job.enabled=false
batchconfig.java
/**
 * Shared Spring Batch infrastructure imported by the job configurations:
 * the task executor, an asynchronous job launcher, the JPA transaction
 * manager and the job/step execution listeners.
 *
 * <p>Explicit bean names are kept exactly as written in the article because
 * other snippets look them up by those strings.
 */
@Configuration
public class BatchConfig {

    // NOTE(review): concurrencyCount is referenced below but never declared in the
    // snippet; presumably a field (e.g. injected from configuration) was omitted
    // from the article. Declare it before using this class.

    /**
     * Creates the task executor shared by the job launcher and the steps.
     *
     * @return a {@link SimpleAsyncTaskExecutor} whose concurrency limit is {@code concurrencyCount}
     */
    @Bean("simpletaskexecutor")
    public TaskExecutor simpleTaskExecutor() {
        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("simpletaskexecutor");
        asyncTaskExecutor.setConcurrencyLimit(concurrencyCount);
        return asyncTaskExecutor;
    }

    /**
     * Builds a job launcher that runs jobs on {@link #simpleTaskExecutor()}.
     * Declared with prototype scope, so each injection point gets its own launcher.
     *
     * @param dataSource    not used by this method; kept so the signature matches the article
     * @param jobRepository repository handed to the launcher
     * @return the initialised launcher
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean("joblauncherasync")
    @Scope("prototype")
    public JobLauncher jobLauncherAsync(DataSource dataSource, JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(simpleTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    /**
     * Transaction manager backed by the JPA entity manager factory.
     *
     * @param entityManagerFactory factory the transaction manager is bound to
     * @return a new {@link JpaTransactionManager}
     */
    @Bean
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }

    /** Job-level execution listener shared by all jobs. */
    @Bean
    public BatchJobExecutionListener batchJobExecutionListener() {
        return new BatchJobExecutionListener();
    }

    /** Step-level execution listener shared by all steps. */
    @Bean
    public BatchJobStepExecutionListner batchJobStepExecutionListner() {
        return new BatchJobStepExecutionListner();
    }
}
employeejobconfig.java
@configuration @import(batchconfig.class) public class employeejobconfig{ @autowired employeestatuswritter employeestatuswritter; @autowired employeependingprocessor employeependingprocessor; @bean("employeependingreader") public jpapagingitemreader<employee> employeependingreader(datasource ds,entitymanagerfactory entitymanagerfactory) { jpapagingitemreader<employee> jpareader = new jpapagingitemreader<>(); jpareader.setentitymanagerfactory(entitymanagerfactory); jpareader.setquerystring("select e from employee e"); jpareader.setpagesize(5000); return jpareader; } @bean("employeespinvokestep") public step employeespinvokestep(@qualifier("employeependingreader") itemreader<employee> reader, @qualifier("employeestatuswritter") itemwriter<employee> writer, @qualifier("employeependingprocessor") itemprocessor<employee, employee> processor,jobrepository jobrepository, platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) { return new stepbuilder("employeespinvokestep",jobrepository) .<employee, employee>chunk(50,transactionmanager).reader(reader) .processor(processor).writer(writer) .listener(batchjobstepexecutionlistner) .taskexecutor(simpletaskexecutor) .build(); } @bean("employeespjob") public job employeespjob(@qualifier("employeespinvokestep") step employeespinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) { return new jobbuilder("employeespjob",jobrepository) .incrementer(new runidincrementer()) .listener(batchjobexecutionlistener) .start(employeespinvokestep) .build(); } }
managerconfig.java
@configuration @import(batchconfig.class) public class managerconfig { @autowired managerstatuswritter managerstatuswritter; @autowired managerpendingprocessor managerpendingprocessor; @bean("managerpendingreader") public jpapagingitemreader<manager> managerpendingreader(datasource ds,entitymanagerfactory entitymanagerfactory) { jpapagingitemreader<manager> jpareader = new jpapagingitemreader<>(); jpareader.setentitymanagerfactory(entitymanagerfactory); jpareader.setquerystring("select m from manager m"); jpareader.setpagesize(5000); return jpareader; } @bean("managerspinvokestep") public step indvinvoiceconsctlspinvokestep(@qualifier("managerpendingreader") itemreader<manager> reader, @qualifier("managerstatuswritter") itemwriter<manager> writer, @qualifier("managerpendingprocessor") itemprocessor<manager, manager> processor,jobrepository jobrepository, platformtransactionmanager transactionmanager,batchjobstepexecutionlistner batchjobstepexecutionlistner,taskexecutor simpletaskexecutor) { return new stepbuilder("managerspinvokestep",jobrepository) .<manager, manager>chunk(5000,transactionmanager).reader(reader) .processor(processor).writer(writer) .listener(batchjobstepexecutionlistner) .taskexecutor(simpletaskexecutor) .build(); } @bean("managerspjob") public job managerspjob(@qualifier("managerspinvokestep") step indvinvoiceconsctlspinvokestep,jobrepository jobrepository,batchjobexecutionlistener batchjobexecutionlistener) { return new jobbuilder("managerspjob",jobrepository) .incrementer(new runidincrementer()) .listener(batchjobexecutionlistener) .start(indvinvoiceconsctlspinvokestep) .build(); } }
batchjobmanager.java
/**
 * Entry point that launches the employee and manager jobs in parallel through
 * {@link CompletableFuture} and prints each job's completion flag.
 */
@Service
public class BatchJobManager {

    @Autowired
    ApplicationContext context;

    @Autowired
    BatchExecutorService batchExecutorService;

    @Autowired
    BatchJobRunner batchJobRunner;

    // NOTE(review): the article uses this executor without declaring it. It is
    // wired here to the "simpletaskexecutor" bean, the only TaskExecutor the
    // article defines — confirm this is the executor the author intended.
    @Autowired
    @Qualifier("simpletaskexecutor")
    TaskExecutor asyncTaskExecutor;

    /**
     * Launches both jobs asynchronously. Any failure is printed rather than
     * propagated to the caller.
     */
    public void startJob() {
        try {
            System.out.println("batchjobmanager called .. " + new Date());
            List<String> jobNames = List.of("employeespjob", "managerspjob");
            launchAsyn(getBatchJobList(jobNames));
        } catch (Exception e) {
            System.out.println("while loading job..");
            e.printStackTrace();
        }
    }

    /**
     * Wraps each job name in a {@link BatchJob} descriptor.
     *
     * @param jobNames bean names of the jobs to run
     * @return one descriptor per name, in the same order
     * @throws Exception declared for compatibility with the article's signature
     */
    public List<BatchJob> getBatchJobList(List<String> jobNames) throws Exception {
        List<BatchJob> batchJobList = new ArrayList<>();
        for (String job : jobNames) {
            // The article passed an undefined variable here; the loop variable is the job name.
            batchJobList.add(BatchJob.builder().jobName(job).build());
        }
        return batchJobList;
    }

    /**
     * Submits every job to {@link BatchExecutorService}, waits for all of them
     * and prints each resulting completion flag.
     *
     * @param batchJobList descriptors of the jobs to launch
     * @throws Exception if waiting on the combined future fails
     */
    public void launchAsyn(List<BatchJob> batchJobList) throws Exception {
        List<CompletableFuture<BatchJob>> batchJobFutureList = new ArrayList<>();
        for (BatchJob batchJob : batchJobList) {
            batchJobFutureList.add(batchExecutorService.execute(batchJob, asyncTaskExecutor));
        }

        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(batchJobFutureList.toArray(new CompletableFuture[0]));

        // Once every future has completed, join() returns immediately with each result.
        CompletableFuture<List<BatchJob>> allCompletableFuture = allDone.thenApply(ignored ->
                batchJobFutureList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));

        List<BatchJob> resultFutureList = allCompletableFuture.get();
        for (BatchJob batch : resultFutureList) {
            System.out.println("status " + batch.getIsCompleted());
        }
    }
}
batchexecutorservice.java
/**
 * Submits a single batch job to a task executor and exposes the outcome as a
 * {@link CompletableFuture}.
 */
@Service
public class BatchExecutorService {

    @Autowired
    BatchJobRunner batchJobRunner;

    /**
     * Runs {@code batchJobRunner.execute(batchJob)} on the given executor.
     *
     * @param batchJob               descriptor of the job to run
     * @param threadPoolTaskExecutor executor the job runner is invoked on
     * @return a future completed with the descriptor returned by the runner
     */
    public CompletableFuture<BatchJob> execute(BatchJob batchJob, TaskExecutor threadPoolTaskExecutor) {
        return CompletableFuture.supplyAsync(() -> batchJobRunner.execute(batchJob), threadPoolTaskExecutor);
    }
}
batchjobrunner.java
/**
 * Looks up a job bean by name, launches it with fresh job parameters and
 * records the outcome on the supplied {@link BatchJob} descriptor.
 */
@Service
public class BatchJobRunner {

    @Autowired
    ApplicationContext context;

    @Autowired
    @Qualifier("joblauncherasync")
    JobLauncher jobLauncherAsync;

    /*
     * @Autowired JobLauncher jobLauncherAsync;
     */

    /**
     * Launches the job named by {@code batchJob}, waits 15 seconds, then marks
     * the descriptor as completed. On failure the exception message is stored
     * in the descriptor's error description instead.
     *
     * @param batchJob descriptor naming the job to launch; updated in place
     * @return the same descriptor that was passed in
     */
    public BatchJob execute(BatchJob batchJob) {
        try {
            System.out.println(" batchjob" + batchJob.getJobName() + " called ...");
            jobLauncherAsync.run(getJob(batchJob.getJobName()), getJobParameters(batchJob.getJobName()));
            // Fixed pause taken from the article; it does not observe the job's real status.
            Thread.sleep(15000);
            batchJob.setIsCompleted(true);
            System.out.println(" batchjob" + batchJob.getJobName() + " completed ...");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can see the interruption.
            Thread.currentThread().interrupt();
            System.out.println("exception " + e.getMessage());
            batchJob.setErrorDesc(e.getMessage());
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println("exception " + e.getMessage());
            // getMessage() may be null; store it as-is instead of dereferencing it.
            batchJob.setErrorDesc(e.getMessage());
            e.printStackTrace();
        }
        return batchJob;
    }

    /**
     * Resolves the job bean registered under {@code jobName}.
     *
     * @param jobName bean name of the job
     * @return the job bean
     */
    public Job getJob(String jobName) {
        return (Job) context.getBean(jobName);
    }

    /**
     * Builds identifying job parameters: a random UUID, the job name and the
     * current date.
     *
     * @param jobName name stored under the {@code job_name} parameter
     * @return the assembled parameters
     */
    public JobParameters getJobParameters(String jobName) {
        return new JobParametersBuilder()
                .addString("unique_id", UUID.randomUUID().toString(), true)
                .addString("job_name", jobName, true)
                .addDate("execution_start_date", Date.from(Instant.now()), true)
                .toJobParameters();
    }
}
batchjob.java
/**
 * Descriptor of one batch job run: the job's bean name, whether the run was
 * marked completed, and an error description when launching failed.
 *
 * <p>The accessors and builder are written out because the other snippets call
 * them ({@code builder()}, {@code getJobName()}, {@code setIsCompleted()},
 * {@code getIsCompleted()}, {@code setErrorDesc()}) while the article shows only
 * the fields. NOTE(review): the original presumably generated them with
 * annotations that were stripped from the article — confirm.
 */
public class BatchJob {

    private String jobName;
    private Boolean isCompleted;
    private String errorDesc;

    /** Creates an empty descriptor; every field starts as {@code null}. */
    public BatchJob() {
    }

    /**
     * Starts building a descriptor.
     *
     * @return a new builder with all fields unset
     */
    public static Builder builder() {
        return new Builder();
    }

    /** @return the bean name of the job, or {@code null} if unset */
    public String getJobName() {
        return jobName;
    }

    /** @param jobName the bean name of the job */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @return the completion flag, or {@code null} if it was never set */
    public Boolean getIsCompleted() {
        return isCompleted;
    }

    /** @param isCompleted the completion flag */
    public void setIsCompleted(Boolean isCompleted) {
        this.isCompleted = isCompleted;
    }

    /** @return the error description, or {@code null} if none was recorded */
    public String getErrorDesc() {
        return errorDesc;
    }

    /** @param errorDesc the error description; may be {@code null} */
    public void setErrorDesc(String errorDesc) {
        this.errorDesc = errorDesc;
    }

    /** Fluent builder for {@link BatchJob}. */
    public static final class Builder {

        private String jobName;
        private Boolean isCompleted;
        private String errorDesc;

        private Builder() {
        }

        /** Sets the job's bean name. */
        public Builder jobName(String jobName) {
            this.jobName = jobName;
            return this;
        }

        /** Sets the completion flag. */
        public Builder isCompleted(Boolean isCompleted) {
            this.isCompleted = isCompleted;
            return this;
        }

        /** Sets the error description. */
        public Builder errorDesc(String errorDesc) {
            this.errorDesc = errorDesc;
            return this;
        }

        /**
         * Creates the descriptor from the values set so far.
         *
         * @return a new {@link BatchJob}
         */
        public BatchJob build() {
            BatchJob job = new BatchJob();
            job.jobName = jobName;
            job.isCompleted = isCompleted;
            job.errorDesc = errorDesc;
            return job;
        }
    }
}
Les tâches s'exécutent avec succès lorsqu'elles sont lancées une par une, l'une après l'autre. Cependant, avec CompletableFuture, je rencontre ce problème. Lancer plusieurs tâches Spring Batch en même temps de cette façon est-il la bonne approche ?
1. Ajoutez les propriétés suivantes à application.properties
spring.main.allow-bean-definition-overriding=true
2. Mise à jour de jobLauncher() dans BatchConfig.java, comme indiqué ci-dessous
/**
 * Replaces the framework-provided job launcher with one that runs jobs on the
 * shared task executor.
 *
 * <p>The bean is named {@code jobLauncher} so that it overrides the launcher
 * bean of that name registered by Spring Batch's default configuration. This is
 * why {@code spring.main.allow-bean-definition-overriding=true} is required.
 *
 * @param dataSource    not used by this method; kept so the signature matches the article
 * @param jobRepository repository handed to the launcher
 * @return the initialised launcher
 * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
 */
@Bean("jobLauncher")
public JobLauncher jobLauncher(DataSource dataSource, JobRepository jobRepository) throws Exception {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(simpleTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}
Suppression de CompletableFuture.
batchexecutorservice.java supprimé
Ajout des méthodes suivantes dans batchjobmanager.java pour appeler des tâches.
/**
 * Runs the given jobs one after another on the calling thread by handing each
 * descriptor to {@code batchJobRunner.execute}.
 *
 * @param batchJobList descriptors of the jobs to run, processed in list order
 * @throws Exception declared for callers; the loop itself adds no checked exceptions
 */
public void launchSync(List<BatchJob> batchJobList) throws Exception {
    batchJobList.forEach(pending -> batchJobRunner.execute(pending));
}
Ce qui précède est le contenu détaillé de cet article. Pour plus d'informations, consultez les autres articles connexes sur le site PHP chinois !