0

0

PySpark DataFrame多函数聚合结果行式展示教程

霞舞

霞舞

发布时间:2025-10-22 09:48:01

|

866人浏览过

|

来源于php中文网

原创

PySpark DataFrame多函数聚合结果行式展示教程

本教程将详细介绍如何在pyspark dataframe中,对所有指定列应用多个聚合函数(如`min`和`max`),并将不同聚合函数的结果以行式结构呈现。我们将通过`select`进行初步聚合,然后利用`unionbyname`巧妙地将不同聚合类型的数据行堆叠起来,最终实现清晰、易读的行式聚合报告。

PySpark DataFrame多函数聚合与行式结果呈现

在PySpark数据处理中,我们经常需要对DataFrame的多个列执行聚合操作,例如计算每列的最小值、最大值、平均值等。常见的df.agg()方法通常会将所有聚合结果合并到一行中,并且如果对同一列应用多个聚合函数,需要为每个结果提供唯一的别名。然而,有时我们的需求是希望将不同聚合函数的结果以行式结构展示,例如,一行包含所有列的最小值,另一行包含所有列的最大值。本教程将介绍一种有效的方法来实现这种自定义的行式聚合报告。

挑战与常见误区

初学者可能会尝试使用类似exprs = [min(c).alias(c), max(c).alias(c) for c in df.columns]并结合df.agg(*exprs)的方式。这种方法的问题在于,df.agg()期望为每个聚合结果生成一个独立的列。如果对同一列同时计算min和max并尝试使用相同的别名,PySpark会报错。即使使用不同的别名(如min_col1, max_col1),结果也会是一个单行多列的DataFrame,而不是我们期望的“最小值一行,最大值一行”的结构。

为了实现行式聚合,我们需要一种策略,将每个聚合函数的结果视为一个独立的“报告行”,然后将这些行堆叠起来。

解决方案:分步聚合与结果合并

核心思想是:

Sora
Sora

Sora是OpenAI发布的一种文生视频AI大模型,可以根据文本指令创建现实和富有想象力的场景。

下载
  1. 分别计算每种聚合函数(例如min和max)在所有列上的结果。
  2. 将每种聚合结果转换成统一的结构,包含一个标识聚合类型的列,以及原始列的聚合值。
  3. 使用unionByName将这些结构相同的聚合结果DataFrame合并。

下面通过一个具体的PySpark示例来演示这个过程。

import operator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# 初始化SparkSession
spark = SparkSession.builder.appName("MultiFunctionAggregation").getOrCreate()

