
本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。
1. 理解带键 Kafka 记录及其重要性
在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:
- 消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。
- 状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。
- 数据路由:消费者可以根据键来过滤或路由消息。
当使用 kafka-console-producer.sh 并指定 --property "parse.key=true" --property "key.separator=:" 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。
2. Flink KafkaSource 的默认行为与限制
Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。
以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.kafka.common.serialization.StringDeserializer;
public class FlinkValueOnlyKafkaConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(bootstrapServers)
.setTopics("test3")
.setGroupId("1")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction) value -> "Receiving from Kafka : " + value).print();
env.execute("Flink Value-Only Kafka Consumer");
}
} 3. 自定义 KafkaRecordDeserializationSchema 读取带键记录
要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord
3.1 定义自定义反序列化器
首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3
import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import java.io.IOException; /** * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。 * 输出类型为 Tuple3*/ public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema > { // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化 private transient StringDeserializer keyDeserializer; private transient StringDeserializer valueDeserializer; /** * 在反序列化器初始化时调用,用于设置内部状态。 * 通常在这里初始化 Kafka 客户端的反序列化器。 */ @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器 // 假设键和值都是字符串,使用 StringDeserializer keyDeserializer = new StringDeserializer(); valueDeserializer = new StringDeserializer(); } /** * 核心反序列化逻辑。 * * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。 * @param out 用于收集反序列化结果的 Collector。 * @throws IOException 如果反序列化过程中发生 I/O 错误。 */ @Override public void deserialize(ConsumerRecord record, Collector > out) throws IOException { // 反序列化键 String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null; // 反序列化值 String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null; // 获取时间戳 long timestamp = record.timestamp(); // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出 out.collect(new Tuple3<>(key, value, timestamp)); } /** * 返回此反序列化器生产的数据类型信息。 * Flink 使用此信息进行类型检查和序列化。 */ @Override public TypeInformation > getProducedType() { // 使用 TypeHint 来获取泛型类型信息 return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint >() {}); } }
注意事项:
- open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。
- deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。
- getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。
3.2 在 Flink KafkaSource 中使用自定义反序列化器
接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
public class FlinkKeyedKafkaConsumer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址
String topic = "test3";
String groupId = "1";
// 构建 KafkaSource,并指定我们自定义的反序列化器
KafkaSource> source = KafkaSource.>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器
.build();
// 从 KafkaSource 创建数据流
DataStream> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");
// 对数据流进行操作,现在可以访问键、值和时间戳
stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)
.print();
// 执行 Flink 作业
env.execute("Flink Keyed Kafka Consumer");
}
} 3.3 Kafka 生产者示例(用于测试)
为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:
bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。
4. 总结
通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。











