0

0

聊聊flink Table的Group Windows

蓮花仙者

蓮花仙者

发布时间:2025-09-13 09:10:10

|

191人浏览过

|

来源于php中文网

原创

本文旨在探讨flink table的group windows

聊聊flink Table的Group Windows

Table table = input
    .window([Window w].as("w"))  // 定义窗口并为其赋予别名 w
    .groupBy("w")  // 按窗口 w 分组表
    .select("b.sum");  // 聚合

Table table = input
    .window([Window w].as("w"))  // 定义窗口并为其赋予别名 w
    .groupBy("w, a")  // 按属性 a 和窗口 w 分组表
    .select("a, b.sum");  // 聚合

Table table = input
    .window([Window w].as("w"))  // 定义窗口并为其赋予别名 w
    .groupBy("w, a")  // 按属性 a 和窗口 w 分组表
    .select("a, w.start, w.end, w.rowtime, b.count"); // 聚合并添加窗口的开始、结束和行时间戳

窗口操作可以为Window设置别名,并在groupBy及select中引用该别名。窗口具有start、end和rowtime属性,其中start和rowtime是包含的,而end是排外的。

Tumbling Windows:

// 事件时间的Tumbling窗口
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// 处理时间的Tumbling窗口(假设有一个处理时间属性 "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// 基于行数的Tumbling窗口(假设有一个处理时间属性 "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));

Tumbling Windows按照固定窗口大小移动,因此窗口之间不重叠;over方法用于指定窗口大小;窗口大小可以基于事件时间、处理时间或行数来定义。

Sliding Windows:

// 事件时间的Sliding窗口
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// 处理时间的Sliding窗口(假设有一个处理时间属性 "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// 基于行数的Sliding窗口(假设有一个处理时间属性 "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));

当滑动间隔小于窗口大小时,Sliding Windows会导致窗口重叠,因此行可能属于多个窗口;over方法用于指定窗口大小,窗口大小可以基于事件时间、处理时间或行数来定义;every方法用于指定滑动间隔。

Session Windows:

甲骨文AI协同平台
甲骨文AI协同平台

专门用于甲骨文研究的革命性平台

下载
// 事件时间的Session窗口
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// 处理时间的Session窗口(假设有一个处理时间属性 "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));

Session Windows没有固定的窗口大小,它基于非活动时间的长度来关闭窗口,withGap方法用于指定两个窗口之间的间隔,作为时间间隔;Session Windows只能使用事件时间或处理时间。

Table类提供了window操作,接收Window参数,并创建WindowedTable对象。

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  def window(window: Window): WindowedTable = {
    new WindowedTable(this, window)
  }

  //......
}

WindowedTable类仅提供groupBy操作,groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最终调用的是Expression类型的groupBy方法;如果groupBy操作除了窗口之外没有其他属性,则其并行度为1,只会在单个任务上执行;groupBy方法创建WindowGroupedTable对象。

class WindowedTable(
    private[flink] val table: Table,
    private[flink] val window: Window) {

  def groupBy(fields: Expression*): WindowGroupedTable = {
    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
    if (fields.size != fieldsWithoutWindow.size + 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }

    new WindowGroupedTable(table, fieldsWithoutWindow, window)
  }

  def groupBy(fields: String): WindowGroupedTable = {
    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    groupBy(fieldsExpr: _*)
  }

}

WindowGroupedTable类仅提供select操作,select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最终调用的是Expression类型的select方法;select方法创建新的Table对象,其Project操作的子节点为WindowAggregate

class WindowGroupedTable(
    private[flink] val table: Table,
    private[flink] val groupKeys: Seq[Expression],
    private[flink] val window: Window) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)

    val projectsOnAgg = replaceAggregationsAndProperties(
      expandedFields, table.tableEnv, aggNames, propNames)

    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)

    new Table(table.tableEnv,
      Project(
        projectsOnAgg,
        WindowAggregate(
          groupKeys,
          window.toLogicalWindow,
          propNames.map(a => Alias(a._1, a._2)).toSeq,
          aggNames.map(a => Alias(a._1, a._2)).toSeq,
          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
        ).validate(table.tableEnv),
        // required for proper resolution of the time attribute in multi-windows
        explicitAlias = true
      ).validate(table.tableEnv))
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}

