大数跨境
0
0

自从用了SpringBatch,效率飙升500%!

自从用了SpringBatch,效率飙升500%! Java技术图谱
2025-12-12
0

先说结论:那个“效率飙升 500%”不是标题党,真的是从一晚处理 5 万条 → 稳定干到 30 万条以后,我才敢写这几个字的。

一、以前我是怎么“手搓批处理”的

场景特别典型,你肯定也见过那种需求:

  • 每晚跑一批对账数据
  • 把一堆 CSV / Excel 导进库
  • 按天清洗历史日志、同步第三方接口

我之前的做法很“朴素”——写个 @Scheduled,然后 while 循环一条条查、一条条处理、一条条插库,大概长这样:

@Scheduled(cron = "0 0 1 * * ?")
public void oldStyleJob() {
    int page = 0;
    int size = 1000;
    List<Order> list;
    do {
        list = orderDao.queryPage(page, size);
        for (Order order : list) {
            // 业务处理
            process(order);
            // 单条更新
            orderDao.update(order);
        }
        page++;
    } while (!list.isEmpty());
}

刚开始数据量不大,还挺丝滑。后来业务上了量:

  • 一次任务要处理十几万条
  • 中途报错了只能从头再来
  • 偶尔重启一下服务,整个任务直接废了

更要命的是,这段代码:

  1. 没有事务边界的概念:手动控制得很累
  2. 没有统一监控:到底跑了多少条,只能靠日志瞎 grep
  3. 扩展多线程?你得自己造一堆轮子

那次加班到凌晨三点,我一边看着日志一点点刷,一边在想:这玩意儿肯定有人替我造过轮子吧?

于是就去摸了 Spring Batch。

二、Spring Batch 到底解决了什么痛点

先别管概念,先看“换了之后”我得到了什么:

  • 处理逻辑被拆成 Job → Step → Reader/Processor/Writer,结构比之前那个大 while 清爽太多
  • 天然支持 chunk 批量处理,比如“每 100 条一起提交事务、一块写库”
  • 任务状态(跑到第几条、成败、参数)统统记录到数据库,失败了可以从断点继续
  • 套几行配置,直接上 多线程 / 分区并行

你可以把 Spring Batch 理解成:专门帮你干批处理活儿的一整套框架,你只需要填空题: “我从哪读数据 → 我要怎么处理 → 我往哪儿写”。

三、最小可用 Demo:从 CSV 导数据进 MySQL

先上代码,感受一下一个最基础的 Spring Batch Job 是啥样的。

1. 配置类长啥样

@Configuration
@EnableBatchProcessing
publicclass ImportJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job importUserJob(Step importUserStep) {
        return jobBuilderFactory.get("importUserJob")
                .start(importUserStep)
                .build();
    }

    @Bean
    public Step importUserStep(ItemReader<User> reader,
                               ItemProcessor<User, User> processor,
                               ItemWriter<User> writer)
 
{
        return stepBuilderFactory.get("importUserStep")
                .<User, User>chunk(1000)           // 每 1000 条为一批,提交一次事务
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public FlatFileItemReader<User> reader() {
        FlatFileItemReader<User> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("/data/users.csv"));
        reader.setLinesToSkip(1); // 跳过表头

        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {{
            setNames("id""name""age");
        }});
        lineMapper.setFieldSetMapper(fieldSet -> {
            User user = new User();
            user.setId(fieldSet.readLong("id"));
            user.setName(fieldSet.readString("name"));
            user.setAge(fieldSet.readInt("age"));
            return user;
        });

        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public ItemProcessor<User, User> processor() {
        return user -> {
            // 简单的业务规则:过滤掉未成年用户
            if (user.getAge() < 18) {
                returnnull// 返回 null 会被跳过
            }
            user.setName(user.getName().trim());
            return user;
        };
    }

    @Bean
    public JdbcBatchItemWriter<User> writer() {
        JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO t_user (id, name, age) VALUES (:id, :name, :age)");
        writer.setItemSqlParameterSourceProvider(
                new BeanPropertyItemSqlParameterSourceProvider<>());
        return writer;
    }
}

对比一下以前那个 while 循环,这里几件事特别关键:

  • chunk(1000):框架帮你做了“分页 + 事务控制”,一批 1000 条,处理完一起提交
  • reader / processor / writer 各司其职,逻辑非常清晰
  • JdbcBatchItemWriter 默认就是批量写数据库,不是你那种一条条 update

光是这个最基础的 Job,在我们当时的场景里,从 一晚 8 万条 → 轻松 20 万条,什么都没加,只是换了写法。

四、效率真的从哪儿来的?

“500%”这种数字听起来很玄乎,拆开来说其实就三个点:

1. 真正的批量写入

以前我是一条条 update,哪怕你在 JDBC 层面手动 addBatch,也很容易写成这样:

for (Order order : list) {
    preparedStatement.setLong(1, order.getId());
    // ...
    preparedStatement.addBatch();
    preparedStatement.executeBatch(); // 写错地方了…
}

而 Spring Batch 的 JdbcBatchItemWriter,默认就是按 chunk 帮你分批:

  • 你在 writer 里写一条 SQL 模板
  • 它会帮你把这一批 N 条数据拼成一组 batch 发给数据库
  • 再配合 DB 端合理的参数(如 MySQL 的 rewriteBatchedStatements=true),写入效率提升非常明显

