0

0

[源码解析] Flink UDAF 背后做了什么

星夢妙者

星夢妙者

发布时间:2025-09-09 09:11:01

|

477人浏览过

|

来源于php中文网

原创

源码解析 flink udaf 背后做了什么

0x00 摘要

本文涉及到Flink SQL UDAF,Window 状态管理等部分,希望能起到抛砖引玉的作用,让大家可以借此深入了解这个领域。

0x01 概念1.1 概念

大家知道,Flink的自定义聚合函数(UDAF)可以将多条记录聚合成1条记录,这功能是通过accumulate方法来完成的,官方参考指出:

但是实时计算还有一些特殊的场景,在此场景下,还需要提供merge方法才能完成。

1.2 疑问

之前因为没亲身操作,所以一直忽略merge的特殊性。最近无意中看到了一个UDAF的实现,突然觉得有一个地方很奇怪,即 accumulate 和 merge 这两个函数不应该定义在一个类中。因为这是两个完全不同的处理方法。应该定义在两个不同的类中。

比如用UDAF做word count,则:

accumulate 是在一个task中累积数字,其实就相当于 map;merge 是把很多task的结果再次累积起来,就相当于 reduce;

然后又想出了一个问题:Flink是如何管理 UDAF的accumulator?其状态存在哪里?

看起来应该是Flink在背后做了一些黑魔法,把这两个函数从一个类中拆分了。为了验证我们的推测,让我们从源码入手来看看这些问题:

Flink SQL转换/执行计划生成阶段,如何处理在 "同一个类中" 的不同类型功能函数 accumulate 和 merge?Flink runtime 如何处理 merge?Flink runtime 如何处理 UDAF的accumulator的历史状态?1.3 UDAF示例代码

示例代码摘要如下 :

代码语言:javascript代码运行次数:0运行复制

public class CountUdaf extends AggregateFunction<long countudaf.countaccum> {    //定义存放count UDAF状态的accumulator的数据的结构。    public static class CountAccum {        public long total;    }      //初始化count UDAF的accumulator。    public CountAccum createAccumulator() {        CountAccum acc = new CountAccum();        acc.total = 0;        return acc;    }      //accumulate提供了,如何根据输入的数据,更新count UDAF存放状态的accumulator。    public void accumulate(CountAccum accumulator, Object iValue) {        accumulator.total++;    }    public void merge(CountAccum accumulator, Iterable<countaccum> its) {        for (CountAccum other : its) {            accumulator.total += other.total;        }    }}</countaccum></long>

0x02 批处理

批处理相对简单,因为数据是有边界的,其逻辑比较清晰。

2.1 代码

首先给出测试代码

代码语言:javascript代码运行次数:0运行复制

val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))// register the DataSet as a view "WordCount"tEnv.createTemporaryView("WordCount", input, 'word, 'frequency)tEnv.registerFunction("countUdaf", new CountUdaf())// run a SQL query on the Table and retrieve the result as a new Tableval table = tEnv.sqlQuery("SELECT word, countUdaf(frequency), SUM(frequency) FROM WordCount GROUP BY word")case class WC(word: String, frequency: Long)

2.2 计划生成

DataSetAggregate.translateToPlan

中生成了执行计划。原来Flink把 SQL 语句分割成两个阶段:

combineGroupreduceGroup

于是我们推断,这很有可能就是 combineGroup 调用accumulate,reduceGroup 调用 merge。

