0

0

PySpark 中基于日期范围的动态分组:无需 UDF 实现高效区间聚类

心靈之曲

心靈之曲

发布时间:2026-02-28 12:03:37

|

247人浏览过

|

来源于php中文网

原创

PySpark 中基于日期范围的动态分组:无需 UDF 实现高效区间聚类

本文详解如何在 PySpark 中对按日期排序的数据进行“滚动窗口式”动态分组——即以首行 item_date 为起点,将后续落在其 max_window(如 +365 天)内的记录归入同一组,并自动触发新分组;全程纯 SQL 函数实现,零 UDF、零 Pandas 转换。

本文详解如何在 pyspark 中对按日期排序的数据进行“滚动窗口式”动态分组——即以首行 `item_date` 为起点,将后续落在其 `max_window`(如 +365 天)内的记录归入同一组,并自动触发新分组;全程纯 sql 函数实现,零 udf、零 pandas 转换。

在大规模时序数据处理中,常需按“逻辑业务周期”而非固定时间粒度(如月/年)进行分组,例如:将首次事件发生后的 365 天内所有关联事件划为一个生命周期组,后续超出则开启新周期。这种依赖前序累积状态的分组逻辑无法通过常规 groupBy() 或窗口聚合直接表达,但借助 PySpark 3.4+ 引入的高阶函数 aggregate() 与累积差值建模,可完全在 Catalyst 优化器内高效完成。

核心思路是:

Hoppy Copy
Hoppy Copy

AI邮件营销文案平台

下载
  1. 按 item_date 排序,确保时序一致性;
  2. 计算相邻日期差(天数),转化为相对增量序列;
  3. 构建前缀累积差值列表,模拟“从组首日到当前行”的累计跨度;
  4. 用 aggregate() 模拟状态机:维护 (current_span, group_id) 元组,若新增差值使 current_span ≤ 365,则延续当前组;否则重置 current_span = 0 并递增 group_id。

以下为完整可运行代码(适配 Spark 3.4+):

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

# 初始化 SparkSession(生产环境请配置相应 master)
spark = SparkSession.builder.appName("DateRangeGrouping").getOrCreate()

# 构造示例数据
sample_df = spark.createDataFrame([
    ('2020-01-01', '2021-01-01', 1),
    ('2020-02-01', '2021-02-01', 1),
    ('2021-01-15', '2022-01-15', 2),
    ('2022-01-15', '2023-01-15', 2),
    ('2022-02-01', '2023-02-01', 3),
    ('2022-03-01', '2023-03-01', 3),
    ('2023-03-01', '2024-03-01', 4),
], ['item_date', 'max_window', 'expected_grouping_index'])

# 步骤 1:转为 date 类型并排序
df = sample_df.withColumn("item_date", col("item_date").cast("date"))
window_spec = Window.orderBy("item_date")

# 步骤 2:计算与前一行的天数差(首行为 NULL)
df = df.withColumn("days_since_prev", 
                   datediff(col("item_date"), lag(col("item_date"), 1).over(window_spec)))

# 步骤 3:收集从首行到当前行的所有差值(形成累积路径)
cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("diff_list", collect_list("days_since_prev").over(cumulative_window))

# 步骤 4:使用 aggregate 模拟分组状态机
# 初始状态:(累计跨度, 当前组ID) = (0, 0)
initial_state = array(lit(0), lit(0))
df = df.withColumn(
    "group_state",
    aggregate(
        "diff_list",
        initial_state,
        lambda acc, x: when(
            (acc[0] + coalesce(x, lit(0))) <= 365,  # 若累加后仍在窗口内
            array(acc[0] + coalesce(x, lit(0)), acc[1])  # 延续当前组:更新跨度
        ).otherwise(
            array(lit(0), acc[1] + 1)  # 超出窗口:重置跨度,组ID+1
        )
    )
)

# 步骤 5:提取 group_id(即 state[1])
result_df = df.withColumn("group_id", col("group_state")[1])

# 最终结果:每组仅保留首行(按 item_date 升序,即 row_number() == 1)
final_df = result_df.withColumn("rn", row_number().over(Window.partitionBy("group_id").orderBy("item_date"))) \
                    .filter(col("rn") == 1) \
                    .drop("diff_list", "group_state", "rn", "days_since_prev")

result_df.select("item_date", "max_window", "expected_grouping_index", "group_id").show(truncate=False)

关键注意事项
性能保障:全程使用 Catalyst 内置函数,避免 UDF 的 JVM 序列化开销与不可优化性;collect_list 在窗口内可控(数据已排序且分组天然稀疏),实践中建议对超长序列加 limit 防止 OOM。
⚠️ 边界鲁棒性:lag() 产生的首行 days_since_prev 为 NULL,coalesce(x, lit(0)) 确保 aggregate 计算安全;若实际窗口非固定 365 天(如依赖 max_window 字段),可将 ? 扩展应用:该模式可泛化至任意“状态驱动分组”,如会话超时分组(30 分钟无活动则新建 session)、库存批次合并(累计数量 ≤ 阈值则同批)等场景。

通过此方法,你不仅能精准复现示例中的 expected_grouping_index(对应 group_id + 1),更能获得可扩展、可维护、高性能的生产级分组能力——真正践行“用 Spark 的方式解决 Spark 的问题”。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1048

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

339

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

379

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

1905

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

379

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1458

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

585

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

437

2024.04.29

Golang 测试体系与代码质量保障:工程级可靠性建设
Golang 测试体系与代码质量保障:工程级可靠性建设

Go语言测试体系与代码质量保障聚焦于构建工程级可靠性系统。本专题深入解析Go的测试工具链(如go test)、单元测试、集成测试及端到端测试实践,结合代码覆盖率分析、静态代码扫描(如go vet)和动态分析工具,建立全链路质量监控机制。通过自动化测试框架、持续集成(CI)流水线配置及代码审查规范,实现测试用例管理、缺陷追踪与质量门禁控制,确保代码健壮性与可维护性,为高可靠性工程系统提供质量保障。

0

2026.02.28

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号