0

0

使用PySpark合并DataFrame并有条件地填充缺失值

花韻仙語

花韻仙語

发布时间:2025-10-14 10:05:01

|

505人浏览过

|

来源于php中文网

原创

使用pyspark合并dataframe并有条件地填充缺失值

本教程详细介绍了如何利用PySpark处理两个DataFrame之间的缺失值填充问题。通过分步执行左连接操作,并结合`coalesce`函数,我们能够根据不同的匹配键(如邮件或序列号)从源DataFrame中智能地补充目标DataFrame中的缺失数据,同时处理无匹配项的情况,确保数据完整性和准确性。

引言

在数据处理和集成任务中,我们经常需要从一个数据源(通常是更完整或最新的数据)中提取信息来补充另一个数据源中的缺失字段。当补充逻辑涉及多个匹配键和条件判断时,传统的合并操作可能无法直接满足需求。本教程将展示如何使用PySpark的DataFrame API,通过巧妙地结合多次左连接(Left Join)和coalesce函数,实现对缺失值的有条件填充。

问题描述与数据准备

假设我们有两个DataFrame:persons和people。persons是我们的主DataFrame,其中包含一些缺失的serial_no(序列号)和mail(邮箱)信息。people是辅助DataFrame,包含了更完整的序列号和邮箱数据。我们的目标是根据以下规则填充persons中的缺失值:

  1. 如果persons中serial_no缺失,尝试通过mail字段与people中的e_mail匹配来获取s_no(序列号)。
  2. 如果persons中mail缺失,尝试通过serial_no字段与people中的s_no匹配来获取e_mail(邮箱)。
  3. 如果以上匹配均未找到,则填充为字符串"NA"。

首先,我们创建示例DataFrame来模拟这个场景:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()

原始 persons DataFrame:

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|     NULL|will@example.com|
|Robert| 20|   299011|            NULL|
|  Hill| 78|     NULL|hill@example.com|
+------+---+---------+----------------+

原始 people DataFrame:

+------+------+----------------+
|  name|  s_no|          e_mail|
+------+------+----------------+
|  John|100483|john@example.com|
|   Sam|448900| sam@example.com|
|  Will|229809|will@example.com|
|Robert|299011|            NULL|
|  Hill|567233|hill@example.com|
+------+------+----------------+

解决方案:分步左连接与coalesce

为了实现复杂的条件填充,我们将分两步进行左连接。每一步专注于填充一个特定的缺失字段。

步骤一:根据邮箱填充缺失的序列号

首先,我们通过persons的mail字段与people的e_mail字段进行左连接,以尝试填充serial_no的缺失值。

问小白
问小白

免费使用DeepSeek满血版

下载
# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
    .join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
    .select(
        persons["name"],
        persons["age"],
        # 使用coalesce函数:优先使用persons的serial_no,如果为null则使用people的s_no,
        # 如果两者都为null,则填充"NA"。
        coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
        persons["mail"]
    )

print("根据邮箱填充序列号后的 DataFrame:")
serials_enriched.show()

serials_enriched DataFrame:

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|            NULL|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+

在这一步中,Will的serial_no成功从people中获取到229809。Robert的mail是NULL,所以无法通过邮箱匹配,其serial_no保持不变(因为persons中已有)。

步骤二:根据序列号填充缺失的邮箱

接下来,我们使用上一步得到的结果serials_enriched,通过serial_no字段与people的s_no字段进行左连接,以尝试填充mail的缺失值。

# 步骤二:根据serial_no匹配,填充mail
mail_enriched = serials_enriched.alias("se") \
    .join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
    .select(
        serials_enriched["name"],
        serials_enriched["age"],
        serials_enriched["serial_no"],
        # 使用coalesce函数:优先使用serials_enriched的mail,如果为null则使用people的e_mail,
        # 如果两者都为null,则填充"NA"。
        coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
    )

print("最终填充后的 DataFrame:")
mail_enriched.show()

mail_enriched DataFrame (最终结果):

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|              NA|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+

在这一步中,Robert的mail成功被填充为NA,因为persons中mail为NULL,且people中对应的e_mail也为NULL。

完整代码示例

将上述两个步骤整合到一起,形成完整的解决方案:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
    .join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
    .select(
        persons["name"],
        persons["age"],
        coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
        persons["mail"]
    )

# 步骤二:根据serial_no匹配,填充mail
final_df = serials_enriched.alias("se") \
    .join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
    .select(
        serials_enriched["name"],
        serials_enriched["age"],
        serials_enriched["serial_no"],
        coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
    )

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()

注意事项与总结

  1. 左连接的选择: 使用left连接是关键,它确保persons DataFrame中的所有记录都被保留,即使在people DataFrame中没有匹配项。没有匹配的记录的来自people的列将显示为NULL,这正是coalesce函数处理的基础。
  2. coalesce函数: coalesce函数接受一列或多列作为参数,并返回第一个非NULL的值。如果所有参数都为NULL,则返回NULL。在本例中,我们还加入了lit("NA")作为最后一个参数,以确保在所有匹配尝试失败时,缺失值被填充为字符串"NA"。
  3. 别名(Aliases): 在连接操作中为DataFrame使用别名(例如persons.alias("p")和people.alias("pe1"))是一个良好的实践,可以提高代码的可读性,并避免在多表连接时列名冲突。
  4. 列的引用: 在select语句中,需要明确指定要选择的列是来自哪个DataFrame的,尤其是在连接了多个DataFrame之后。例如,persons["name"]确保我们选择的是原始persons DataFrame中的name列。
  5. 重复数据处理: 如果people DataFrame中存在e_mail或s_no的重复值,并且这些重复值可能导致不确定的匹配结果(例如,一个邮箱对应多个序列号),那么在执行连接之前,可能需要对people DataFrame进行去重或聚合操作,以确保每个匹配键只对应一个唯一的值。
  6. 性能考量: 对于非常大的DataFrame,多次连接可能会带来性能开销。然而,对于这种复杂的条件填充逻辑,分步连接通常是清晰且高效的实现方式。Spark的优化器通常能很好地处理这些连接操作。

通过上述方法,我们能够灵活且精确地处理DataFrame中的缺失值填充问题,即使填充逻辑涉及多个匹配条件和源数据表。这种模式在数据清洗和特征工程中非常实用。

相关专题

更多
c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

232

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

437

2024.03.01

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

258

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

209

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1468

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

620

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

550

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

546

2024.04.29

excel表格操作技巧大全 表格制作excel教程
excel表格操作技巧大全 表格制作excel教程

Excel表格操作的核心技巧在于 熟练使用快捷键、数据处理函数及视图工具,如Ctrl+C/V(复制粘贴)、Alt+=(自动求和)、条件格式、数据验证及数据透视表。掌握这些可大幅提升数据分析与办公效率,实现快速录入、查找、筛选和汇总。

0

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 48.7万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号