0

0

Apache Beam Java 实现 JSON 数据按键聚合合并教程

聖光之護

聖光之護

发布时间:2026-02-23 10:20:01

|

132人浏览过

|

来源于php中文网

原创

Apache Beam Java 实现 JSON 数据按键聚合合并教程

本文介绍如何在 Apache Beam(Java)分布式并行环境下,将多个结构相同但 data 字段分散的 JSON 对象,按 companyId 和 asOfDate 联合键高效聚合为单个完整 JSON,兼顾可扩展性与语义一致性。

本文介绍如何在 apache beam(java)分布式并行环境下,将多个结构相同但 `data` 字段分散的 json 对象,按 `companyid` 和 `asofdate` 联合键高效聚合为单个完整 json,兼顾可扩展性与语义一致性。

在 Apache Beam 中实现 JSON 的“逻辑合并”(而非简单字符串拼接),核心在于语义分组 + 结构化聚合。由于 Beam 天然支持大规模并行处理,直接拼接 JSON 字符串会破坏数据一致性且无法保证顺序;正确做法是:先提取业务主键、解析为强类型对象,再通过 GroupByKey 汇聚同键数据,最后在 ParDo 中完成 data 数组的合并与序列化。

1. 定义数据模型与联合键

首先,为输入 JSON 建立清晰的 Java 类型。注意:data 是一个 List,而 companyId 与 asOfDate 共同构成逻辑唯一标识:

public static class DemographicEntry {
  public String demographicVariable;
  public Object value; // 支持 String/Number 等多种类型
}

public static class CompanySnapshot {
  public String companyId;
  public String asOfDate;
  public List<DemographicEntry> data;

  // 无参构造函数(供 Jackson 反序列化)
  public CompanySnapshot() {}

  // 辅助方法:生成联合键(用于 GroupByKey)
  public String getKey() {
    return companyId + "|" + asOfDate;
  }
}

关键设计点:getKey() 返回唯一字符串键,确保相同公司+日期的数据被路由至同一 worker 进行聚合。

2. 构建 Beam Pipeline 实现聚合

以下为完整 Pipeline 核心逻辑(基于 DirectRunner 或 DataflowRunner):

立即学习Java免费学习笔记(深入)”;

智标领航
智标领航

专注招投标业务流程的AI助手,智能、高效、精准、易用!

下载
Pipeline pipeline = Pipeline.create(options);

pipeline
  .apply("ReadJSONLines", TextIO.read().from("gs://your-bucket/input/*.json"))
  .apply("ParseAndExtractKey", ParDo.of(new DoFn<String, KV<String, CompanySnapshot>>() {
    @ProcessElement
    public void processElement(@Element String json, OutputReceiver<KV<String, CompanySnapshot>> out) {
      try {
        CompanySnapshot snapshot = new ObjectMapper().readValue(json, CompanySnapshot.class);
        out.output(KV.of(snapshot.getKey(), snapshot));
      } catch (IOException e) {
        throw new RuntimeException("Failed to parse JSON: " + json, e);
      }
    }
  }))
  .apply("GroupByCompanyAndDate", GroupByKey.create())
  .apply("MergeDataArrays", ParDo.of(new DoFn<KV<String, Iterable<CompanySnapshot>>, String>() {
    @ProcessElement
    public void processElement(@Element KV<String, Iterable<CompanySnapshot>> kv, OutputReceiver<String> out) {
      // 提取首个 snapshot 作为模板(确保 companyId/asOfDate 一致)
      Iterator<CompanySnapshot> iter = kv.getValue().iterator();
      CompanySnapshot merged = iter.next().clone(); // 需实现 clone() 或重建
      merged.data = new ArrayList<>();

      // 合并所有 data 条目
      iter.forEachRemaining(s -> merged.data.addAll(s.data));

      try {
        String resultJson = new ObjectMapper().writeValueAsString(merged);
        out.output(resultJson);
      } catch (JsonProcessingException e) {
        throw new RuntimeException("Failed to serialize merged JSON", e);
      }
    }
  }))
  .apply("WriteMergedJSON", TextIO.write().to("gs://your-bucket/output/merged").withSuffix(".json"));

? 说明

  • GroupByKey 自动将相同 key 的所有 CompanySnapshot 归集到单个 Iterable,由 Beam 保证其原子性与容错性;
  • MergeDataArrays 中的 clone() 或重建逻辑确保不污染原始对象(避免并发修改风险);
  • 使用 ObjectMapper 进行类型安全的序列化,优于字符串拼接,天然支持嵌套结构与类型推断。

3. 注意事项与最佳实践

  • 内存与规模控制:GroupByKey 会将整个 key 对应的所有数据加载进单个 worker 内存。若某 companyId|asOfDate 组包含数万条 data 记录,可能触发 OOM。建议:

    • 预估单 key 最大数据量,设置 --maxNumWorkers 和 --workerMachineType;
    • 对超大组添加监控日志(如 iter.forEachRemaining(...) 前记录 size);
    • 必要时引入 Combine 替代 GroupByKey(需自定义 CombineFn)。
  • 流式场景补充:若输入为 Pub/Sub 流,必须配合窗口(如 FixedWindows.of(Duration.standardHours(1)))和触发器(如 .triggering(AfterWatermark())),否则 GroupByKey 将无限等待。参考 Beam Windowing Guide

  • 键设计优化:若 asOfDate 精确到毫秒或存在时区差异,建议标准化为 ISO date-only 格式(如 "2022-12-11"),避免因微小时间差导致无效分组。

通过以上步骤,你即可在完全分布式、容错、可伸缩的前提下,精准实现多源 JSON 的语义级合并——既符合业务逻辑,又充分发挥 Apache Beam 的并行优势。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

397

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

247

2023.10.07

json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

445

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

544

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

322

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

81

2025.09.10

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

617

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

217

2023.09.04

pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法
pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法

本专题系统整理pixiv网页版官网入口及登录访问方式,涵盖官网登录页面直达路径、在线阅读入口及快速进入方法说明,帮助用户高效找到pixiv官方网站,实现便捷、安全的网页端浏览与账号登录体验。

1030

2026.02.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3.8万人学习

C# 教程
C# 教程

共94课时 | 10万人学习

Java 教程
Java 教程

共578课时 | 70.7万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号