
本文介绍如何使用 kafka streams 将一个包含多个键和多个值的列表结构,逐对展开为独立的键值对,并分别发送到指定 kafka topic,适用于 avro 序列化场景。
在 Kafka Streams 中,当输入流的每条记录携带的是 List
此时,自定义 Processor(或 Transformer/ProcessorSupplier)是推荐且最灵活的解决方案。它允许你在处理每条输入记录时,显式控制转发逻辑,包括多次调用 context.forward() 发送多条输出消息。
✅ 推荐实现方式(Kafka Streams ≥ 3.0)
使用 process()(替代已弃用的 transform())配合 ProcessorSupplier:
stream.process(
() -> new KeyValueExpandingProcessor<>(),
Named.as("expand-key-value-lists"),
"out-topic"
);其中 KeyValueExpandingProcessor 实现如下(泛型适配 Avro 类型,如 SpecificRecord):
public class KeyValueExpandingProcessorimplements Processor { private ProcessorContext context; @Override public void init(ProcessorContext context) { this.context = context; } @Override public void process(Record record) { K inputKey = record.key(); V inputValue = record.value(); // 假设工具类可从 inputKey/inputValue 中提取对应列表(注意:实际中 key 可能不参与解析) List keys = util.fetchKeys(inputKey, inputValue); // 或仅基于 inputValue List values = util.fetchValues(inputValue); // 安全校验:长度一致,避免 IndexOutOfBoundsException int size = Math.min(keys.size(), values.size()); for (int i = 0; i < size; i++) { context.forward( Record. create( "out-topic", // target topic(可选,若使用 to() 则无需指定) keys.get(i), values.get(i), record.timestamp() ) ); } } @Override public void close() {} }
? 关键说明: context.forward() 在 process() 中可被调用多次,每次生成一条独立输出记录; 输出的 Key 和 Value 类型需与配置的 keySerde 和 valueSerde 兼容(如 SpecificAvroSerde / SpecificAvroSerde); 若使用 to("out-topic", keySerde, valueSerde),则 process() 内部无需指定 topic,只需 forward() 即可,最终由 .to() 统一落库; Kafka Streams 会自动保证状态一致性与恰好一次语义(EOS),前提是启用了 processing.guarantee=exactly_once_v2。
⚠️ 注意事项
- ❌ 避免在 mapValues() 或 flatMapValues() 中尝试“返回多个值”——这些算子设计为一对一或一对多 值变换,但不支持修改 key 或产生多条带不同 key 的记录;
- ✅ process() 是底层 Processor API,赋予你完全控制权,适合此类“解包+多路转发”场景;
- ? 若 util.fetchKeys()/fetchValues() 依赖外部状态(如查表),建议在 init() 中初始化客户端,并在 close() 中释放资源;
- ? 测试建议:使用 TopologyTestDriver 构造输入 ConsumerRecord,验证 Processor 是否按预期转发了 N 条 ProducerRecord。
✅ 总结
将列表型键值对展开为独立 Kafka 消息的核心在于脱离声明式 DSL,进入命令式 Processor 层。通过 process() + 自定义 Processor,你可以安全、可控、高效地完成多对一 → 一对多的拓扑转换,同时无缝兼容 Avro 序列化与 Kafka Streams 的容错机制。这是处理复杂消息结构(如嵌套数组、批量解析结果)的标准实践。











