先说结论:那个“效率飙升 500%”不是标题党,真的是从一晚处理 5 万条 → 稳定干到 30 万条以后,我才敢写这几个字的。
一、以前我是怎么“手搓批处理”的
场景特别典型,你肯定也见过那种需求:
-
每晚跑一批对账数据 -
把一堆 CSV / Excel 导进库 -
按天清洗历史日志、同步第三方接口
我之前的做法很“朴素”——写个 @Scheduled,然后 while 循环一条条查、一条条处理、一条条插库,大概长这样:
@Scheduled(cron = "0 0 1 * * ?")
public void oldStyleJob() {
int page = 0;
int size = 1000;
List<Order> list;
do {
list = orderDao.queryPage(page, size);
for (Order order : list) {
// 业务处理
process(order);
// 单条更新
orderDao.update(order);
}
page++;
} while (!list.isEmpty());
}
刚开始数据量不大,还挺丝滑。后来业务上了量:
-
一次任务要处理十几万条 -
中途报错了只能从头再来 -
偶尔重启一下服务,整个任务直接废了
更要命的是,这段代码:
-
没有事务边界的概念:手动控制得很累 -
没有统一监控:到底跑了多少条,只能靠日志瞎 grep -
扩展多线程?你得自己造一堆轮子
那次加班到凌晨三点,我一边看着日志一点点刷,一边在想:这玩意儿肯定有人替我造过轮子吧?
于是就去摸了 Spring Batch。
二、Spring Batch 到底解决了什么痛点
先别管概念,先看“换了之后”我得到了什么:
-
处理逻辑被拆成 Job → Step → Reader/Processor/Writer,结构比之前那个大 while 清爽太多 -
天然支持 chunk 批量处理,比如“每 100 条一起提交事务、一块写库” -
任务状态(跑到第几条、成败、参数)统统记录到数据库,失败了可以从断点继续 -
套几行配置,直接上 多线程 / 分区并行
你可以把 Spring Batch 理解成:专门帮你干批处理活儿的一整套框架,你只需要填空题: “我从哪读数据 → 我要怎么处理 → 我往哪儿写”。
三、最小可用 Demo:从 CSV 导数据进 MySQL
先上代码,感受一下一个最基础的 Spring Batch Job 是啥样的。
1. 配置类长啥样
@Configuration
@EnableBatchProcessing
publicclass ImportJobConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public Job importUserJob(Step importUserStep) {
return jobBuilderFactory.get("importUserJob")
.start(importUserStep)
.build();
}
@Bean
public Step importUserStep(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("importUserStep")
.<User, User>chunk(1000) // 每 1000 条为一批,提交一次事务
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public FlatFileItemReader<User> reader() {
FlatFileItemReader<User> reader = new FlatFileItemReader<>();
reader.setResource(new FileSystemResource("/data/users.csv"));
reader.setLinesToSkip(1); // 跳过表头
DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer() {{
setNames("id", "name", "age");
}});
lineMapper.setFieldSetMapper(fieldSet -> {
User user = new User();
user.setId(fieldSet.readLong("id"));
user.setName(fieldSet.readString("name"));
user.setAge(fieldSet.readInt("age"));
return user;
});
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public ItemProcessor<User, User> processor() {
return user -> {
// 简单的业务规则:过滤掉未成年用户
if (user.getAge() < 18) {
returnnull; // 返回 null 会被跳过
}
user.setName(user.getName().trim());
return user;
};
}
@Bean
public JdbcBatchItemWriter<User> writer() {
JdbcBatchItemWriter<User> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(dataSource);
writer.setSql("INSERT INTO t_user (id, name, age) VALUES (:id, :name, :age)");
writer.setItemSqlParameterSourceProvider(
new BeanPropertyItemSqlParameterSourceProvider<>());
return writer;
}
}
对比一下以前那个 while 循环,这里几件事特别关键:
-
chunk(1000):框架帮你做了“分页 + 事务控制”,一批 1000 条,处理完一起提交 -
reader / processor / writer各司其职,逻辑非常清晰 -
JdbcBatchItemWriter默认就是批量写数据库,不是你那种一条条update
光是这个最基础的 Job,在我们当时的场景里,从 一晚 8 万条 → 轻松 20 万条,什么都没加,只是换了写法。
四、效率真的从哪儿来的?
“500%”这种数字听起来很玄乎,拆开来说其实就三个点:
1. 真正的批量写入
以前我是一条条 update,哪怕你在 JDBC 层面手动 addBatch,也很容易写成这样:
for (Order order : list) {
preparedStatement.setLong(1, order.getId());
// ...
preparedStatement.addBatch();
preparedStatement.executeBatch(); // 写错地方了…
}
而 Spring Batch 的 JdbcBatchItemWriter,默认就是按 chunk 帮你分批:
-
你在 writer里写一条 SQL 模板 -
它会帮你把这一批 N 条数据拼成一组 batch 发给数据库 -
再配合 DB 端合理的参数(如 MySQL 的 rewriteBatchedStatements=true),写入效率提升非常明显
2. 少了很多“无效重跑”
以前任务半路挂了,只能重头再来。 现在 Job / Step 状态会落库,失败了可以从上次停的 Step 继续,甚至可以做到从读到第几条记录的位置继续(用数据库或状态表配合一下)。
Spring Batch 自己会维护一张表 BATCH_JOB_EXECUTION、BATCH_STEP_EXECUTION 之类的,把每次执行的参数、起止时间、状态都记下来,你不用自己琢磨“我到底跑到哪儿了”。
3. 多线程 + 分区并行
单线程 chunk 跑到瓶颈之后,我没改一行业务代码,只是给 Step 加了个 taskExecutor,性能又上了一个台阶。
@Bean
public Step importUserStep(ItemReader<User> reader,
ItemProcessor<User, User> processor,
ItemWriter<User> writer) {
return stepBuilderFactory.get("importUserStep")
.<User, User>chunk(1000)
.reader(reader)
.processor(processor)
.writer(writer)
.taskExecutor(taskExecutor()) // 开启多线程
.throttleLimit(8) // 最多并发 8 个线程
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("import-user-");
executor.initialize();
return executor;
}
如果数据本身可以按某个 key 分片(比如按日期、分库分表等),还可以用 Partitioner 做多分区并行,比单 Step 多线程又高一个档次。
这些累加起来,把原来 4~5 小时的任务压到不到 1 小时,其实就很正常了。
五、和业务结合一下:一个“对账任务”改造实战
举个我们真实改造过的例子,大致流程:
-
从第三方拉取前一天的订单流水(HTTP 接口) -
和我们自家订单做字段比对 -
生成对账结果,写入结果表 & 导出 CSV 给财务
改造前代码是这样:
-
定时任务里自己分页调接口 -
每页数据拉回来,循环比对,单条插入结果表 -
任何地方抛错,一般都是整批重跑
改造后拆成了几个 Step:
-
fetchRemoteStep:先把第三方数据全量拉到临时表(这个 Step 也用 Batch 来做) -
compareStep:以我们本地订单为 Reader,临时表为 join,对比生成结果 -
exportStep:再从结果表读,写到 CSV
核心 Step 的配置有点像这样(简化版):
@Bean
public Step compareStep(ItemReader<LocalOrder> localOrderReader,
ItemProcessor<LocalOrder, CompareResult> compareProcessor,
ItemWriter<CompareResult> compareWriter) {
return stepBuilderFactory.get("compareStep")
.<LocalOrder, CompareResult>chunk(500)
.reader(localOrderReader)
.processor(compareProcessor)
.writer(compareWriter)
.faultTolerant()
.skipLimit(100) // 最多允许 100 条异常数据
.skip(Exception.class) // 捕获异常,跳过问题记录
.build();
}
这里几点体验特别好:
-
faultTolerant().skip(...)把“坏数据”过滤掉,整批任务不会因为个别脏数据中断 -
每个 Step 的执行时间、处理条数,都能在 Batch 的元数据表里看到 -
真要排查问题,反查那几条被 skip 的记录就行,不用翻几万行日志
这类“每天一次”的对账任务,之前偶尔会跑到第二天早上还没结束,现在稳定在几十分钟,偶尔失败也能快速恢复。
六、Spring Batch 不是什么时候都适合上
吹了半天,也得说些冷静的话,不然你到处乱上。
有几个场景,我自己现在是不会用 Spring Batch 的:
-
实时接口:比如下单接口里顺手改几条记录,这种直接事务 + Service 就行了,上 Batch 纯属折腾 -
数据量不大、逻辑又简单:比如每天处理几百条、简单更新状态,手写个定时任务足够 -
你团队没人维护这块:Batch 一旦用起来,后面人得看得懂 Job/Step 这些概念,否则 Debug 成本会上升
但只要满足这三点,我现在都会优先考虑 Spring Batch:
-
明显的“批处理”特征(按天、按小时、大批量) -
处理过程比较复杂:校验、清洗、多表写入,还可能要重试 -
希望有 可观测性:能看到跑到哪儿、成败记录、重跑能力
如果你现在正好也有那种“每天跑一大坨脚本”的批处理任务,老老实实抽半天时间把主流程迁到 Spring Batch 上,基本上不会亏。剩下那些“500%”到底能不能达到,就看你敢不敢顺手再开两档多线程了。😆
-END-
我为大家打造了一份RPA教程,完全免费:songshuhezi.com/rpa.html
🔥东哥私藏精品🔥
东哥作为一名老码农,整理了全网最全《Java高级架构师资料合集》。总量高达650GB。

