0

0

聊聊flink的Tumbling Window

絕刀狂花

絕刀狂花

发布时间:2025-09-14 09:02:20

|

911人浏览过

|

来源于php中文网

原创

本文主要研究一下flink的Tumbling Window

聊聊flink的Tumbling Window

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic abstract class WindowAssigner<t w extends window> implements Serializable {    private static final long serialVersionUID = 1L;​    /**     * Returns a {@code Collection} of windows that should be assigned to the element.     *     * @param element The element to which windows should be assigned.     * @param timestamp The timestamp of the element.     * @param context The {@link WindowAssignerContext} in which the assigner operates.     */    public abstract Collection<w> assignWindows(T element, long timestamp, WindowAssignerContext context);​    /**     * Returns the default trigger associated with this {@code WindowAssigner}.     */    public abstract Trigger<t w> getDefaultTrigger(StreamExecutionEnvironment env);​    /**     * Returns a {@link TypeSerializer} for serializing windows that are assigned by     * this {@code WindowAssigner}.     */    public abstract TypeSerializer<w> getWindowSerializer(ExecutionConfig executionConfig);​    /**     * Returns {@code true} if elements are assigned to windows based on event time,     * {@code false} otherwise.     */    public abstract boolean isEventTime();​    /**     * A context provided to the {@link WindowAssigner} that allows it to query the     * current processing time.     *     * <p>This is provided to the assigner by its containing     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},     * which, in turn, gets it from the containing     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.     */    public abstract static class WindowAssignerContext {​        /**         * Returns the current processing time.         */        public abstract long getCurrentProcessingTime();​    }}</p></w></t></w></t>

WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic abstract class Window {​    /**     * Gets the largest timestamp that still belongs to this window.     *     * @return The largest timestamp that still belongs to this window.     */    public abstract long maxTimestamp();}

Window对象代表把无限流数据划分为有限buckets的集合,它有一个maxTimestamp,代表该窗口数据在该时间点内到达;它有两个子类,一个是GlobalWindow,一个是TimeWindowTimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

图酷AI
图酷AI

下载即用!可以免费使用的AI图像处理工具,致力于为用户提供最先进的AI图像处理技术,让图像编辑变得简单高效。

下载

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic class TimeWindow extends Window {​    private final long start;    private final long end;​    public TimeWindow(long start, long end) {        this.start = start;        this.end = end;    }​    /**     * Gets the starting timestamp of the window. This is the first timestamp that belongs     * to this window.     *     * @return The starting timestamp of this window.     */    public long getStart() {        return start;    }​    /**     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it     * is the first timestamp that does not belong to this window any more.     *     * @return The exclusive end timestamp of this window.     */    public long getEnd() {        return end;    }​    /**     * Gets the largest timestamp that still belongs to this window.     *     * <p>This timestamp is identical to {@code getEnd() - 1}.     *     * @return The largest timestamp that still belongs to this window.     *     * @see #getEnd()     */    @Override    public long maxTimestamp() {        return end - 1;    }​    @Override    public boolean equals(Object o) {        if (this == o) {            return true;        }        if (o == null || getClass() != o.getClass()) {            return false;        }​        TimeWindow window = (TimeWindow) o;​        return end == window.end && start == window.start;    }​    @Override    public int hashCode() {        return MathUtils.longToIntWithBitMixing(start + end);    }​    @Override    public String toString() {        return "TimeWindow{" +                "start=" + start +                ", end=" + end +                '}';    }​    /**     * Returns {@code true} if this window intersects the given window.     */    public boolean intersects(TimeWindow other) {        return this.start = other.start;    }​    /**     * Returns the minimal window covers both this window and the given window.     */    public TimeWindow cover(TimeWindow other) {        return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));    }​    // ------------------------------------------------------------------------    // Serializer    // ------------------------------------------------------------------------​    //......​    // ------------------------------------------------------------------------    //  Utilities    // ------------------------------------------------------------------------​    /**     * Merge overlapping {@link TimeWindow}s. For use by merging     * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.     */    public static void mergeWindows(Collection<timewindow> windows, MergingWindowAssigner.MergeCallback<timewindow> c) {​        // sort the windows by the start time and then merge overlapping windows​        List<timewindow> sortedWindows = new ArrayList(windows);​        Collections.sort(sortedWindows, new Comparator<timewindow>() {            @Override            public int compare(TimeWindow o1, TimeWindow o2) {                return Long.compare(o1.getStart(), o2.getStart());            }        });​        List<tuple2 set>>> merged = new ArrayList();        Tuple2<timewindow set>> currentMerge = null;​        for (TimeWindow candidate: sortedWindows) {            if (currentMerge == null) {                currentMerge = new Tuple2();                currentMerge.f0 = candidate;                currentMerge.f1 = new HashSet();                currentMerge.f1.add(candidate);            } else if (currentMerge.f0.intersects(candidate)) {                currentMerge.f0 = currentMerge.f0.cover(candidate);                currentMerge.f1.add(candidate);            } else {                merged.add(currentMerge);                currentMerge = new Tuple2();                currentMerge.f0 = candidate;                currentMerge.f1 = new HashSet();                currentMerge.f1.add(candidate);            }        }​        if (currentMerge != null) {            merged.add(currentMerge);        }​        for (Tuple2<timewindow set>> m: merged) {            if (m.f1.size() > 1) {                c.merge(m.f1, m.f0);            }        }    }​    /**     * Method to get the window start for a timestamp.     *     * @param timestamp epoch millisecond to get the window start.     * @param offset The offset which window start would be shifted by.     * @param windowSize The size of the generated windows.     * @return window start     */    public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {        return timestamp - (timestamp - offset + windowSize) % windowSize;    }}</timewindow></timewindow></tuple2></timewindow></timewindow></timewindow></timewindow></p>

