
本文详解如何在 pyspark 中对两个 dataframe 执行左连接,基于邮箱或员工号匹配填充缺失字段,并新增 `src` 列标识数据来源(原始表或补充表),最终统一输出为标准格式。
在实际数据处理中,常需将主表(如用户邮件表)与辅助表(如员工信息表)关联,以补全关键字段(如 emp_no),同时明确每条记录的数据归属。本例中,df1 包含 mail_id 和 e_no,df2 包含 email 和 emp_no,目标是:以 df1 为主表执行左连接,优先使用 df2 的 emp_no 填充;若 df2 中无匹配项,则回退使用 df1 的 e_no;并新增 src 列标记来源(tbl2 表示来自 df2,tbl1 表示回退使用 df1 的原始值)。
关键点在于:
- 连接条件需语义正确:原问题中误用 df1['e_no'] == df2['emp_no'],但两表逻辑关系实为 mail_id ↔ email(邮箱地址匹配),e_no 与 emp_no 是待对齐的业务字段,不应作为连接键;
- src 判定逻辑需基于连接结果状态:应检查 emp_no 是否为 null(即 df2 未匹配),而非错误地判断 mail_id 字段;
- 字段合并需显式控制:使用 when().otherwise() 统一生成 emp_no 列,避免列名冲突或空值遗漏。
以下是推荐的完整实现:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
# 步骤1:基于邮箱字段执行左连接(df1为主表)
result = df1.join(df2, df1.mail_id == df2.email, "left")
# 步骤2:添加数据来源标识列 'src'
# 若 emp_no 为 null → 来自 df1(回退使用 e_no),标记为 "tbl1"
# 否则 → 来自 df2,标记为 "tbl2"
result = result.withColumn(
"src",
when(col("emp_no").isNull(), "tbl1").otherwise("tbl2")
)
# 步骤3:构造最终结果:重命名 mail_id → email,合并 emp_no 字段,保留 src
final_result = result.select(
col("mail_id").alias("email"),
when(col("emp_no").isNull(), col("e_no")).otherwise(col("emp_no")).alias("emp_no"),
"src"
)
final_result.show()✅ 输出效果(与预期一致):
+------------------+------+----+ | email|emp_no| src| +------------------+------+----+ || 111|tbl1| | | 222|tbl2| | | 333|tbl2| +------------------+------+----+
⚠️ 注意事项:
- 确保 mail_id 与 email 字段类型一致(建议均为 string),必要时用 .cast("string") 显式转换;
- 若存在大小写或空格差异,可提前标准化:df1 = df1.withColumn("mail_id", F.trim(F.lower(col("mail_id"))));
- when().otherwise() 是惰性求值,无需担心性能;但避免嵌套过深,保持逻辑可读性;
- 生产环境中建议添加 result.count() 和 result.where(col("emp_no").isNull()).count() 进行连接质量校验。
该方案结构清晰、语义准确,兼顾健壮性与可维护性,适用于各类类似的数据补全与溯源场景。