总结:窗口操作可以为Window设置别名,并在groupBy及select中引用该别名。窗口具有start、end和rowtime属性,其中start和rowtime是包含的,而end是排外的。Tumbling Windows按固定窗口大小移动,不重叠;Sliding Windows在滑动间隔小于窗口大小的情况下会重叠;Session Windows基于非活动时间关闭窗口。Table类提供window操作,创建WindowedTable;WindowedTable提供groupBy操作,创建WindowGroupedTable;WindowGroupedTable提供select操作,创建新的Table,其Project操作的子节点为WindowAggregate。

相关专题

更多
string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

318

2023.08.02

session失效的原因
session失效的原因

session失效的原因有会话超时、会话数量限制、会话完整性检查、服务器重启、浏览器或设备问题等等。详细介绍:1、会话超时:服务器为Session设置了一个默认的超时时间,当用户在一段时间内没有与服务器交互时,Session将自动失效;2、会话数量限制:服务器为每个用户的Session数量设置了一个限制,当用户创建的Session数量超过这个限制时,最新的会覆盖最早的等等。

311

2023.10.17

session失效解决方法
session失效解决方法

session失效通常是由于 session 的生存时间过期或者服务器关闭导致的。其解决办法:1、延长session的生存时间;2、使用持久化存储;3、使用cookie;4、异步更新session;5、使用会话管理中间件。

740

2023.10.18

cookie与session的区别
cookie与session的区别

本专题整合了cookie与session的区别和使用方法等相关内容,阅读专题下面的文章了解更详细的内容。

88

2025.08.19

windows查看端口占用情况
windows查看端口占用情况

Windows端口可以认为是计算机与外界通讯交流的出入口。逻辑意义上的端口一般是指TCP/IP协议中的端口,端口号的范围从0到65535,比如用于浏览网页服务的80端口,用于FTP服务的21端口等等。怎么查看windows端口占用情况呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

601

2023.07.26

查看端口占用情况windows
查看端口占用情况windows

端口占用是指与端口关联的软件占用端口而使得其他应用程序无法使用这些端口,端口占用问题是计算机系统编程领域的一个常见问题,端口占用的根本原因可能是操作系统的一些错误,服务器也可能会出现端口占用问题。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1104

2023.07.27

windows照片无法显示
windows照片无法显示

当我们尝试打开一张图片时,可能会出现一个错误提示,提示说"Windows照片查看器无法显示此图片,因为计算机上的可用内存不足",本专题为大家提供windows照片无法显示相关的文章,帮助大家解决该问题。

792

2023.08.01

windows查看端口被占用的情况
windows查看端口被占用的情况

windows查看端口被占用的情况的方法:1、使用Windows自带的资源监视器;2、使用命令提示符查看端口信息;3、使用任务管理器查看占用端口的进程。本专题为大家提供windows查看端口被占用的情况的相关的文章、下载、课程内容,供大家免费下载体验。

452

2023.08.02

Java JVM 原理与性能调优实战
Java JVM 原理与性能调优实战

本专题系统讲解 Java 虚拟机(JVM)的核心工作原理与性能调优方法,包括 JVM 内存结构、对象创建与回收流程、垃圾回收器(Serial、CMS、G1、ZGC)对比分析、常见内存泄漏与性能瓶颈排查,以及 JVM 参数调优与监控工具(jstat、jmap、jvisualvm)的实战使用。通过真实案例,帮助学习者掌握 Java 应用在生产环境中的性能分析与优化能力。

3

2026.01.20

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Express 中文开发手册
Express 中文开发手册

共0课时 | 0人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号