0

0

如何使用PySpark对多组数据执行K-Means聚类分析

聖光之護

聖光之護

发布时间:2025-10-26 09:45:22

|

834人浏览过

|

来源于php中文网

原创

如何使用pyspark对多组数据执行k-means聚类分析

本文旨在解决PySpark中对不同类别数据独立执行K-Means聚类时遇到的`SparkSession`序列化错误。我们将深入探讨Spark的驱动器-执行器架构,解释为何不能在执行器中调用`createDataFrame`等`SparkSession`操作。文章将提供一个基于Spark ML库的解决方案,通过迭代方式在驱动器上为每个类别独立运行K-Means,并给出详细的代码示例和注意事项,帮助读者正确高效地实现分类数据聚类任务。

在PySpark中,对数据进行K-Means聚类是常见的机器学习任务。当需要针对数据集中的不同类别(或分组)独立执行K-Means时,开发者可能会遇到一些挑战,尤其是涉及到Spark的分布式执行模型和对象序列化问题。一个常见的错误是尝试在Spark执行器(executor)中调用SparkSession相关的方法,例如createDataFrame,这会导致pickle.PicklingError。

理解Spark的分布式执行与序列化

Spark采用驱动器-执行器(Driver-Executor)架构。

  • 驱动器(Driver):负责运行应用程序的main函数,创建SparkSession,调度任务,并协调执行器的工作。所有SparkSession对象都存在于驱动器上。
  • 执行器(Executor):运行在工作节点上,负责执行由驱动器分配的任务。当驱动器将任务发送给执行器时,任务中的所有对象(包括函数、变量等)都必须能够被序列化(pickled),以便通过网络传输到执行器。

SparkSession是一个复杂的、与JVM紧密关联的驱动器端对象。它无法被序列化并发送到执行器。因此,任何尝试在执行器中(例如,在一个RDD的map或foreach转换中)直接引用或使用SparkSession对象来创建新的DataFrame,都将导致序列化错误。

为什么sparkSession.createDataFrame在执行器中会失败?

在您提供的原始代码片段中,kmeans函数被设计为在RDD的map操作中执行:

groupedData.rdd.map(lambda row: kmeans(row.point_list, row.category))

def kmeans(points, category):
  # ...
  df = sparkSession.createDataFrame([(Vectors.dense(x),) for x in points], ["features"])
  # ...

这里的kmeans函数会在执行器上运行。当它尝试调用sparkSession.createDataFrame时,执行器会发现它没有一个可用的sparkSession实例,或者更准确地说,它无法反序列化从驱动器传递过来的sparkSession引用。这就是导致pickle.PicklingError和Py4JError的根本原因。createDataFrame需要一个活动的SparkSession实例来构建DataFrame,而这个实例只能在驱动器上访问。

使用Spark MLlib/ML实现按类别K-Means聚类

为了正确地在PySpark中实现按类别K-Means聚类,同时避免上述序列化问题,我们应该将SparkSession相关的操作保留在驱动器上。以下是一种推荐的实现方法,它利用Spark ML库的K-Means算法,并在驱动器上迭代处理每个类别。

Multiavatar
Multiavatar

Multiavatar是一个免费开源的多元文化头像生成器,可以生成高达120亿个虚拟头像

下载

1. 初始化Spark会话并加载数据

首先,确保您的Spark会话已正确初始化,并且能够访问Hive表。

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, DoubleType

# 初始化SparkSession并启用Hive支持
spark = SparkSession.builder \
    .appName("PerCategoryKMeans") \
    .enableHiveSupport() \
    .getOrCreate()

# 从Hive表加载原始数据
# 假设您的Hive表 'my_table' 包含 'category' 字符串列和 'point' 数组(或列表)列
# 'point' 列的每个元素代表一个数据点的特征向量,例如 [1.0, 2.0, 3.0]
rawData = spark.sql('select category, point from my_table')

# 打印数据模式以确认 'point' 列的类型
rawData.printSchema()
# 示例:
# root
#  |-- category: string (nullable = true)
#  |-- point: array (nullable = true)
#  |    |-- element: double (containsNull = true)

