0

0

在 Apache Flink 中消费带键 Kafka 记录的实践教程

心靈之曲

心靈之曲

发布时间:2025-11-05 17:43:14

|

900人浏览过

|

来源于php中文网

原创

在 Apache Flink 中消费带键 Kafka 记录的实践教程

本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。

1. 理解带键 Kafka 记录及其重要性

在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:

  • 消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。
  • 状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。
  • 数据路由:消费者可以根据键来过滤或路由消息。

当使用 kafka-console-producer.sh 并指定 --property "parse.key=true" --property "key.separator=:" 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。

2. Flink KafkaSource 的默认行为与限制

Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。

以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlinkValueOnlyKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址

        KafkaSource source = KafkaSource.builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics("test3")
                .setGroupId("1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .build();

        DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.map((MapFunction) value -> "Receiving from Kafka : " + value).print();

        env.execute("Flink Value-Only Kafka Consumer");
    }
}

3. 自定义 KafkaRecordDeserializationSchema 读取带键记录

要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。

3.1 定义自定义反序列化器

首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3 对象中输出。

知元AI
知元AI

AI智能语音聊天 对讲问答 AI绘画 AI写作 AI创作助手工具

下载
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

import java.io.IOException;

/**
 * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。
 * 输出类型为 Tuple3
 */
public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema> {

    // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化
    private transient StringDeserializer keyDeserializer;
    private transient StringDeserializer valueDeserializer;

    /**
     * 在反序列化器初始化时调用,用于设置内部状态。
     * 通常在这里初始化 Kafka 客户端的反序列化器。
     */
    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器
        // 假设键和值都是字符串,使用 StringDeserializer
        keyDeserializer = new StringDeserializer();
        valueDeserializer = new StringDeserializer();
    }

    /**
     * 核心反序列化逻辑。
     *
     * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。
     * @param out    用于收集反序列化结果的 Collector。
     * @throws IOException 如果反序列化过程中发生 I/O 错误。
     */
    @Override
    public void deserialize(ConsumerRecord record, Collector> out) throws IOException {
        // 反序列化键
        String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null;
        // 反序列化值
        String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null;
        // 获取时间戳
        long timestamp = record.timestamp();

        // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出
        out.collect(new Tuple3<>(key, value, timestamp));
    }

    /**
     * 返回此反序列化器生产的数据类型信息。
     * Flink 使用此信息进行类型检查和序列化。
     */
    @Override
    public TypeInformation> getProducedType() {
        // 使用 TypeHint 来获取泛型类型信息
        return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint>() {});
    }
}

注意事项:

  • open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。
  • deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。
  • getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。

3.2 在 Flink KafkaSource 中使用自定义反序列化器

接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class FlinkKeyedKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址
        String topic = "test3";
        String groupId = "1";

        // 构建 KafkaSource,并指定我们自定义的反序列化器
        KafkaSource> source = KafkaSource.>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器
                .build();

        // 从 KafkaSource 创建数据流
        DataStream> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");

        // 对数据流进行操作,现在可以访问键、值和时间戳
        stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)
              .print();

        // 执行 Flink 作业
        env.execute("Flink Keyed Kafka Consumer");
    }
}

3.3 Kafka 生产者示例(用于测试)

为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。

4. 总结

通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。

相关文章

Kafka Eagle可视化工具
Kafka Eagle可视化工具

Kafka Eagle是一款结合了目前大数据Kafka监控工具的特点,重新研发的一块开源免费的Kafka集群优秀的监控工具。它可以非常方便的监控生产环境中的offset、lag变化、partition分布、owner等,有需要的小伙伴快来保存下载体验吧!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
kafka消费者组有什么作用
kafka消费者组有什么作用

kafka消费者组的作用:1、负载均衡;2、容错性;3、广播模式;4、灵活性;5、自动故障转移和领导者选举;6、动态扩展性;7、顺序保证;8、数据压缩;9、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

167

2024.01.12

kafka消费组的作用是什么
kafka消费组的作用是什么

kafka消费组的作用:1、负载均衡;2、容错性;3、灵活性;4、高可用性;5、扩展性;6、顺序保证;7、数据压缩;8、事务性支持。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

149

2024.02.23

rabbitmq和kafka有什么区别
rabbitmq和kafka有什么区别

rabbitmq和kafka的区别:1、语言与平台;2、消息传递模型;3、可靠性;4、性能与吞吐量;5、集群与负载均衡;6、消费模型;7、用途与场景;8、社区与生态系统;9、监控与管理;10、其他特性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

202

2024.02.23

数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

307

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

222

2025.10.31

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

338

2023.08.02

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1049

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

86

2025.10.17

Golang 性能分析与pprof调优实战
Golang 性能分析与pprof调优实战

本专题系统讲解 Golang 应用的性能分析与调优方法,重点覆盖 pprof 的使用方式,包括 CPU、内存、阻塞与 goroutine 分析,火焰图解读,常见性能瓶颈定位思路,以及在真实项目中进行针对性优化的实践技巧。通过案例讲解,帮助开发者掌握 用数据驱动的方式持续提升 Go 程序性能与稳定性。

8

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.8万人学习

C# 教程
C# 教程

共94课时 | 7.3万人学习

Java 教程
Java 教程

共578课时 | 49.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号