大数跨境
0
0

绝了!Spring Batch 百万数据分区处理,仅需5秒搞定

绝了!Spring Batch 百万数据分区处理,仅需5秒搞定 Spring全家桶实战案例
2025-08-01
0
导读:绝了!Spring Batch 分区处理,百万数据秒级拿下
Spring Boot 3实战案例锦集PDF电子书已更新至130篇!
图片

🎉🎉《Spring Boot实战案例合集》目前已更新152个案例,我们将持续不断的更新。文末有电子书目录。

💪💪永久更新承诺

我们郑重承诺,所有订阅合集的粉丝都将享受永久免费的后续更新服务

💌💌如何获取
订阅我们的合集点我订阅,并通过私信联系我们,我们将第一时间将电子书发送给您。

→ 现在就订阅合集

环境:SpringBoot3.4.2



关于Spring Batch相关基础知识请查看下面文章:

SpringBoot整合Spring Batch批处理框架,处理大数据新方案

Spring Batch 秒级处理百万级数据,内存占用降低95%

1. 简介

Spring Batch默认是单线程的。为了实现并行处理,我们需要对批处理作业的步骤进行分区。

如上图所示,作业在左侧以一系列步骤(Step)实例的顺序运行,其中一个步骤实例被标记为管理者(manager)。图中的所有工作节点(workers)均为同一步骤(Step)的相同实例,实际上这些工作节点完全可以替代管理者的角色,且对作业的最终结果不会产生任何影响。工作节点通常为远程服务,但也可以是本地执行线程。

Spring Batch允许将输入数据从Manager步骤传递到Worker步骤,以便每个Worker都确切知道要做什么。JobRepository确保在作业的单个执行过程中,每个Worker只被执行一次。

分区使用多个线程来处理一系列数据集。数据集的范围可以通过编程方式定义。根据用例,我们可以决定在分区中创建多少个线程来使用。线程的数量纯粹基于需求/要求。

当我们需要从源系统中读取数百万条记录,并且不能仅依赖单个线程来处理所有记录(这可能会很耗时)时,分区就非常有用。我们希望使用多个线程来读取和处理数据,以有效地利用系统资源。

接下来,我们将通过Spring Batch分区功能实现百万数据的读取处理。

2.实战案例

依赖管理

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency>

开启批处理:

@Configuration@EnableBatchProcessingpublic class AppConfig {}

准备数据如下:

数据总是为:

2.1 自定义分区

分区器(Partitioner)是用于为分区步骤创建输入参数的核心策略接口,其输入参数形式为ExecutionContext实例。通常目标是生成一组互不重叠的输入值集合,例如一组无重叠的主键范围或唯一文件名。

在此示例中,我们通过查询表获取最大ID值和最小ID值(假设ID是连续的,如果你的ID是UUID,那么可以通过时间字段来划分),并基于此在所有记录间创建分区。

对于分区器,我们将网格大小(gridSize)设置为线程数量。可根据实际需求使用自定义值。

