🎉🎉《Spring Boot实战案例合集》目前已更新152个案例,我们将持续不断的更新。文末有电子书目录。
💪💪永久更新承诺
我们郑重承诺,所有订阅合集的粉丝都将享受永久免费的后续更新服务。
💌💌如何获取
订阅我们的合集《点我订阅》,并通过私信联系我们,我们将第一时间将电子书发送给您。
环境:SpringBoot3.4.2
关于Spring Batch相关基础知识请查看下面文章:
SpringBoot整合Spring Batch批处理框架,处理大数据新方案
Spring Batch 秒级处理百万级数据,内存占用降低95%
1. 简介
Spring Batch默认是单线程的。为了实现并行处理,我们需要对批处理作业的步骤进行分区。
如上图所示,作业在左侧以一系列步骤(Step)实例的顺序运行,其中一个步骤实例被标记为管理者(manager)。图中的所有工作节点(workers)均为同一步骤(Step)的相同实例,实际上这些工作节点完全可以替代管理者的角色,且对作业的最终结果不会产生任何影响。工作节点通常为远程服务,但也可以是本地执行线程。
Spring Batch允许将输入数据从Manager步骤传递到Worker步骤,以便每个Worker都确切知道要做什么。JobRepository确保在作业的单个执行过程中,每个Worker只被执行一次。
分区使用多个线程来处理一系列数据集。数据集的范围可以通过编程方式定义。根据用例,我们可以决定在分区中创建多少个线程来使用。线程的数量纯粹基于需求/要求。
当我们需要从源系统中读取数百万条记录,并且不能仅依赖单个线程来处理所有记录(这可能会很耗时)时,分区就非常有用。我们希望使用多个线程来读取和处理数据,以有效地利用系统资源。
接下来,我们将通过Spring Batch分区功能实现百万数据的读取处理。
依赖管理
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency>
开启批处理:
@Configuration@EnableBatchProcessingpublic class AppConfig {}
准备数据如下:
数据总是为:
2.1 自定义分区
分区器(Partitioner)是用于为分区步骤创建输入参数的核心策略接口,其输入参数形式为ExecutionContext实例。通常目标是生成一组互不重叠的输入值集合,例如一组无重叠的主键范围或唯一文件名。
在此示例中,我们通过查询表获取最大ID值和最小ID值(假设ID是连续的,如果你的ID是UUID,那么可以通过时间字段来划分),并基于此在所有记录间创建分区。
对于分区器,我们将网格大小(gridSize)设置为线程数量。可根据实际需求使用自定义值。
public class IdRangePartitioner implements Partitioner {private final JdbcClient jdbcClient;public IdRangePartitioner(JdbcClient jdbcClient) {this.jdbcClient = jdbcClient;}public Map<String, ExecutionContext> partition(int gridSize) {int min = this.jdbcClient.sql("SELECT MIN(id) FROM o_user").query(Integer.class).single() ;int max = this.jdbcClient.sql("SELECT MAX(id) FROM o_user").query(Integer.class).single() ;int targetSize = (max - min) / gridSize + 1;Map<String, ExecutionContext> result = new HashMap<>();int number = 0;int start = min;int end = start + targetSize - 1;while (start <= max) {ExecutionContext value = new ExecutionContext();result.put("partition" + number, value);if (end >= max) {end = max;}// 这里设置的值每一个Worker,都将会在具体的ItemReader中通过SpEL动态获取value.putInt("minValue", start);value.putInt("maxValue", end);start += targetSize;end += targetSize;number++;}return result ;}}
2.2 配置Job
接下来,我们需要配置job执行所需要的bean(Reader,Writer,Step)。
public class JobConfig {private final JobRepository jobRepository;private final PlatformTransactionManager transactionManager;private final DataSource dataSource ;public JobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager,DataSource dataSource) {this.jobRepository = jobRepository;this.transactionManager = transactionManager;this.dataSource = dataSource ;}IdRangePartitioner partitioner(JdbcClient jdbcClient) {IdRangePartitioner columnRangePartitioner = new IdRangePartitioner(jdbcClient);return columnRangePartitioner;}JdbcPagingItemReader<User> pagingItemReader(("#{stepExecutionContext['minValue']}") Long minValue,("#{stepExecutionContext['maxValue']}") Long maxValue) {System.out.println("reading " + minValue + " to " + maxValue);Map<String, Order> sortKeys = new HashMap<>();sortKeys.put("id", Order.ASCENDING);MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();queryProvider.setSelectClause("id, name, age, phone, sex");queryProvider.setFromClause("from o_user");queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);queryProvider.setSortKeys(sortKeys);JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();reader.setDataSource(this.dataSource);reader.setFetchSize(1000);reader.setRowMapper(new UserRowMapper());reader.setQueryProvider(queryProvider);return reader;}JdbcBatchItemWriter<User> userItemWriter() {JdbcBatchItemWriter<User> itemWriter = new JdbcBatchItemWriter<>();itemWriter.setDataSource(dataSource);itemWriter.setSql("insert into n_user values (:id, :name, :age, :phone, :sex)");itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());return itemWriter;}// 主StepStep stepMaster(IdRangePartitioner partitioner) {return new StepBuilder("stepMaster", jobRepository).partitioner(slaveStep().getName(), partitioner).step(slaveStep())// 配置线程数.gridSize(50).taskExecutor(new SimpleAsyncTaskExecutor()).build();}Step slaveStep() {return new StepBuilder("slaveStep", jobRepository).<User, User>chunk(1000, transactionManager).reader(pagingItemReader(null, null)).writer(userItemWriter()).build();}Job job(("stepMaster") Step stepMaster) {return new JobBuilder("job", jobRepository).start(stepMaster).build();}}
说明:
stepMaster中设置了任务执行器SimpleAsyncTaskExecutor,使用该执行器我们也可以非常方便的设置使用虚拟线程执行(setVirtualThreads(true))。
步骤(Step)中使用的资源(如数据源DataSource)可能存在并发限制。
IdRangePartitioner:用于为分区步骤创建输入参数的核心策略接口,输入参数形式为ExecutionContext实例。
JdbcPagingItemReader:此Bean使用分页方式读取数据,并根据范围接受最小值(minValue)和最大值(maxValue),获取该范围内的数据。同时我们设置每次读取1000条。
JdbcBatchItemWriter:此Bean将数据写入另一个表。
Step:这是批处理作业中配置的步骤,负责数据的读取和写入操作。
Job:表示批处理作业的批处理领域对象。
2.3 其它辅助类
User实体对象
public class User {private Integer id ;private String name ;private Integer age ;private String phone ;private String sex ;// getters, setters}
表数据到对象的映射Mapper类
public class UserRowMapper implements RowMapper<User> {public User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setId(rs.getInt("id"));user.setName(rs.getString("name"));user.setPhone(rs.getString("phone"));user.setSex(rs.getString("sex"));user.setAge(rs.getInt("age"));return user;}}
2.4 测试
public class TaskRunner implements CommandLineRunner {private final JobLauncher jobLauncher;private final Job job;public TaskRunner(JobLauncher jobLauncher, Job job) {this.jobLauncher = jobLauncher;this.job = job;}public void run(String... args) throws Exception {long start = System.currentTimeMillis() ;JobParameters jobParameters = new JobParametersBuilder().addString("JobId", String.valueOf(System.currentTimeMillis())).toJobParameters();JobExecution execution = jobLauncher.run(job, jobParameters);System.err.println("STATUS :: " + execution.getStatus());}}
启动服务后,控制台首先输出的是分区信息:
当完成以后输出结果:
推荐文章
Spring AI + RAG:网页智能解析,精准问答一键开启!
小心此Bug!Spring Boot 事务方法没有异常,为什么事务还是回滚了?
Tika 与 Spring Boot 的完美结合:支持任意文档解析的神器
优雅!基于Spring Boot字段加密后的模糊查询,支持MyBatis, JPA
查漏补缺!OpenFeign整合Resilience4j,你真的会用吗?
API接口优化!基于Spring Boot 实现Deflate压缩技术