关于combineGroup,如果有兴趣,可以看看我之前文章 [源码解析] Flink的groupBy和reduce究竟做了什么 以及 源码解析 GroupReduce,GroupCombine 和 Flink SQL group by](https://cloud.tencent.com/developer/article/1693307)

代码语言:javascript代码运行次数:0运行复制

override def translateToPlan(tableEnv: BatchTableEnvImpl,    queryConfig: BatchQueryConfig): DataSet[Row] = {    if (grouping.length > 0) {      // grouped aggregation      if (preAgg.isDefined) {        // 执行到这里        inputDS          // pre-aggregation          .groupBy(grouping: _*)          .combineGroup(preAgg.get) // 第一阶段          .returns(preAggType.get)          .name(aggOpName)                    // final aggregation          .groupBy(grouping.indices: _*)          .reduceGroup(finalAgg.right.get) // 第二阶段          .returns(rowTypeInfo)          .name(aggOpName)      }    }}

SQL语句对应的执行计划大致为:

[源码解析] Flink UDAF 背后做了什么

2.3 执行

在执行看,确实对应了两个阶段。

阶段 1 确实是 GroupReduceCombineDriver 调用到了 accumulate。

代码语言:javascript代码运行次数:0运行复制

//堆栈如下accumulate:25, CountUdaf (mytest)accumulate:-1, DataSetAggregatePrepareMapHelper$5combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)  //SQL UDAF生成的代码如下  function = {DataSetAggregatePrepareMapHelper$5@10085}  function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10079} "CountUdaf" function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10087} "LongSumAggFunction"  

阶段 2 中 GroupReduceDriver 调用到了 merge

代码语言:javascript代码运行次数:0运行复制

//堆栈如下merge:29, CountUdaf (mytest)mergeAccumulatorsPair:-1, DataSetAggregateFinalHelper$6reduce:71, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)run:131, GroupReduceDriver (org.apache.flink.runtime.operators)  //SQL UDAF生成的代码如下   function = {DataSetAggregateFinalHelper$6@10245}  function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10238} "CountUdaf" function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10247} "LongSumAggFunction"  

Flink对用户定义的UDAF代码分别生成了两个不同的功能类:

DataSetAggregatePrepareMapHelper : 用于Combine阶段,调用了accumulateDataSetAggregateFinalHelper :用于Reduce阶段,调用了merge2.4 状态管理

UDAF有一个accumulator,这个会在程序运行过程中始终存在,Flink是如何管理这个accumulator呢?

GroupReduceCombineDriver类有一个成员变量 combiner,

代码语言:javascript代码运行次数:0运行复制

public class GroupReduceCombineDriver<in out> implements Driver<groupcombinefunction out>, OUT> {  private GroupCombineFunction<in out> combiner;}</in></groupcombinefunction></in>

而 combiner 被赋予了 DataSetPreAggFunction 类的一个实例。

代码语言:javascript代码运行次数:0运行复制

class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)  extends AbstractRichFunction{  private var accumulators: Row = _ //这里存储历史状态  private var function: GeneratedAggregations = _}

Flink就是把 UDAF的accumulator 存储在

combiner.accumulators

中,我们可以看到,无论用户定义了什么类型作为 accumulator,Flink都用万能类型 Row 搞定。

代码语言:javascript代码运行次数:0运行复制

combiner = {DataSetPreAggFunction@10063}  genAggregations = {GeneratedAggregationsFunction@10070}  accumulators = {Row@10117} "mytest.CountUdaf$CountAccum@1e343db7,(0,false)" function = {DataSetAggregatePrepareMapHelper$5@10066}  // function是包含用户代码的功能类。  function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10076} "CountUdaf" 

2.5 总结

让我们总结一下,批处理被分成两个阶段:

combineGroup :根据用户UDAF代码生成功能类 DataSetAggregatePrepareMapHelper,用于Combine阶段,调用了accumulate;reduceGroup :根据用户UDAF代码生成功能类 DataSetAggregateFinalHelper,用于Reduce阶段,调用了 merge;

Flink在GroupReduceCombineDriver类的成员变量 combiner 中存储 accumulator历史状态。

0x03 流处理

Avatar AI
Avatar AI

AI成像模型,可以从你的照片中生成逼真的4K头像

下载

流处理则是和批处理完全不同的世界,下面我们看看流处理背后有什么奥秘。

在流计算场景中,数据没有边界源源不断的流入的,每条数据流入都可能会触发计算,比如在进行count或sum这些操作是如何计算的呢?

是选择每次触发计算将所有流入的历史数据重新计算一遍?还是每次计算都基于上次计算结果进行增量计算呢?如果选择增量计算,那么上一次的中间计算结果保存在哪里?内存?3.1 示例代码代码语言:javascript代码运行次数:0运行复制

val query: Table = tableEnv.sqlQuery(  """    |SELECT    |countUdaf(num)    |FROM tb_num    |GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND)   """.stripMargin)

3.2 计划生成

DataStreamGroupWindowAggregateBase.translateToPlan

函数中完成了计划生成。根据Stream的类型(是否有key),会走不同的逻辑业务。

WindowedStream

代表了根据key分组,并且基于

WindowAssigner

切分窗口的数据流。所以

WindowedStream

都是从

KeyedStream

衍生而来的。在key分组的流上进行窗口切分是比较常用的场景,也能够很好地并行化(不同的key上的窗口聚合可以分配到不同的task去处理)。当在普通流(没有key)上进行窗口操作时,就要用到

AllWindowedStream

AllWindowedStream

是直接在

DataStream

上进行

windowAll(...)

操作。在普通流上进行窗口操作,就势必需要将所有分区的流都汇集到单个的Task中,而这个单个的Task很显然就会成为整个Job的瓶颈。

我们的示例代码是基于Key的,所以走

WindowedStream

分支,即一个 window 中即做accumulate,又做merge。

代码语言:javascript代码运行次数:0运行复制

// grouped / keyed aggregationif (grouping.length > 0) {      // 有key,所以是 WindowedStream,我们示例走这里      val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(...)      val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping))      val keyedStream = timestampedInput.keyBy(keySelector)      val windowedStream =        createKeyedWindowedStream(queryConfig, window, keyedStream)          .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]      val (aggFunction, accumulatorRowType) =        AggregateUtil.createDataStreamGroupWindowAggregateFunction(...)      windowedStream        .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)        .name(keyedAggOpName)}// global / non-keyed aggregationelse {      // 没有key,所以是AllWindowedStream       val windowFunction = AggregateUtil.createAggregationAllWindowFunction(...)      val windowedStream =        createNonKeyedWindowedStream(queryConfig, window, timestampedInput)          .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]      val (aggFunction, accumulatorRowType) =        AggregateUtil.createDataStreamGroupWindowAggregateFunction(...)      windowedStream        .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)        .name(nonKeyedAggOpName)}

SQL语句对应的执行计划大致如下,我们能看出来 accumulate & merge 都在 Window 中处理。

[源码解析] Flink UDAF 背后做了什么

3.3 执行 & 状态管理

可以看到,流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。

我们以基于key的WindowedStream为例继续进行研究。

3.3.1 接受到一个新输入

当Window接受到一个输入item时候,item会被分配到一个key,由KeySelector完成。WindowOperator 类首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。这里就会做accumulate。

本例

windowAssigner = {TumblingProcessingTimeWindows}

,进入到processElement函数的 非 MergingWindow部分,具体流程如下:

遍历elementWindows,进行业务处理 1)判断该window是否已过期,isWindowLate(window)2)获取该window的context,windowState.setCurrentNamespace(window); 这里是 HeapAggregatingState。3)将数据加入,windowState.add(element.getValue()); 3.1)调用 stateTable.transform();处理输入 3.1.1)StateMap stateMap = getMapForKeyGroup(keyGroup); 这里获取到CopyOnWriteStateMap3.1.2)stateMap.transform(key, namespace, value, transformation); 3.1.2.1)调用 AggregateTransformation.apply,其又调用 aggFunction.add(value, accumulator); 3.1.2.1.1)调用 GroupingWindowAggregateHelper.accumulate(accumulatorRow, value.row),其又调用 用户定义的 accumulate;

