在您的个人或专业项目中,您有时会处理大量数据。数据批处理是处理大量数据的有效方法,其中数据被收集、处理,然后产生批处理结果。批处理可以应用于许多用例。批处理的一个常见用例是将大量 CSV 或 JSON 文件转换为可供进一步处理的结构化格式。
在本教程中,我们将尝试了解如何使用 Spring Boot 来设置此架构,Spring Boot 是一个促进基于 Spring 的应用程序开发的框架。
Spring Batch 是一个用于批处理的开源框架。它是一个轻量级的综合解决方案,旨在支持现代企业系统中常见的强大批处理应用程序的开发。它的开发是 SpringSource 和 Accenture 合作的结果。
它可以克服批量开发过程中反复出现的问题:
注意:在 IT 中,批处理是一个独立运行的程序,对大量数据执行一组处理操作。
为了管理批量数据,我们主要使用以下三个工具:
JobLauncher:这是负责启动/启动批处理程序的组件。它可以配置为自行触发或由外部事件触发(手动启动)。在 Spring Batch 工作流程中,JobLauncher 负责执行 Job。
作业:这是代表任务的组件,该任务被委派负责程序中解决的业务需求。它负责顺序启动一个或多个步骤。
步骤:这是包含需要解决的业务核心的组件。它负责定义结构如下的三个子组件:
ItemReader:这是负责读取要处理的输入数据的组件。它们可以来自各种来源(数据库、平面文件(csv、xml、xls 等)、队列);
ItemProcessor:这是负责转换读取的数据的组件。所有管理规则都在其中实施。
ItemWriter:此组件将处理器转换后的数据保存在一个或多个所需容器(数据库、平面文件(csv、xml、xls 等)、云)中。
JobRepository:这是负责记录每次执行时对 JobLauncher、Job 和 Step(s) 的监视统计数据的组件。它提供了两种可能的技术来存储这些统计数据:使用数据库或使用映射。当统计信息存储在数据库中并因此以持久的方式保留时,这允许随着时间的推移连续监控批次,以便分析发生故障时可能出现的问题。相反,当它在 Map 中时,持久化的统计信息将在每个 Batch 执行实例结束时丢失。在所有情况下,都必须配置其中之一。
欲了解更多信息,我建议您查阅 Spring 网站。
在简要解释了 Spring Batch 架构之后,现在让我们尝试展示如何设置 Spring Batch 作业,该作业将从 CSV 文件中读取数据,随后将其插入数据库中。”让我们开始编码”。
生成 Spring Boot 项目的最简单方法是使用 Spring Boot Tool,步骤如下:
项目生成后,您必须将其解压缩然后将其导入到您的 IDE 中。
使用的技术:
所有项目依赖项都在 pom.xml 文件中。 POM 三个字母是项目对象模型的缩写。它的 XML 表示形式由 Maven 转换为表示项目模型的数据结构。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
项目结构如下:
要启用批处理,我们需要在配置类上添加@EnableBatchProcessing注解。然后,我们必须创建一个读取器来读取 CSV 文件,创建一个处理器来在写入之前处理输入数据,创建一个写入器来写入数据库。
import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import com.pathus90.springbatchexample.batch.StudentProcessor; import com.pathus90.springbatchexample.batch.StudentWriter; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.model.StudentFieldSetMapper; @Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfig { private static final String FILE_NAME = "results.csv"; private static final String JOB_NAME = "listStudentsJob"; private static final String STEP_NAME = "processingStep"; private static final String READER_NAME = "studentItemReader"; @Value("${header.names}") private String names; @Value("${line.delimiter}") private String delimiter; @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); } @Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; } @Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; } @Bean public ItemProcessor<Student, Student> studentItemProcessor() { return new StudentProcessor(); } @Bean public ItemWriter<Student> studentItemWriter() { return new StudentWriter(); } }
第一个方法定义作业,第二个方法定义单个步骤。作业是通过步骤创建的,其中每个步骤都可以涉及读取器、处理器和写入器。 在步骤定义中,我们定义一次写入的数据量,在我们的示例中,一次最多写入 5 条记录。然后,我们使用之前注入的 beans 配置读取器、处理器和写入器。在定义我们的工作时,它将能够通过精确的顺序定义我们执行中的不同步骤。步骤 StudentStep 将由作业 listStudentsJob 执行。
@Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); }
在我们的批处理配置中,Reader 读取数据源并在一个步骤中连续调用并返回为其定义的对象(在我们的示例中为学生)。
@Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; }
FlatFileItemReader 类使用 DefaultLineMapper 类,而后者又使用 DelimitedLineTokenizer 类。 DelimitedLineTokenizer 的作用是将每一行分解为 FieldSet 对象,names 属性给出文件头的格式并允许识别每一行的数据。数据转换实现类使用此名称属性通过 FieldSet 对象将其转换为业务对象。这是由 fieldSetMapper (StudentFieldSetMapper) 属性指示的类。
import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; public class StudentFieldSetMapper implements FieldSetMapper<Student> { @Override public Student mapFieldSet(FieldSet fieldSet) { return Student.builder() .rank(fieldSet.readString(0)) .firstName(fieldSet.readString(1)) .lastName(fieldSet.readString(2)) .center(fieldSet.readString(3)) .pv(fieldSet.readString(4)) .origin(fieldSet.readString(5)) .mention(fieldSet.readString(6)) .build(); } }
LineMapper 接口用于将行(字符串)映射到通常用于映射从文件读取的行的对象
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
与 Reader 不同,Processor 的实现更多的是为了功能需求。它不是强制性的,如果在我们的处理中没有预见到功能需要,我们可以不使用它。在我们的示例中,我们编写了一个简单的处理器,它仅将学生对象的一些属性转换为大写,我们可以使用以下代码超越此示例。更具体的功能案例。
import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import com.pathus90.springbatchexample.batch.StudentProcessor; import com.pathus90.springbatchexample.batch.StudentWriter; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.model.StudentFieldSetMapper; @Configuration @EnableBatchProcessing @EnableScheduling public class BatchConfig { private static final String FILE_NAME = "results.csv"; private static final String JOB_NAME = "listStudentsJob"; private static final String STEP_NAME = "processingStep"; private static final String READER_NAME = "studentItemReader"; @Value("${header.names}") private String names; @Value("${line.delimiter}") private String delimiter; @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); } @Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; } @Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; } @Bean public ItemProcessor<Student, Student> studentItemProcessor() { return new StudentProcessor(); } @Bean public ItemWriter<Student> studentItemWriter() { return new StudentWriter(); } }
写入器写入来自处理器的数据(或直接由读取器读取)。在我们的例子中,它从处理器接收转换后的对象,每个对象随后将被保存在我们的数据库中,并且交易将被验证。
@Bean public Step studentStep() { return stepBuilderFactory.get(STEP_NAME) .<Student, Student>chunk(5) .reader(studentItemReader()) .processor(studentItemProcessor()) .writer(studentItemWriter()) .build(); } @Bean public Job listStudentsJob(Step step1) { return jobBuilderFactory.get(JOB_NAME) .start(step1) .build(); }
@Bean public ItemReader<Student> studentItemReader() { FlatFileItemReader<Student> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource(FILE_NAME)); reader.setName(READER_NAME); reader.setLinesToSkip(1); reader.setLineMapper(lineMapper()); return reader; }
import org.springframework.batch.item.file.mapping.FieldSetMapper; import org.springframework.batch.item.file.transform.FieldSet; public class StudentFieldSetMapper implements FieldSetMapper<Student> { @Override public Student mapFieldSet(FieldSet fieldSet) { return Student.builder() .rank(fieldSet.readString(0)) .firstName(fieldSet.readString(1)) .lastName(fieldSet.readString(2)) .center(fieldSet.readString(3)) .pv(fieldSet.readString(4)) .origin(fieldSet.readString(5)) .mention(fieldSet.readString(6)) .build(); } }
完成批量配置设置后,让我们看看上面所说的一切是否有效
要运行应用程序,您需要查找包含注释@SpringBootApplication的文件,这是我们应用程序的主要部分。
@Bean public LineMapper<Student> lineMapper() { final DefaultLineMapper<Student> defaultLineMapper = new DefaultLineMapper<>(); final DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); lineTokenizer.setDelimiter(delimiter); lineTokenizer.setStrict(false); lineTokenizer.setNames(names.split(delimiter)); final StudentFieldSetMapper fieldSetMapper = new StudentFieldSetMapper(); defaultLineMapper.setLineTokenizer(lineTokenizer); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; }
启动上面的 main 将开始我们的工作,批处理启动器如下所示:
import org.springframework.batch.item.ItemProcessor; import com.pathus90.springbatchexample.model.Student; public class StudentProcessor implements ItemProcessor<Student, Student> { @Override public Student process(Student student) { student.setFirstName(student.getFirstName().toUpperCase()); student.setLastName(student.getLastName().toUpperCase()); student.setCenter(student.getCenter().toUpperCase()); student.setOrigin(student.getOrigin().toUpperCase()); student.setMention(student.getMention().toUpperCase()); return student; } }
已设置调度程序以允许自动触发批次。在此示例中,批量启动后将每 8 秒运行一次。您可以通过更改 fixedDelay 值(以毫秒为单位)来使用它。
import java.util.List; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import com.pathus90.springbatchexample.model.Student; import com.pathus90.springbatchexample.service.IStudentService; import lombok.extern.slf4j.Slf4j; @Slf4j public class StudentWriter implements ItemWriter<Student> { @Autowired private IStudentService studentService; @Override public void write(List<? extends Student> students) { students.stream().forEach(student -> { log.info("Enregistrement en base de l'objet {}", student); studentService.insertStudent(student); }); } }
除了运行上面的主文件来启动批处理之外,您还可以在使用命令提示符时运行命令 mvn spring-boot:run。
您还可以使用 JAR 存档文件启动应用程序,在这种情况下,您必须:
使用命令提示符转到项目的父文件夹并执行命令 mvn clean package 这将打包我们的项目。
在目标文件夹中,将创建一个 jar 文件。
要运行应用程序,请使用命令java -jar target/ generated_file_name-0.0.1-SNAPSHOT.jar
还要确保在启动我们的 Spring Batch 应用程序时 H2 控制台已经启动,并且自动生成数据库以及创建 Student 表。
我们可以清楚地看到我们的文件已经很好地集成到我们的数据库中。
注意:如果我们还想手动启动批处理而不传递将根据我们的设置触发的调度程序,我已经公开了一个使用控制器来调用的 API春季工作批次。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.pathus</groupId> <artifactId>SpringBatchExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>SpringBatchExample</name> <description>Demo of spring batch project </description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
只需启动 URL:http://localhost:8080/load,批处理就会启动
我们第一次学习使用 Spring 框架的批处理编程已经结束了。如果您有任何意见或问题,请留言!
祝大家学习愉快,希望第一篇教程对您有所帮助。
您可以在这里找到可用的源代码
以上是使用 SPRING BATCH 开始编程的详细内容。更多信息请关注PHP中文网其他相关文章!