public class IdRangePartitioner implements Partitioner {
  private final JdbcClient jdbcClient;  public IdRangePartitioner(JdbcClient jdbcClient) {    this.jdbcClient = jdbcClient;  }  @Override  public Map<String, ExecutionContext> partition(int gridSize) {    int min = this.jdbcClient.sql("SELECT MIN(id) FROM o_user").query(Integer.class).single() ;    int max = this.jdbcClient.sql("SELECT MAX(id) FROM o_user").query(Integer.class).single() ;    int targetSize = (max - min) / gridSize + 1;
    Map<String, ExecutionContext> result = new HashMap<>();    int number = 0;    int start = min;    int end = start + targetSize - 1;    while (start <= max) {      ExecutionContext value = new ExecutionContext();      result.put("partition" + number, value);      if (end >= max) {        end = max;      }      // 这里设置的值每一个Worker,都将会在具体的ItemReader中通过SpEL动态获取      value.putInt("minValue", start);      value.putInt("maxValue", end);      start += targetSize;      end += targetSize;      number++;    }    return result ;  }}

2.2 配置Job

接下来,我们需要配置job执行所需要的bean(Reader,Writer,Step)。

@Configurationpublic class JobConfig {
  private final JobRepository jobRepository;  private final PlatformTransactionManager transactionManager;  private final DataSource dataSource ;  public JobConfig(JobRepository jobRepository, PlatformTransactionManager transactionManager,       DataSource dataSource) {    this.jobRepository = jobRepository;    this.transactionManager = transactionManager;    this.dataSource = dataSource ;  }  @Bean  IdRangePartitioner partitioner(JdbcClient jdbcClient) {    IdRangePartitioner columnRangePartitioner = new IdRangePartitioner(jdbcClient);    return columnRangePartitioner;  }
  @Bean  @StepScope  JdbcPagingItemReader<UserpagingItemReader(      @Value("#{stepExecutionContext['minValue']}") Long minValue,      @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {    System.out.println("reading " + minValue + " to " + maxValue);    Map<StringOrder> sortKeys = new HashMap<>();    sortKeys.put("id"Order.ASCENDING);    MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();    queryProvider.setSelectClause("id, name, age, phone, sex");    queryProvider.setFromClause("from o_user");    queryProvider.setWhereClause("where id >= " + minValue + " and id < " + maxValue);    queryProvider.setSortKeys(sortKeys);    JdbcPagingItemReader<User> reader = new JdbcPagingItemReader<>();    reader.setDataSource(this.dataSource);    reader.setFetchSize(1000);    reader.setRowMapper(new UserRowMapper());    reader.setQueryProvider(queryProvider);    return reader;  }
  @Bean  @StepScope  JdbcBatchItemWriter<UseruserItemWriter() {    JdbcBatchItemWriter<User> itemWriter = new JdbcBatchItemWriter<>();    itemWriter.setDataSource(dataSource);    itemWriter.setSql("insert into n_user values (:id, :name, :age, :phone, :sex)");    itemWriter.setItemSqlParameterSourceProvider        (new BeanPropertyItemSqlParameterSourceProvider<>());    return itemWriter;  }
  // 主Step  @Bean  Step stepMaster(IdRangePartitioner partitioner) {    return new StepBuilder("stepMaster", jobRepository)        .partitioner(slaveStep().getName(), partitioner)        .step(slaveStep())        // 配置线程数        .gridSize(50)        .taskExecutor(new SimpleAsyncTaskExecutor())        .build();  }
  @Bean  Step slaveStep() {    return new StepBuilder("slaveStep", jobRepository)        .<UserUser>chunk(1000, transactionManager)        .reader(pagingItemReader(nullnull))        .writer(userItemWriter())        .build();  }
  @Bean  Job job(@Qualifier("stepMaster") Step stepMaster) {    return new JobBuilder("job", jobRepository)        .start(stepMaster)        .build();  }}

说明:

  • stepMaster中设置了任务执行器SimpleAsyncTaskExecutor,使用该执行器我们也可以非常方便的设置使用虚拟线程执行(setVirtualThreads(true))。

  • 步骤(Step)中使用的资源(如数据源DataSource)可能存在并发限制。

  • IdRangePartitioner:用于为分区步骤创建输入参数的核心策略接口,输入参数形式为ExecutionContext实例。

  • JdbcPagingItemReader:此Bean使用分页方式读取数据,并根据范围接受最小值(minValue)和最大值(maxValue),获取该范围内的数据。同时我们设置每次读取1000条。

  • JdbcBatchItemWriter:此Bean将数据写入另一个表。

  • Step:这是批处理作业中配置的步骤,负责数据的读取和写入操作。

  • Job:表示批处理作业的批处理领域对象。

2.3 其它辅助类

User实体对象

public class User {  private Integer id ;  private String name ;  private Integer age ;  private String phone ;  private String sex ;  // getters, setters}

表数据到对象的映射Mapper类

public class UserRowMapper implements RowMapper<User> {  @Override  public User mapRow(ResultSet rs, int rowNum) throws SQLException {    User user = new User();    user.setId(rs.getInt("id"));    user.setName(rs.getString("name"));    user.setPhone(rs.getString("phone"));    user.setSex(rs.getString("sex"));    user.setAge(rs.getInt("age"));    return user;  }}

2.4 测试

@Componentpublic class TaskRunner implements CommandLineRunner {  private final JobLauncher jobLauncher;  private final Job job;  public TaskRunner(JobLauncher jobLauncher, Job job) {    this.jobLauncher = jobLauncher;    this.job = job;  }  @Override  public void run(String... args) throws Exception {    long start = System.currentTimeMillis() ;    JobParameters jobParameters = new JobParametersBuilder()        .addString("JobId", String.valueOf(System.currentTimeMillis()))        .toJobParameters();    JobExecution execution = jobLauncher.run(job, jobParameters);    System.err.println("STATUS :: " + execution.getStatus());  }}

启动服务后,控制台首先输出的是分区信息:

当完成以后输出结果:


以上是本篇文章的全部内容,如对你有帮助帮忙点赞+转发+收藏

推荐文章

Spring AI + RAG:网页智能解析,精准问答一键开启!

Spring Boot 这个接口非常强大

小心此Bug!Spring Boot 事务方法没有异常,为什么事务还是回滚了?

Spring Boot 拦截器与自定义注解的综合实践

3个经典案例,详解Spring Boot实时推送技术

关于Spring AOP的两个高级功能你用过吗?

Tika 与 Spring Boot 的完美结合:支持任意文档解析的神器

惊呆了!Controller接口返回值支持17种逆天类型

优雅!基于Spring Boot字段加密后的模糊查询,支持MyBatis, JPA

查漏补缺!OpenFeign整合Resilience4j,你真的会用吗?

API接口优化!基于Spring Boot 实现Deflate压缩技术

SpringBoot+Nginx+Lua接口性能提升N倍

图片
图片
图片
图片
图片
图片
图片
图片
图片

【声明】内容源于网络
0
0
Spring全家桶实战案例
Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
内容 832
粉丝 0
Spring全家桶实战案例 Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
总阅读285
粉丝0
内容832