可以看到,是 windowState 添加元素时候,调用到State的API,然后间接调用到了UDAF。

3.3.2 windowState & UDAF执行

windowState 以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。我们此例中,存储的是执行结果。

本例用到的 window process 是 Incremental Aggregation Functions。即 ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。

因此这里我们拿到的是 HeapReducingState, HeapAggregatingState,当执行到

windowState.add(element.getValue());
语句时,便调用UDAF得出结果。

3.3.3 State & 结果存储

在flink中state用来存放计算过程的节点中间结果或元数据。在flink内部提供三种state存储实现

内存HeapStateBackend:存放数据量小,用于开发测试使用;生产不建议使用HDFS的FsStateBackend :分布式文件持久化,每次都会产生网络io,可用于大state,不支持增量;可用于生产RocksDB的RocksDBStateBackend:本地文件 + 异步hdfs持久化,也可用于大state数据量,唯一支持增量,可用于生产;

我们这里拿到的是 HeapAggregatingState。

3.3.4 State 存储结构

以三元组的形式存储保存数据,即 key, namespace, value。

代码语言:javascript代码运行次数:0运行复制

public abstract class StateTable<k n s>implements StateSnapshotRestore, Iterable<stateentry n s>> {   /**   * Map for holding the actual state objects. The outer array represents the key-groups.   * All array positions will be initialized with an empty state map.   */protected final StateMap<k n s>[] keyGroupedStateMaps;}// 真实中变量摘录如下keyGroupedStateMaps = {StateMap[1]@9266}  0 = {CopyOnWriteStateMap@9262} // 这里就是将要保存用户accumulator的地方  stateSerializer = {RowSerializer@9254}   snapshotVersions = {TreeSet@9277}  size = 0  primaryTable = {CopyOnWriteStateMap$StateMapEntry[128]@9278}   incrementalRehashTable = {CopyOnWriteStateMap$StateMapEntry[2]@9280}   lastNamespace = {TimeWindow@9239} "TimeWindow{start=1593934200000, end=1593934210000}"</k></stateentry></k>