2. 少了很多“无效重跑”

以前任务半路挂了,只能重头再来。 现在 Job / Step 状态会落库,失败了可以从上次停的 Step 继续,甚至可以做到从读到第几条记录的位置继续(用数据库或状态表配合一下)。

Spring Batch 自己会维护一张表 BATCH_JOB_EXECUTIONBATCH_STEP_EXECUTION 之类的,把每次执行的参数、起止时间、状态都记下来,你不用自己琢磨“我到底跑到哪儿了”。

3. 多线程 + 分区并行

单线程 chunk 跑到瓶颈之后,我没改一行业务代码,只是给 Step 加了个 taskExecutor,性能又上了一个台阶。

@Bean
public Step importUserStep(ItemReader<User> reader,
                           ItemProcessor<User, User> processor,
                           ItemWriter<User> writer)
 
{
    return stepBuilderFactory.get("importUserStep")
            .<User, User>chunk(1000)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .taskExecutor(taskExecutor())   // 开启多线程
            .throttleLimit(8)               // 最多并发 8 个线程
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(8);
    executor.setMaxPoolSize(16);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("import-user-");
    executor.initialize();
    return executor;
}

如果数据本身可以按某个 key 分片(比如按日期、分库分表等),还可以用 Partitioner 做多分区并行,比单 Step 多线程又高一个档次。

这些累加起来,把原来 4~5 小时的任务压到不到 1 小时,其实就很正常了。

五、和业务结合一下:一个“对账任务”改造实战

举个我们真实改造过的例子,大致流程:

  1. 从第三方拉取前一天的订单流水(HTTP 接口)
  2. 和我们自家订单做字段比对
  3. 生成对账结果,写入结果表 & 导出 CSV 给财务

改造前代码是这样:

  • 定时任务里自己分页调接口
  • 每页数据拉回来,循环比对,单条插入结果表
  • 任何地方抛错,一般都是整批重跑

改造后拆成了几个 Step:

  1. fetchRemoteStep:先把第三方数据全量拉到临时表(这个 Step 也用 Batch 来做)
  2. compareStep:以我们本地订单为 Reader,临时表为 join,对比生成结果
  3. exportStep:再从结果表读,写到 CSV

核心 Step 的配置有点像这样(简化版):

@Bean
public Step compareStep(ItemReader<LocalOrder> localOrderReader,
                        ItemProcessor<LocalOrder, CompareResult> compareProcessor,
                        ItemWriter<CompareResult> compareWriter)
 
{
    return stepBuilderFactory.get("compareStep")
            .<LocalOrder, CompareResult>chunk(500)
            .reader(localOrderReader)
            .processor(compareProcessor)
            .writer(compareWriter)
            .faultTolerant()
            .skipLimit(100)            // 最多允许 100 条异常数据
            .skip(Exception.class)     // 捕获异常,跳过问题记录
            .build()
;
}

这里几点体验特别好:

  • faultTolerant().skip(...) 把“坏数据”过滤掉,整批任务不会因为个别脏数据中断
  • 每个 Step 的执行时间、处理条数,都能在 Batch 的元数据表里看到
  • 真要排查问题,反查那几条被 skip 的记录就行,不用翻几万行日志

这类“每天一次”的对账任务,之前偶尔会跑到第二天早上还没结束,现在稳定在几十分钟,偶尔失败也能快速恢复。

六、Spring Batch 不是什么时候都适合上

吹了半天,也得说些冷静的话,不然你到处乱上。

有几个场景,我自己现在是不会用 Spring Batch 的:

  1. 实时接口:比如下单接口里顺手改几条记录,这种直接事务 + Service 就行了,上 Batch 纯属折腾
  2. 数据量不大、逻辑又简单:比如每天处理几百条、简单更新状态,手写个定时任务足够
  3. 你团队没人维护这块:Batch 一旦用起来,后面人得看得懂 Job/Step 这些概念,否则 Debug 成本会上升

但只要满足这三点,我现在都会优先考虑 Spring Batch:

  • 明显的“批处理”特征(按天、按小时、大批量)
  • 处理过程比较复杂:校验、清洗、多表写入,还可能要重试
  • 希望有 可观测性:能看到跑到哪儿、成败记录、重跑能力

如果你现在正好也有那种“每天跑一大坨脚本”的批处理任务,老老实实抽半天时间把主流程迁到 Spring Batch 上,基本上不会亏。剩下那些“500%”到底能不能达到,就看你敢不敢顺手再开两档多线程了。😆

-END-

我为大家打造了一份RPA教程,完全免费:songshuhezi.com/rpa.html


🔥东哥私藏精品🔥


东哥作为一名老码农,整理了全网最全《Java高级架构师资料合集》。总量高达650GB

【声明】内容源于网络
0
0
Java技术图谱
回复 java,领取Java面试题。分享AI编程,AI工具,Java教程,Java下载,Java技术栈,Java源码,Java课程,Java技术架构,Java基础教程,Java高级教程,idea教程,Java架构师,Java微服务架构。
内容 1111
粉丝 0
Java技术图谱 回复 java,领取Java面试题。分享AI编程,AI工具,Java教程,Java下载,Java技术栈,Java源码,Java课程,Java技术架构,Java基础教程,Java高级教程,idea教程,Java架构师,Java微服务架构。
总阅读69
粉丝0
内容1.1k