
本文介绍如何在 pyspark 中对两个 dataframe 执行左连接,填充缺失字段,并新增 `src` 列标识数据来源(如 `tbl1` 或 `tbl2`),最终统一输出为标准格式。
在 PySpark 数据处理中,常需将主表(如用户邮箱表)与参考表(如员工编号映射表)进行关联,以补全或校验关键字段。本例中,df1 包含 mail_id 和 e_no(员工号),df2 包含 email 和 emp_no,但 df2 中部分 email 为空 —— 这意味着:当 df1 的邮箱在 df2 中未匹配时,应优先使用 df1 自身的 e_no 值作为 emp_no,并标记来源为 tbl1;反之,若成功匹配,则取 df2 的 emp_no 并标记为 tbl2。
关键点在于连接条件与 src 判断逻辑的一致性:原问题中错误地用 e_no == emp_no 连接,但实际业务语义是“用邮箱对齐”,即 df1.mail_id == df2.email;同时,src 的判断应基于连接后 emp_no 是否为空(因左连接下 df2.emp_no 为 null 表示未匹配),而非检查 mail_id 字段本身。
以下是完整、可运行的解决方案:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
# 步骤1:执行左连接(以 df1 为主表,按邮箱对齐)
result = df1.join(df2, df1.mail_id == df2.email, "left")
# 步骤2:构造 src 列——emp_no 为 null 表示未从 df2 匹配到,故来源为 tbl1;否则为 tbl2
result = result.withColumn(
"src",
when(col("emp_no").isNull(), "tbl1").otherwise("tbl2")
)
# 步骤3:选择并重命名字段,统一输出结构
# 若 emp_no 为空,取 df1 的 e_no;否则取 df2 的 emp_no
final_result = result.select(
col("mail_id").alias("email"),
when(col("emp_no").isNull(), col("e_no")).otherwise(col("emp_no")).alias("emp_no"),
"src"
)
final_result.show(truncate=False)✅ 输出效果(与预期一致):
+------------------+------+-----+ |email |emp_no|src | +------------------+------+-----+ |[email protected] |111 |tbl1 | |[email protected] |222 |tbl2 | |[email protected] |333 |tbl2 | +------------------+------+-----+
⚠️ 注意事项:
- 连接键必须语义正确:本例是邮箱关联,不是员工号相等,切勿误用 df1.e_no == df2.emp_no;
- isNull() 判断对象是连接后的列:col("emp_no").isNull() 可靠反映 df2 是否提供有效值;
- 避免空字符串陷阱:df2.email 含空值(非 null),但左连接后 emp_no 仍可能为 null,因此直接判 emp_no 更健壮;
- 若后续需去重或处理多匹配,建议在连接前对 df2 按 email 去重(如 df2.dropDuplicates(["email"]))。
该方案结构清晰、逻辑明确,适用于 ETL 中常见的“主表补全 + 来源追踪”场景。










