
在 Spring Batch 中,ItemProcessor 应始终返回单个处理后的对象,而非累积列表;跨 chunk 的状态共享(如类级别 List)会导致数据污染,正确做法是依赖框架自动聚合的 Chunk,由 ItemWriter 接收完整批次。
在 spring batch 中,`itemprocessor` 应始终返回单个处理后的对象,而非累积列表;跨 chunk 的状态共享(如类级别 `list`)会导致数据污染,正确做法是依赖框架自动聚合的 `chunk
Spring Batch 的核心设计原则之一是无状态、可重入、chunk 级隔离。当您将 ItemProcessor 实现为维护一个类级别 List
根本原因在于对 Spring Batch 数据流的误解。框架的执行流程如下:
- ItemReader 每次读取一个 InputObject;
- 该对象被单独传入 ItemProcessor.process(),预期返回单个转换/增强后的对象(如 ProcessedObject);
- Spring Batch 内部自动将 process() 的返回值收集进一个临时 Chunk
,直到达到配置的 chunk size(如 15)或数据源耗尽; - 最终,整个 Chunk(即 List
)作为单一参数传递给 ItemWriter.write()。
因此,您的原始实现:
@Override
public List<ProcessedObject> process(InputObject input) throws Exception {
// ... 构造 o
processedList.add(o); // ❌ 错误:跨 chunk 累积
return processedList; // ❌ 错误:返回 List<List<ProcessedObject>> 给 writer
}会导致 ItemWriter.write() 接收到形如 [[o1,o2,...,o15], [o16,o17,...,o30], ...] 的嵌套结构(即 List>),而非预期的扁平 List
✅ 正确做法是让 ItemProcessor 严格保持无状态、单入单出:
@Override
public ProcessedObject process(InputObject input) throws Exception {
ProcessedObject o = new ProcessedObject();
try {
// 设置业务字段、转换逻辑等
o.setId(input.getId());
o.setProcessedTime(Instant.now());
o.setStatus("SUCCESS");
} catch (Exception ex) {
o.setErrorFlag(true);
o.setErrorMessage(ex.getMessage());
// 注意:仍返回对象,便于 writer 统一处理错误项
}
return o; // ✅ 返回单个对象,交由框架聚合
}相应地,ItemWriter 可安全假设其 write(List extends ProcessedObject> items) 参数即为当前 chunk 的全部处理结果:
@Override
public void write(List<? extends ProcessedObject> items) throws Exception {
// items.size() == 15(或最后一次可能更少)
Map<String, List<ProcessedObject>> grouped = items.stream()
.collect(Collectors.groupingBy(o ->
o.isErrorFlag() ? "error" : "success"
));
// 分别写入 success.csv 和 error.csv
writeToFile(grouped.getOrDefault("success", Collections.emptyList()), "success.csv");
writeToFile(grouped.getOrDefault("error", Collections.emptyList()), "error.csv");
}⚠️ 关键注意事项:
- 禁止在 @Component 或 @Service 标注的 processor 中使用实例变量存储跨调用状态;若需临时上下文(如计数器、缓存),应使用 StepExecution 的 ExecutionContext(线程安全且作用域明确);
- 若业务强依赖 chunk 内部状态(如累计统计),应在 ItemWriter 中处理,因其天然接收完整 chunk;
- 所有 ItemProcessor 实现类应是无状态的(stateless),确保可被 Spring 容器单例复用,且支持并行处理(TaskExecutor);
- 单元测试时,可直接调用 processor.process(input) 验证单对象行为,无需模拟 chunk 流程。
总结:Spring Batch 的 ItemProcessor 是“逐项映射器”,不是“批量收集器”。放弃手动维护列表,拥抱框架内置的 chunk 聚合机制,是写出健壮、可维护、符合批处理范式的代码的前提。









