
本文详解如何在 pyspark 中对两个 dataframe 执行左连接,填充缺失字段,并新增 `src` 列标识数据来源(`tbl1` 或 `tbl2`),同时正确处理空邮箱与员工号映射逻辑。
在实际数据处理中,常需将主表(如用户邮箱表)与参考表(如员工编号映射表)关联,以补全或校验关键字段。本例中,df1 包含 mail_id 和 e_no(员工号),df2 包含 email 和 emp_no(部分 email 为空,但 emp_no 完整)。目标是:以 df1 为基准做左连接,优先使用 df2 的 emp_no 填充,若 df2 中无匹配则回退至 df1 的 e_no,并用 src 列明确标注数据来源。
关键点在于:
- 连接条件应基于语义对齐:df1.mail_id ↔ df2.email(而非 e_no ↔ emp_no,否则无法关联邮箱信息);
- src 的判断逻辑取决于 df2.emp_no 是否存在(即是否成功匹配到参考表),而非 mail_id 是否为空;
- 最终列需重命名并统一字段名(如 mail_id → email),且 emp_no 需按来源智能选择。
以下是可直接运行的完整代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
# 示例数据构建(仅用于演示)
spark = SparkSession.builder.appName("JoinWithSource").getOrCreate()
df1 = spark.createDataFrame([
("[email protected]", 111),
("[email protected]", 222),
("[email protected]", 333)
], ["mail_id", "e_no"])
df2 = spark.createDataFrame([
("[email protected]", 111),
(None, 222),
(None, 333)
], ["email", "emp_no"])
# 步骤1:以 df1 为主表,基于 mail_id == email 左连接 df2
result = df1.join(df2, df1.mail_id == df2.email, "left")
# 步骤2:生成 src 列 —— 若 df2.emp_no 为空,则来自 tbl1(即用 df1.e_no),否则来自 tbl2
result = result.withColumn("src", when(col("emp_no").isNull(), "tbl1").otherwise("tbl2"))
# 步骤3:构造最终列:email(取 df1.mail_id)、emp_no(优先 df2.emp_no,否则 df1.e_no)、src
final_result = result.select(
col("mail_id").alias("email"),
when(col("emp_no").isNull(), col("e_no")).otherwise(col("emp_no")).alias("emp_no"),
"src"
)
final_result.show()输出结果:
+----------------+-----+----+ | email|emp_no| src| +----------------+-----+----+ |[email protected]| 111|tbl2| |[email protected]| 222|tbl1| |[email protected]| 333|tbl1| +----------------+-----+----+
✅ 注意事项:
- 原问题中错误地将连接键设为 e_no == emp_no,导致无法利用邮箱字段进行语义关联,也使 src 判断失去业务意义;
- when().otherwise() 是惰性求值的列操作,务必使用 col() 显式引用列,避免字符串误判(如 'mail_id'.isNull() 会报错);
- 若 df2 中存在重复 email,左连接会产生笛卡尔膨胀,建议提前对 df2 去重(如 df2.dropDuplicates(["email"]));
- 生产环境中建议添加 cache() 或 checkpoint() 优化多次复用中间结果的性能。
通过该方案,你不仅能精准完成多源数据融合,还能清晰追溯每条记录的数据血缘,为后续审计与调试提供可靠依据。