# 示例数据
_data = [
    (4, 123, 18, 29),
    (8, 5, 26, 187),
    (2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)

print("原始DataFrame:")
df.show()

# 1. 计算所有列的最小值和最大值
# 为每个聚合结果创建带有特定前缀的别名,以避免列名冲突
min_vals_exprs = [F.min(c).alias(f'min_{c}') for c in df.columns]
max_vals_exprs = [F.max(c).alias(f'max_{c}') for c in df.columns]

# 使用select进行聚合。
# 注意:这里的结果是一个单行DataFrame,包含了所有列的min和max值,
# 但min和max是作为不同的列存在的。
df_aggregated_single_row = df.select(min_vals_exprs + max_vals_exprs)

print("初步聚合结果 (单行多列):")
df_aggregated_single_row.show()

# 优化:为了避免后续重复计算,可以对聚合结果进行缓存
df_aggregated_single_row.cache()

# 2. 准备用于合并的DataFrame
# 创建min_df:包含'agg_type'列和原始列的最小值
min_cols_selection = [F.lit('min').alias('agg_type')] + \
                     [F.col(f'min_{c}').alias(c) for c in df.columns]
min_df = df_aggregated_single_row.select(min_cols_selection)

# 创建max_df:包含'agg_type'列和原始列的最大值
max_cols_selection = [F.lit('max').alias('agg_type')] + \
                     [F.col(f'max_{c}').alias(c) for c in df.columns]
max_df = df_aggregated_single_row.select(max_cols_selection)

print("最小值DataFrame:")
min_df.show()
print("最大值DataFrame:")
max_df.show()

# 3. 使用unionByName合并结果
# unionByName要求合并的DataFrames具有相同的列名和数据类型,
# 且会根据列名进行匹配,忽略列的顺序。
result_df = min_df.unionByName(max_df)

print("最终行式聚合结果:")
result_df.show()

# 停止SparkSession
spark.stop()

代码解析

  1. 数据准备: 创建一个示例DataFrame df,包含多列数据。
  2. 初步聚合:
    • min_vals_exprs 和 max_vals_exprs:分别生成列表表达式,用于计算每列的最小值和最大值。关键在于使用 f'min_{c}' 和 f'max_{c}' 为聚合结果列创建唯一的别名,例如 min_col_1, max_col_1。
    • df.select(min_vals_exprs + max_vals_exprs):执行这些聚合。select 可以在聚合函数后直接跟列名,将所有聚合结果放在一个单行DataFrame中。
    • df_aggregated_single_row.cache():对这个中间结果进行缓存,因为后续的 min_df 和 max_df 的创建都会从 df_aggregated_single_row 中读取数据。缓存可以避免重复计算,提高效率。
  3. 准备合并:
    • min_cols_selection 和 max_cols_selection:这是转换步骤的核心。
      • F.lit('min').alias('agg_type'):添加一个字面量列 agg_type,用于标识该行数据代表的是哪种聚合('min'或'max')。
      • [F.col(f'min_{c}').alias(c) for c in df.columns]:从 df_aggregated_single_row 中选择带有 min_ 前缀的列,并将其别名改回原始列名(例如,min_col_1 变为 col_1)。
    • min_df = df_aggregated_single_row.select(min_cols_selection) 和 max_df = df_aggregated_single_row.select(max_cols_selection):分别创建包含最小值和最大值的DataFrame。此时,min_df 和 max_df 都将具有 agg_type、col_1、col2 等相同的列名和结构。
  4. 合并结果:
    • result_df = min_df.unionByName(max_df):使用 unionByName 将 min_df 和 max_df 合并。unionByName 会根据列名进行匹配,即使列顺序不同也能正确合并,这对于这种动态生成列的场景非常方便。

拓展与注意事项

  • 更多聚合函数: 如果需要添加更多聚合函数(如 avg、stddev),只需重复“计算初步聚合”和“准备合并”的步骤,为每个函数创建对应的表达式和中间DataFrame,然后将它们链式地 unionByName 起来。
  • 性能考量: 对于非常宽(列数多)的DataFrame或聚合函数种类繁多的情况,生成大量的中间列和DataFrame可能会有性能开销。cache() 的使用有助于减轻重复计算的负担。
  • 列名管理: 确保在初步聚合时使用清晰的、可区分的列别名(如 min_col),并在最终准备阶段将其映射回原始列名,以保持结果的整洁和一致性。
  • 数据类型: unionByName 要求合并的DataFrame具有兼容的数据类型。通常,聚合函数会返回标准数据类型,因此这方面的问题较少。

总结

通过上述分步聚合和unionByName的策略,我们能够灵活地在PySpark中实现复杂的行式聚合报告。这种方法不仅解决了将不同聚合结果堆叠的需求,还通过清晰的步骤和中间DataFrame,使得整个数据处理流程更易于理解和维护。在需要对DataFrame进行多维度聚合分析并以特定格式展示结果时,这是一个非常实用的技巧。

相关专题

更多
数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

303

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

222

2025.10.31

堆和栈的区别
堆和栈的区别

堆和栈的区别:1、内存分配方式不同;2、大小不同;3、数据访问方式不同;4、数据的生命周期。本专题为大家提供堆和栈的区别的相关的文章、下载、课程内容,供大家免费下载体验。

392

2023.07.18

堆和栈区别
堆和栈区别

堆(Heap)和栈(Stack)是计算机中两种常见的内存分配机制。它们在内存管理的方式、分配方式以及使用场景上有很大的区别。本文将详细介绍堆和栈的特点、区别以及各自的使用场景。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

572

2023.08.10

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

3

2026.01.19

微信聊天记录删除恢复导出教程汇总
微信聊天记录删除恢复导出教程汇总

本专题整合了微信聊天记录相关教程大全,阅读专题下面的文章了解更多详细内容。

41

2026.01.18

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

101

2026.01.16

全民K歌得高分教程大全
全民K歌得高分教程大全

本专题整合了全民K歌得高分技巧汇总,阅读专题下面的文章了解更多详细内容。

148

2026.01.16

C++ 单元测试与代码质量保障
C++ 单元测试与代码质量保障

本专题系统讲解 C++ 在单元测试与代码质量保障方面的实战方法,包括测试驱动开发理念、Google Test/Google Mock 的使用、测试用例设计、边界条件验证、持续集成中的自动化测试流程,以及常见代码质量问题的发现与修复。通过工程化示例,帮助开发者建立 可测试、可维护、高质量的 C++ 项目体系。

57

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 47.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号