0

0

Apache Beam 动态分键写入 Avro 文件的完整实现指南

霞舞

霞舞

发布时间:2026-02-12 16:13:50

|

537人浏览过

|

来源于php中文网

原创

Apache Beam 动态分键写入 Avro 文件的完整实现指南

本文详解如何在 apache beam java 管道中,基于 `kv>` 的分组结果,为每个 key 动态生成独立子目录并写入 avro 文件,核心在于自定义 `dynamicavrodestinations` 实现路径与 schema 的键级动态路由。

在 Apache Beam 中,当使用 GroupByKey 得到 PCollection>> 后,若需按 key(如模型名、用户 ID、业务类别等)将数据分别写入不同路径下的 Avro 文件(例如 output/model_a/part-00000-of-00010.avro),不能直接在 AvroIO.writeGenericRecords() 中硬编码 ${key} 占位符——因为 .to() 方法接收的是静态字符串或 DynamicAvroDestinations 实例,而非运行时可变表达式。

正确做法是:继承 DynamicAvroDestinations 并重写关键方法,使 Beam 在运行时能根据每条记录(此处为 KV 的 key)动态决定输出路径、Schema 和文件命名策略。以下是以你的场景(String 类型 key + 预定义 Avro schema)为基础的完整实现:

Qoder
Qoder

阿里巴巴推出的AI编程工具

下载

✅ 步骤一:定义动态目的地类(Key-aware)

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.DynamicAvroDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

import java.util.Collections;
import java.util.List;

// 注意:destination 类型为 String,与你的 KV<String, ...> 的 key 类型一致
public class KeyedAvroDestinations
    extends DynamicAvroDestinations<KV<String, Iterable<String>>, String, GenericRecord> {

  private final ValueProvider<String> baseOutputDir;
  private final String schemaString; // 可预加载的统一 schema(若所有 key 共享同一 schema)

  public KeyedAvroDestinations(ValueProvider<String> baseOutputDir, String schemaString) {
    this.baseOutputDir = baseOutputDir;
    this.schemaString = schemaString;
  }

  @Override
  public Schema getSchema(String destination) {
    return new Schema.Parser().parse(schemaString);
  }

  @Override
  public GenericRecord formatRecord(KV<String, Iterable<String>> record) {
    // 将 KV<String, Iterable<String>> 转为 GenericRecord(示例结构:{"key": "model_a", "values": ["v1","v2"]})
    Schema schema = getSchema(record.getKey());
    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
    builder.set("key", record.getKey());
    builder.set("values", record.getValue()); // 假设 schema 中有 string array 字段 "values"
    return builder.build();
  }

  @Override
  public String getDestination(KV<String, Iterable<String>> record) {
    return record.getKey(); // 每个 KV 的 key 即为逻辑 destination ID
  }

  @Override
  public String getDefaultDestination() {
    return "default"; // fallback key(如空 key 场景)
  }

  @Override
  public FilenamePolicy getFilenamePolicy(String destination) {
    String base = baseOutputDir.get();
    // 生成形如: base/model_a/part-NNNNN-of-XXXXX.avro
    ResourceId outputDir = FileSystems.matchNewResource(base, true)
        .resolve(destination, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
    return DefaultFilenamePolicy.fromBaseDirectory(outputDir)
        .withExtension(".avro");
  }

  // 若无需侧输入(side input),返回空列表即可
  @Override
  public List<PCollectionView<?>> getSideInputs() {
    return Collections.emptyList();
  }
}

✅ 步骤二:在 Pipeline 中应用

// 假设 schemaFile 是已读取的 Avro Schema 字符串(如通过 Files.toString(...))
String avroSchemaJson = Files.toString(schemaFile, StandardCharsets.UTF_8);

PCollection<KV<String, Iterable<String>>> successResponseCollection =
    pipeline
        .apply("Extract Success Response for " + model, ParDo.of(new ExtractSuccessResponseFn()))
        .apply(GroupByKey.create());

// 定义输出根目录(支持模板化,如 "gs://my-bucket/output" 或 "file:///tmp/output")
ValueProvider<String> outputBase = StaticValueProvider.of("output");

successResponseCollection
    .apply("WriteByKeys",
        AvroIO.<String>writeCustomTypeToGenericRecords()
            .to(new KeyedAvroDestinations(outputBase, avroSchemaJson))
            .withSuffix(".avro")
            .withNumShards(options.getNumberShards() == null ? 0 : options.getNumberShards())
            .withWindowedWrites()); // 如需按窗口分目录,保留;否则可移除

⚠️ 关键注意事项

  • Schema 一致性:本例假设所有 key 共享同一 Avro schema。若不同 key 需不同 schema(如多模型异构结构),需配合 PCollectionView> 侧输入,在 getSchema(String destination) 中查表获取对应 schema 字符串。
  • 文件命名与目录结构:FilenamePolicy 决定了最终路径。DefaultFilenamePolicy.fromBaseDirectory(...) 自动处理目录创建与分片命名,无需手动拼接 "key/part"。
  • 类型对齐:DynamicAvroDestinations 的三个泛型必须准确匹配:
    • InputT:你的 PCollection 元素类型(KV>);
    • DestinationT:getDestination() 返回类型(String);
    • OutputT:写出的 Avro 记录类型(GenericRecord)。
  • 性能提示:避免在 getSchema() 或 formatRecord() 中做重操作(如反复解析 JSON Schema),建议提前解析并缓存。

✅ 总结

Apache Beam 的 DynamicAvroDestinations 是实现“一 key 一目录/一 Schema/一策略”写入的核心机制。它将路径、Schema、序列化逻辑解耦至运行时决策层,既保持了流批统一的抽象能力,又满足了生产中常见的多租户、多模型、多格式落地需求。掌握其生命周期方法(getDestination, getSchema, formatRecord, getFilenamePolicy)的语义与调用时机,是构建健壮数据管道的关键进阶技能。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

436

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

544

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

317

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

81

2025.09.10

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

708

2023.08.02

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

509

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

214

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1550

2023.10.24

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

4

2026.02.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
RunnerGo从入门到精通
RunnerGo从入门到精通

共22课时 | 1.8万人学习

尚学堂Mahout视频教程
尚学堂Mahout视频教程

共18课时 | 3.2万人学习

Linux优化视频教程
Linux优化视频教程

共14课时 | 3.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号