在上面提及的

3.1.2)stateMap.transform(key, namespace, value, transformation);

代码语言:javascript代码运行次数:0运行复制

@Overridepublic <t> void transform(   K key,   N namespace,   T value,   StateTransformationFunction<s t> transformation) throws Exception {   final StateMapEntry<k n s> entry = putEntry(key, namespace);   // copy-on-write check for state   entry.state = transformation.apply(      (entry.stateVersion 3.4 总结<p>流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。</p>window接受到新输入,就会往 windowState 添加元素。windowState 添加元素时候,调用到State的API,然后间接调用到了UDAFwindowState 在本例存储的是UDAF执行结果。具体存储是在HeapAggregatingState中完成。0xFF 参考<p>Flink - 当数据流入window时,会发生什么</p><p>Flink SQL 自定义UDAF</p><p>自定义聚合函数(UDAF)</p><p>Apache Flink - 常见数据流类型</p><p>Flink-SQL源码解读(一)window算子的创建的源码分析</p><p>从udaf谈flink的state</p><p>Apache Flink - 常见数据流类型</p><p>Flink状态管理(二)状态数据结构和注册流程</p></k></s></t>

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

1133

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

340

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

381

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

2152

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

380

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

1683

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

585

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

440

2024.04.29

C# ASP.NET Core微服务架构与API网关实践
C# ASP.NET Core微服务架构与API网关实践

本专题围绕 C# 在现代后端架构中的微服务实践展开,系统讲解基于 ASP.NET Core 构建可扩展服务体系的核心方法。内容涵盖服务拆分策略、RESTful API 设计、服务间通信、API 网关统一入口管理以及服务治理机制。通过真实项目案例,帮助开发者掌握构建高可用微服务系统的关键技术,提高系统的可扩展性与维护效率。

3

2026.03.11

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 6万人学习

TypeScript 教程
TypeScript 教程

共19课时 | 3.4万人学习

Bootstrap 5教程
Bootstrap 5教程

共46课时 | 3.6万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号