2. 数据预处理:将特征转换为Vector类型

Spark ML库的K-Means算法要求输入DataFrame包含一个features列,其类型为VectorUDT(即pyspark.ml.linalg.Vector)。如果您的point列已经是数值数组类型(ArrayType(DoubleType)),我们需要将其转换为VectorUDT。

# 定义一个UDF,将Python列表(或ArrayType)转换为Spark的VectorUDT
# VectorUDT 是pyspark.ml.linalg.Vector的内部表示类型
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# 将 'point' 列转换为 'features' 列,类型为VectorUDT
preparedData = rawData.withColumn("features", array_to_vector_udf(col("point")))

preparedData.printSchema()
# 示例:
# root
#  |-- category: string (nullable = true)
#  |-- point: array (nullable = true)
#  |    |-- element: double (containsNull = true)
#  |-- features: vector (nullable = true)

如果point列是一个单一的数值列,或者有多个独立的数值列需要组合成特征向量,则应使用VectorAssembler:

# 假设 'point_x', 'point_y' 是独立的数值列
# assembler = VectorAssembler(inputCols=["point_x", "point_y"], outputCol="features")
# preparedData = assembler.transform(rawData)

请根据您的实际数据结构选择合适的特征转换方法。

3. 迭代执行K-Means聚类

接下来,我们将在驱动器上迭代处理每个类别。这种方法虽然在驱动器上循环,但每次K-Means的fit和transform操作仍然会利用Spark集群的分布式能力。

# 获取所有不重复的类别
categories = preparedData.select("category").distinct().collect()

all_results = {} # 用于存储所有类别的聚类结果

# 遍历每个类别
for row in categories:
    category = row.category
    print(f"--- 正在处理类别: {category} ---")

    # 过滤出当前类别的数据
    category_df = preparedData.filter(col("category") == category)

    # 检查当前类别是否有足够的数据进行聚类
    # K-Means通常需要至少k个点,或者更多,以获得有意义

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

331

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

236

2023.10.07

php中foreach用法
php中foreach用法

本专题整合了php中foreach用法的相关介绍,阅读专题下面的文章了解更多详细教程。

76

2025.12.04

treenode的用法
treenode的用法

​在计算机编程领域,TreeNode是一种常见的数据结构,通常用于构建树形结构。在不同的编程语言中,TreeNode可能有不同的实现方式和用法,通常用于表示树的节点信息。更多关于treenode相关问题详情请看本专题下面的文章。php中文网欢迎大家前来学习。

539

2023.12.01

C++ 高效算法与数据结构
C++ 高效算法与数据结构

本专题讲解 C++ 中常用算法与数据结构的实现与优化,涵盖排序算法(快速排序、归并排序)、查找算法、图算法、动态规划、贪心算法等,并结合实际案例分析如何选择最优算法来提高程序效率。通过深入理解数据结构(链表、树、堆、哈希表等),帮助开发者提升 在复杂应用中的算法设计与性能优化能力。

21

2025.12.22

深入理解算法:高效算法与数据结构专题
深入理解算法:高效算法与数据结构专题

本专题专注于算法与数据结构的核心概念,适合想深入理解并提升编程能力的开发者。专题内容包括常见数据结构的实现与应用,如数组、链表、栈、队列、哈希表、树、图等;以及高效的排序算法、搜索算法、动态规划等经典算法。通过详细的讲解与复杂度分析,帮助开发者不仅能熟练运用这些基础知识,还能在实际编程中优化性能,提高代码的执行效率。本专题适合准备面试的开发者,也适合希望提高算法思维的编程爱好者。

28

2026.01.06

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

36

2025.11.16

2026赚钱平台入口大全
2026赚钱平台入口大全

2026年最新赚钱平台入口汇总,涵盖任务众包、内容创作、电商运营、技能变现等多类正规渠道,助你轻松开启副业增收之路。阅读专题下面的文章了解更多详细内容。

33

2026.01.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号