
本文详解如何在 pyspark 中构建复合窗口,同时满足“最近 n 行”和“最近 m 天”双重条件,通过 `collect_list` + `filter` 组合实现高效、可读的历史结构化字段生成。
在实际数据分析场景中,常需为每条记录聚合其“有效历史”——既不能无限制回溯(避免性能与语义失真),也不能仅依赖固定行偏移(忽略时间稀疏性)。典型需求如:对每个用户,收集当前行前最多 2 条、且日期距今不超过 10 天的记录,并结构化为元组列表。PySpark 原生窗口(rowsBetween)仅支持行数约束,不直接支持时间范围过滤;但可通过“先取宽窗口 + 后过滤”的两阶段策略优雅解决。
核心思路分三步:
- 构造结构化内容列:使用 struct('id', 'date', 'value') 将目标字段打包为嵌套结构,便于后续统一处理;
- 定义宽松行窗口:rowsBetween(-3, -1)(取前 3 行中的前 2 行,预留缓冲)获取初步候选集;
- 时间条件后过滤:用 filter(history, x -> x.date >= date - interval 10 days) 动态剔除超时记录。
以下是完整可运行示例(注意:示例中为演示简洁使用 interval 2 day,实际应替换为 interval 10 days):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime
spark = SparkSession.builder.appName("window-history").getOrCreate()
# 构造示例数据(注意:date 列必须为 timestamp 类型)
df = spark.createDataFrame([
(1, datetime(2023, 1, 1), 100),
(1, datetime(2023, 5, 1), 200),
(1, datetime(2023, 5, 2), 300),
(1, datetime(2023, 5, 3), 400),
(1, datetime(2023, 5, 4), 500)
], ["id", "date", "value"])
# 关键步骤:结构化 → 宽窗口聚合 → 时间过滤
result_df = (
df
.withColumn("content_struct", F.struct("id", "date", "value"))
.withColumn(
"history",
F.collect_list("content_struct")
.over(Window.orderBy("date").partitionBy("id").rowsBetween(-3, -1))
)
.withColumn(
"history",
F.expr("filter(history, x -> x.date >= date - interval 10 days)")
)
)
result_df.select("id", "date", "value", "history").show(truncate=False)⚠️ 关键注意事项:
- 时间类型校验:date 列必须为 TimestampType,否则 interval 计算将失败。若原始为字符串,需先用 to_timestamp("date", "yyyy-MM-dd") 转换;
- 窗口排序稳定性:orderBy('date') 要求同一 id 内日期严格递增或处理并列情况(如加次级排序 orderBy('date', 'value'));
- 性能权衡:rowsBetween(-N, -1) 的 N 应略大于预期最大行数(如本例 N=3 对应“最多取 2 行”),避免因时间过滤过度裁剪导致结果为空;
- 空值安全:filter 在空数组上返回空数组,无需额外 coalesce;
- 输出格式定制:若需转为 (id, date_str, value) 元组字符串(如题干示例),可在最后追加 .withColumn("history_str", F.array_join(F.transform("history", lambda x: F.concat_ws(", ", x.id, F.date_format(x.date, "yyyy-MM-dd"), x.value)), "), ("))。
该方案兼具表达力与工程实用性,是 PySpark 处理“时间敏感滑动窗口”问题的标准范式。










