0

0

如何将键值列表拆分为独立的 Kafka 消息并写入 Topic

碧海醫心

碧海醫心

发布时间:2026-01-20 20:33:08

|

853人浏览过

|

来源于php中文网

原创

如何将键值列表拆分为独立的 Kafka 消息并写入 Topic

本文介绍在 kafka streams 中,如何将包含多个键和值的 list 结构(如 `list` 和 `list`)逐对展开为独立的 `(k, v)` 消息,并分别序列化后写入目标 topic。核心方案是使用 `process()`(v3.0+)或 `transform()`(旧版)自定义处理器实现流式扁平化。

在 Kafka Streams 应用中,当上游数据以批量形式组织(例如一个事件携带 List keys 和 List values),而下游消费者期望接收单键单值的原子消息时,必须对数据流进行「扁平化」(flattening)。原代码中直接调用 selectKey() 和 mapValues() 仅能替换或转换当前记录的键/值,无法生成多条新记录——这是 Kafka Streams 的“一进一出”语义限制。

✅ 正确做法是使用有状态处理算子:process()(推荐,Kafka Streams ≥ 3.0)transform()(旧版),它们允许在 ProcessorContext 中多次调用 context.forward(),从而将单条输入记录映射为多条输出记录。

以下是以 Kafka Streams 3.4+ 为例的完整实现:

Text-To-Song
Text-To-Song

免费的实时语音转换器和调制器

下载
// 定义 ProcessorSupplier(推荐使用 lambda + anonymous class 简化)
stream.process(
    () -> new Processor<String, GenericRecord, String, GenericRecord>() {
        private ProcessorContext<String, GenericRecord> context;

        @Override
        public void init(ProcessorContext<String, GenericRecord> context) {
            this.context = context;
        }

        @Override
        public void process(String key, GenericRecord value) {
            // 假设 util.fetchKeys/fetchValues 接收原始 value 并返回对应列表
            List<String> keys   = util.fetchKeys(key, value);   // 或仅传 value,依业务而定
            List<GenericRecord> values = util.fetchValues(value);

            // 安全校验:确保长度一致,避免 IndexOutOfBoundsException
            int size = Math.min(keys.size(), values.size());
            for (int i = 0; i < size; i++) {
                context.forward(
                    keys.get(i),
                    values.get(i),
                    To.all().withTimestamp(context.timestamp()) // 可选:继承原始时间戳
                );
            }
        }
    },
    Named.as("flatten-processor")
).to("out-topic", 
    Produced.with(Serdes.String(), yourAvroValueSerde) // keySerde 与 valueSerde 需匹配实际类型
);

⚠️ 注意事项:

  • 序列化器一致性:Produced.with(...) 中指定的 keySerde 和 valueSerde 必须与 context.forward() 所传对象的实际类型严格匹配(如 String 键配 Serdes.String(),Avro GenericRecord 值配对应的 SpecificAvroSerde 或自定义 Avro Serde)。
  • 空值/长度不匹配防护:务必校验 keys 和 values 列表非空且长度兼容,否则可能抛出 IndexOutOfBoundsException 或静默丢弃数据。
  • 时间戳处理:默认 forward() 使用系统当前时间,若需保留原始事件时间戳,请显式调用 To.all().withTimestamp(context.timestamp())。
  • 状态与容错:该 Processor 无本地状态,因此无需注册 StateStore;若后续需聚合或去重,可扩展为 Transformer 并启用 RocksDB 存储。

? 总结:Kafka Streams 不支持开箱即用的“一对多”映射,但通过 process() 自定义处理器可精准控制每条输入记录产生的输出数量与内容。这是处理嵌套结构、批量解包、协议转换等场景的标准实践。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

175

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

159

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

207

2024.02.23

Java 流式处理与 Apache Kafka 实战
Java 流式处理与 Apache Kafka 实战

本专题专注讲解 Java 在流式数据处理与消息队列系统中的应用,系统讲解 Apache Kafka 的基础概念、生产者与消费者模型、Kafka Streams 与 KSQL 流式处理框架、实时数据分析与监控,结合实际业务场景,帮助开发者构建 高吞吐量、低延迟的实时数据流管道,实现高效的数据流转与处理。

167

2026.02.04

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1010

2023.08.02

Go高并发任务调度与Goroutine池化实践
Go高并发任务调度与Goroutine池化实践

本专题围绕 Go 语言在高并发任务处理场景中的实践展开,系统讲解 Goroutine 调度模型、Channel 通信机制以及并发控制策略。内容包括任务队列设计、Goroutine 池化管理、资源限制控制以及并发任务的性能优化方法。通过实际案例演示,帮助开发者构建稳定高效的 Go 并发任务处理系统,提高系统在高负载环境下的处理能力与稳定性。

22

2026.03.10

Kotlin Android模块化架构与组件化开发实践
Kotlin Android模块化架构与组件化开发实践

本专题围绕 Kotlin 在 Android 应用开发中的架构实践展开,重点讲解模块化设计与组件化开发的实现思路。内容包括项目模块拆分策略、公共组件封装、依赖管理优化、路由通信机制以及大型项目的工程化管理方法。通过真实项目案例分析,帮助开发者构建结构清晰、易扩展且维护成本低的 Android 应用架构体系,提升团队协作效率与项目迭代速度。

48

2026.03.09

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

93

2026.03.06

Rust内存安全机制与所有权模型深度实践
Rust内存安全机制与所有权模型深度实践

本专题围绕 Rust 语言核心特性展开,深入讲解所有权机制、借用规则、生命周期管理以及智能指针等关键概念。通过系统级开发案例,分析内存安全保障原理与零成本抽象优势,并结合并发场景讲解 Send 与 Sync 特性实现机制。帮助开发者真正理解 Rust 的设计哲学,掌握在高性能与安全性并重场景中的工程实践能力。

216

2026.03.05

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Django 教程
Django 教程

共28课时 | 4.9万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

Sass 教程
Sass 教程

共14课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号