0

0

如何在 PySpark 中基于非空条件动态聚合 DataFrame 数据

聖光之護

聖光之護

发布时间:2026-01-17 20:53:46

|

825人浏览过

|

来源于php中文网

原创

如何在 PySpark 中基于非空条件动态聚合 DataFrame 数据

本文介绍如何使用 pyspark 实现“按非空字段灵活匹配并聚合”的场景:对主数据表按总计表中每行定义的、含 null 的过滤条件进行动态关联与求和,避免逐行循环,兼顾性能与准确性。

在实际数据分析中,常遇到一类特殊聚合需求:总计表(lookup table)中的每一行定义了一组“半宽松”匹配规则——仅对非 null 字段生效,null 字段则视为通配符(即不参与约束)。这不同于标准等值连接,也无法直接用 groupby + agg 解决,因为每条总计记录的 join key 是动态的。

以本例为例,flat_data_df 包含交易明细,totals_df 定义了若干统计口径(通过 id 标识),其 attribute1 和 attribute2 列存在 null 值。目标是:对每个 id,找出 flat_data_df 中所有满足「年份、月份、operator 完全一致」且「所有非 null 的 attribute 字段也完全匹配」的记录,对其 value 求和。

核心技巧在于:将 null 条件转化为逻辑或(OR)表达式。对于任一属性列(如 attribute1),匹配逻辑为:
(flat.attribute1 == total.attribute1) OR (total.attribute1 IS NULL)
该表达式确保:当 totals 表中该列为 null 时,该条件恒为真,实现“跳过此字段”的效果;仅当其有值时,才强制要求 flat 表对应字段必须相等。

以下是完整可运行的 PySpark 实现:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("conditional-aggregation").getOrCreate()

# 构建示例数据
flat_data = {
    'year': [2022, 2022, 2022, 2023, 2023, 2023, 2023, 2023, 2023],
    'month': [1, 1, 2, 1, 2, 2, 3, 3, 3],
    'operator': ['A', 'A', 'B', 'A', 'B', 'B', 'C', 'C', 'C'],
    'value': [10, 15, 20, 8, 12, 15, 30, 40, 50],
    'attribute1': ['x', 'x', 'y', 'x', 'y', 'z', 'x', 'z', 'x'],
    'attribute2': ['apple', 'apple', 'banana', 'apple', 'banana', 'banana', 'apple', 'banana', 'banana'],
    'attribute3': ['dog', 'cat', 'dog', 'cat', 'rabbit', 'tutle', 'cat', 'dog', 'dog'],
}
totals = {
    'year': [2022, 2022, 2023, 2023, 2023],
    'month': [1, 2, 1, 2, 3],
    'operator': ['A', 'B', 'A', 'B', 'C'],
    'id': ['id1', 'id2', 'id1', 'id2', 'id3'], 
    'attribute1': [None, 'y', 'x', 'z', 'x'],
    'attribute2': ['apple', None, 'apple', 'banana', 'banana'],
}

flat_df = spark.createDataFrame([(v[i] for v in flat_data.values()) for i in range(len(flat_data['year']))], flat_data.keys())
totals_df = spark.createDataFrame([(v[i] for v in totals.values()) for i in range(len(totals['year']))], totals.keys())

# 关键:构建动态匹配条件(支持任意数量的 attribute 列)
join_condition = (
    (flat_df.year == totals_df.year) &
    (flat_df.month == totals_df.month) &
    (flat_df.operator == totals_df.operator) &
    ((flat_df.attribute1 == totals_df.attribute1) | totals_df.attribute1.isNull()) &
    ((flat_df.attribute2 == totals_df.attribute2) | totals_df.attribute2.isNull())
)

# 执行连接 → 分组聚合
result = (
    flat_df.alias("flat")
    .join(totals_df.alias("total"), join_condition, "inner")
    .select("flat.year", "flat.month", "flat.operator", "total.id", "flat.value")
    .groupBy("year", "month", "operator", "id")
    .agg(f.sum("value").alias("sum"))
)

result.show()

✅ 输出结果:

与光AI
与光AI

一站式AI视频工作流创作平台

下载
+----+-----+--------+---+---+
|year|month|operator| id|sum|
+----+-----+--------+---+---+
|2022|    1|       A|id1| 25|
|2022|    2|       B|id2| 20|
|2023|    1|       A|id1|  8|
|2023|    2|       B|id2| 15|
|2023|    3|       C|id3| 50|
+----+-----+--------+---+---+

? 关键注意事项

  • 扩展性处理:若 totals 表含 80+ 属性列,建议将 join 条件封装为函数自动生成,例如遍历 attribute_columns = ['attribute1', 'attribute2', ..., 'attribute80'] 动态构造 & 链式条件;
  • 性能优化:对高频 join 字段(如 year, month, operator)确保数据已分区或提前 filter;大数据量下可考虑广播小表 totals_df(.hint("broadcast"));
  • null 安全性:务必使用 col.isNull() 而非 col == None,后者在 PySpark 中返回 null(三值逻辑),导致条件失效;
  • 语义验证:id1 在 2022-01-A 中匹配 attribute2='apple'(attribute1 为 null,忽略),故聚合 value=10+15=25,符合预期。

该方案彻底规避了低效的 for-loop 或 UDF,充分利用 Spark 的 Catalyst 优化器与分布式执行能力,是处理“条件式多维汇总”问题的标准范式。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

325

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

232

2023.10.07

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

231

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

436

2024.03.01

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

970

2023.11.02

数据分析的方法
数据分析的方法

数据分析的方法有:对比分析法,分组分析法,预测分析法,漏斗分析法,AB测试分析法,象限分析法,公式拆解法,可行域分析法,二八分析法,假设性分析法。php中文网为大家带来了数据分析的相关知识、以及相关文章等内容。

465

2023.07.04

数据分析方法有哪几种
数据分析方法有哪几种

数据分析方法有:1、描述性统计分析;2、探索性数据分析;3、假设检验;4、回归分析;5、聚类分析。本专题为大家提供数据分析方法的相关的文章、下载、课程内容,供大家免费下载体验。

278

2023.08.07

网站建设功能有哪些
网站建设功能有哪些

网站建设功能包括信息发布、内容管理、用户管理、搜索引擎优化、网站安全、数据分析、网站推广、响应式设计、社交媒体整合和电子商务等功能。这些功能可以帮助网站管理员创建一个具有吸引力、可用性和商业价值的网站,实现网站的目标。

725

2023.10.16

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

42

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
CSS3 教程
CSS3 教程

共18课时 | 4.6万人学习

PostgreSQL 教程
PostgreSQL 教程

共48课时 | 7.3万人学习

Django 教程
Django 教程

共28课时 | 3.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号