
本文介绍使用 spark 窗口函数(window + row_number)配合聚合操作,实现对每个分组内行序敏感的筛选与重组,例如跳过每组第二条记录后拼接剩余文本,无需自定义 udaf 即可高效完成。
在 Spark SQL 和 DataFrame API 中,若需对每个分组(如按 Names 分组)执行基于行序的精细操作(如“删除每组第 2 条记录”、“保留前 N 条”、“取中间值”等),直接使用 groupBy().agg() 无法满足需求——因为标准聚合函数(如 collect_list, concat_ws)不感知行内顺序。此时,窗口函数(Window Function)是更简洁、高效且符合 Spark 原生范式的解决方案。
核心思路分为三步:
- 添加行号标识:使用 row_number().over(Window.partitionBy("Names").orderBy(...)) 为每组内各行分配唯一序号;
- 过滤目标行:通过 .where("rn 2") 排除每组中序号为 2 的记录;
- 重新聚合:对清洗后的数据按 Names 分组,用 collect_list 收集剩余文本,并以空格为分隔符拼接。
以下是完整 Java 实现(适配 Spark 3.x):
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.expressions.Window; import static org.apache.spark.sql.functions.*; // 假设 df 已初始化为原始 DatasetDataset
result = df .withColumn("rn", row_number().over(Window.partitionBy("Names").orderBy("Names"))) // 注意:orderBy 字段影响行序稳定性 .filter(col("rn").notEqual(2)) .groupBy("Names") .agg(concat_ws(" ", collect_list("Random_Text")).as("Random_Text")); result.show();
✅ 关键说明与注意事项:
- orderBy("Names") 在窗口中仅用于保证 row_number 可重复性(因 Names 相同,实际顺序未定义)。若业务要求严格顺序(如按原始输入顺序或时间戳),应改用有区分度的列(如 orderBy("timestamp") 或添加 monotonically_increasing_id() 作为辅助排序键);
- filter(col("rn").notEqual(2)) 等价于 .where("rn 2"),推荐使用列对象语法以增强类型安全;
- concat_ws(" ", collect_list(...)) 自动忽略 null 值,适合文本拼接场景;若需更复杂逻辑(如去重、截断、格式化),可在 collect_list 后接 udf 或使用高阶函数(Spark 3.4+ 支持 transform, filter 等);
- 无需自定义 UDAF:本方案完全基于内置函数,性能优、可读性强、易于维护,避免了 UDAF 开发、序列化及调试成本。
该模式可轻松扩展至其他分组级变换场景,例如:“每组保留最后 3 条”、“跳过首尾各 1 条”、“仅合并偶数行”等——只需调整窗口内的 orderBy 和后续 filter 条件即可。










