0

0

PySpark 中对数组列计算均值与众数的完整实践指南

碧海醫心

碧海醫心

发布时间:2026-03-10 13:04:20

|

403人浏览过

|

来源于php中文网

原创

PySpark 中对数组列计算均值与众数的完整实践指南

本文详解如何在 PySpark DataFrame 中高效、稳定地为 array 类型的数值列计算均值、为 array 类型的类别列计算众数,并安全添加新列,避免 UDF 常见的类型/空值/序列化错误。

本文详解如何在 pyspark dataframe 中高效、稳定地为 array 类型的数值列计算均值、为 array 类型的类别列计算众数,并安全添加新列,避免 udf 常见的类型/空值/序列化错误。

在 PySpark 中直接对数组列(如 score: array 或 review: array)进行聚合统计(如均值、众数)时,若盲目使用 Python 原生 statistics 模块配合 UDF,极易因数据类型不匹配、空数组、序列化限制或 Pandas 兼容性问题导致运行时失败(如 calculation failed during evaluation)。根本原因在于:原始 schema 中 score 被错误定义为 array,而实际数据是浮点数字符串;同时 statistics.mode() 在存在多个众数时会抛出 StatisticsError,且 UDF 缺乏对 null/空数组的鲁棒处理。

推荐方案:混合使用内置函数 + 安全 UDF
兼顾性能(避免全量 shuffle)、可读性与健壮性:

1. 正确声明 Schema 并预处理数据类型

首先确保数组元素类型准确。若原始数据中 score 存储为字符串数组,需先转为 double 数组:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType
from pyspark.sql.functions import col, explode, avg, udf, when, size, array
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ArrayStats").getOrCreate()

# ✅ 正确定义 schema:score 元素为 DoubleType(非 String)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("score", ArrayType(DoubleType(), True), True),
    StructField("review", ArrayType(StringType(), True), True)
])

# 若原始数据 score 是 string 数组,需先 cast(示例):
# df = df.withColumn("score", F.col("score").cast(ArrayType(DoubleType())))

2. 高效计算数组均值:用 aggregate 内置高阶函数(Spark 3.4+)

无需 UDF、无需 explode、零 shuffle,性能最优:

# ✅ 推荐:使用 aggregate(Spark ≥ 3.4)
df = df.withColumn(
    "scoreMean",
    F.when(F.size("score") == 0, None)
     .otherwise(
         F.aggregate(
             col("score"),
             F.struct(F.lit(0.0).alias("sum"), F.lit(0).alias("count")),
             lambda acc, x: F.struct(acc["sum"] + x, acc["count"] + 1),
             lambda acc: acc["sum"] / acc["count"]
         )
     )
)

⚠️ 注意:aggregate 要求 Spark 3.4+。若版本较低,改用 explode + groupBy(见下文),但需注意 id 必须参与分组以保行级对应。

艺映AI
艺映AI

艺映AI - 免费AI视频创作工具

下载

3. 安全计算众数:鲁棒 UDF(兼容空数组、多众数)

statistics.mode() 易崩溃,应手动实现并处理边界:

from collections import Counter

def safe_mode(arr):
    if not arr or len(arr) == 0:
        return None
    # 统计频次,返回最高频元素(遇到并列时取第一个,符合多数业务预期)
    counter = Counter(arr)
    return counter.most_common(1)[0][0] if counter else None

mode_udf = udf(safe_mode, StringType())

df = df.withColumn("reviewMode", mode_udf(col("review")))

4. 完整可运行示例(兼容 Spark 3.0+)

结合 explode(通用方案)与安全 UDF:

from pyspark.sql.types import IntegerType, StringType, ArrayType, DoubleType
from pyspark.sql.functions import explode, avg, col, when, size, struct, lit

# 构造示例数据(注意:score 直接为 double 数组)
data = [
    (1, [83.52, 81.79, 84.0, 75.0], ["P", "N", "P", "P"]),
    (2, [86.13, 85.48], ["N", "N", "N", "P"])
]
df = spark.createDataFrame(data, ["id", "score", "review"])