TimeWindow有start及end属性,其中start为inclusive,而end为exclusive,所以maxTimestamp返回的是end-1;这里重写了equals及hashcode方法TimeWindow提供了intersects方法用于表示本窗口与指定窗口是否有交叉;而cover方法用于返回本窗口与指定窗口的重叠窗口TimeWindow还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic class TumblingEventTimeWindows extends WindowAssigner<object timewindow> {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    protected TumblingEventTimeWindows(long size, long offset) {        if (offset = size) {            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0  assignWindows(Object element, long timestamp, WindowAssignerContext context) {        if (timestamp > Long.MIN_VALUE) {            // Long.MIN_VALUE is currently assigned when no timestamp is present            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);            return Collections.singletonList(new TimeWindow(start, start + size));        } else {            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +                    "'DataStream.assignTimestampsAndWatermarks(...)'?");        }    }​    @Override    public Trigger<object timewindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return EventTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingEventTimeWindows(" + size + ")";    }​    public static TumblingEventTimeWindows of(Time size) {        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingEventTimeWindows of(Time size, Time offset) {        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer<timewindow> getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return true;    }}</timewindow></object></object>

TumblingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(timestamp, offset, size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回trueTumblingEventTimeWindows提供了of静态工厂方法,可以指定size及offset参数TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

代码语言:javascript代码运行次数:0运行复制

public class TumblingProcessingTimeWindows extends WindowAssigner<object timewindow> {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    private TumblingProcessingTimeWindows(long size, long offset) {        if (offset = size) {            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0  assignWindows(Object element, long timestamp, WindowAssignerContext context) {        final long now = context.getCurrentProcessingTime();        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);        return Collections.singletonList(new TimeWindow(start, start + size));    }​    public long getSize() {        return size;    }​    @Override    public Trigger<object timewindow> getDefaultTrigger(StreamExecutionEnvironment env) {        return ProcessingTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingProcessingTimeWindows(" + size + ")";    }​    public static TumblingProcessingTimeWindows of(Time size) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingProcessingTimeWindows of(Time size, Time offset) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer<timewindow> getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return false;    }}</timewindow></object></object>

TumblingProcessingTimeWindows继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(now, offset, size),而now值则为context.getCurrentProcessingTime(),则是与TumblingEventTimeWindows的不同之处,TumblingProcessingTimeWindows不使用timestamp参数来计算,它使用now值替代;getDefaultTrigger方法返回的是ProcessingTimeTrigger,而isEventTime方法返回的为falseTumblingProcessingTimeWindows也提供了of静态工厂方法,可以指定size及offset参数小结flink的Tumbling Window分为TumblingEventTimeWindows及TumblingProcessingTimeWindows,它们都继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetWindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型;TumblingEventTimeWindows及TumblingProcessingTimeWindows的窗口类型为TimeWindow,它有start及end属性,其中start为inclusive,而end为exclusive,maxTimestamp返回的是end-1,它还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows及TumblingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法;前者assignWindows使用的是参数中的timestamp,而后者使用的是now值;前者的getDefaultTrigger返回的是EventTimeTrigger,而后者返回的是ProcessingTimeTrigger;前者isEventTime方法返回的为true,而后者返回的为falsedocTumbling Windows

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
windows查看端口占用情况
windows查看端口占用情况

Windows端口可以认为是计算机与外界通讯交流的出入口。逻辑意义上的端口一般是指TCP/IP协议中的端口,端口号的范围从0到65535,比如用于浏览网页服务的80端口,用于FTP服务的21端口等等。怎么查看windows端口占用情况呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

1431

2023.07.26

查看端口占用情况windows
查看端口占用情况windows

端口占用是指与端口关联的软件占用端口而使得其他应用程序无法使用这些端口,端口占用问题是计算机系统编程领域的一个常见问题,端口占用的根本原因可能是操作系统的一些错误,服务器也可能会出现端口占用问题。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

1163

2023.07.27

windows照片无法显示
windows照片无法显示

当我们尝试打开一张图片时,可能会出现一个错误提示,提示说"Windows照片查看器无法显示此图片,因为计算机上的可用内存不足",本专题为大家提供windows照片无法显示相关的文章,帮助大家解决该问题。

833

2023.08.01

windows查看端口被占用的情况
windows查看端口被占用的情况

windows查看端口被占用的情况的方法:1、使用Windows自带的资源监视器;2、使用命令提示符查看端口信息;3、使用任务管理器查看占用端口的进程。本专题为大家提供windows查看端口被占用的情况的相关的文章、下载、课程内容,供大家免费下载体验。

461

2023.08.02

windows无法访问共享电脑
windows无法访问共享电脑

在现代社会中,共享电脑是办公室和家庭的重要组成部分。然而,有时我们可能会遇到Windows无法访问共享电脑的问题。这个问题可能会导致数据无法共享,影响工作和生活的正常进行。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

2361

2023.08.08

windows自动更新
windows自动更新

Windows操作系统的自动更新功能可以确保系统及时获取最新的补丁和安全更新,以提高系统的稳定性和安全性。然而,有时候我们可能希望暂时或永久地关闭Windows的自动更新功能。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

874

2023.08.10

windows boot manager
windows boot manager

windows boot manager无法开机的解决方法:1、系统文件损坏,使用Windows安装光盘或USB启动盘进入恢复环境,选择修复计算机,然后选择自动修复;2、引导顺序错误,进入恢复环境,选择命令提示符,输入命令"bootrec /fixboot"和"bootrec /fixmbr",然后重新启动计算机;3、硬件问题,使用硬盘检测工具进行扫描和修复;4、重装操作系统。本专题还提供其他解决

1948

2023.08.28

windows锁屏快捷键
windows锁屏快捷键

windows锁屏快捷键是Windows键+L、Ctrl+Alt+Del、Windows键+D、Windows键+P和Windows键+R。本专题为大家提供windows相关的文章、下载、课程内容,供大家免费下载体验。

1666

2023.08.30

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

23

2026.03.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
CSS3 教程
CSS3 教程

共18课时 | 6.7万人学习

PostgreSQL 教程
PostgreSQL 教程

共48课时 | 10.3万人学习

Django 教程
Django 教程

共28课时 | 4.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号