# ✅ 步骤1:计算 scoreMean(explode + groupBy,保留 id)
df_with_mean = (
    df.select("id", "review", explode("score").alias("score_val"))
      .groupBy("id", "review")
      .agg(avg("score_val").alias("scoreMean"))
)

# ✅ 步骤2:计算 reviewMode(安全 UDF)
df_with_mean = df_with_mean.withColumn(
    "reviewMode", 
    mode_udf(col("review"))
)

# ✅ 步骤3:还原原始 score 列(若需保留)
result_df = df_with_mean.join(
    df.select("id", "score"), 
    on="id"
).select("id", "score", "review", "scoreMean", "reviewMode")

result_df.show(truncate=False)

输出结果:

+---+------------------+---------+----------+--------+
|id |score             |review   |scoreMean |reviewMode|
+---+------------------+---------+----------+--------+
|1  |[83.52, 81.79, 84.0, 75.0]|[P, N, P, P]|81.0775   |P       |
|2  |[86.13, 85.48]     |[N, N, N, P]|85.805    |N       |
+---+------------------+---------+----------+--------+

关键注意事项总结

  • ? Schema 优先:务必确认数组元素类型(DoubleType 而非 StringType),否则 eval() 或 float() 强转会失败;
  • ? UDF 鲁棒性:始终检查 None、空数组、单元素边界;避免 statistics.mode(),改用 Counter.most_common(1);
  • ? 性能权衡:aggregate > explode + groupBy > Python UDF;生产环境优先用高阶函数;
  • ? Null 处理:用 when(size(col) === 0, None).otherwise(...) 显式控制空数组逻辑;
  • ? 版本兼容:aggregate 需 Spark 3.4+;旧版本请严格使用 explode 并确保 groupBy 包含所有原行标识字段(如 id)。

通过以上方法,即可稳定、高效、专业地完成 PySpark 数组列的均值与众数计算任务。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python 时间序列分析与预测
Python 时间序列分析与预测

本专题专注讲解 Python 在时间序列数据处理与预测建模中的实战技巧,涵盖时间索引处理、周期性与趋势分解、平稳性检测、ARIMA/SARIMA 模型构建、预测误差评估,以及基于实际业务场景的时间序列项目实操,帮助学习者掌握从数据预处理到模型预测的完整时序分析能力。

78

2025.12.04

Python 数据清洗与预处理实战
Python 数据清洗与预处理实战

本专题系统讲解 Python 在数据清洗与预处理中的核心技术,包括使用 Pandas 进行缺失值处理、异常值检测、数据格式化、特征工程与数据转换,结合 NumPy 高效处理大规模数据。通过实战案例,帮助学习者掌握 如何处理混乱、不完整数据,为后续数据分析与机器学习模型训练打下坚实基础。

12

2026.01.31

数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

335

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

223

2025.10.31

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

138

2026.02.12

string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

990

2023.08.02

css中float用法
css中float用法

css中float属性允许元素脱离文档流并沿其父元素边缘排列,用于创建并排列、对齐文本图像、浮动菜单边栏和重叠元素。想了解更多float的相关内容,可以阅读本专题下面的文章。

594

2024.04.28

C++中int、float和double的区别
C++中int、float和double的区别

本专题整合了c++中int和double的区别,阅读专题下面的文章了解更多详细内容。

105

2025.10.23

Kotlin Android模块化架构与组件化开发实践
Kotlin Android模块化架构与组件化开发实践

本专题围绕 Kotlin 在 Android 应用开发中的架构实践展开,重点讲解模块化设计与组件化开发的实现思路。内容包括项目模块拆分策略、公共组件封装、依赖管理优化、路由通信机制以及大型项目的工程化管理方法。通过真实项目案例分析,帮助开发者构建结构清晰、易扩展且维护成本低的 Android 应用架构体系,提升团队协作效率与项目迭代速度。

24

2026.03.09

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号