Flink中的UDF的实现
admin
2024-03-15 11:21:07
0

Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF
主要有以下几类:

  • 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
  • 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
  • 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
  • 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据;

标量函数(Scalar Functions)

        自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是“一对一”的转换,类似于hive中的UDF
        想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction,并实
现叫作 eval() 的求值方法
。标量函数的行为就取决于求值方法的定义,它必须是公有(public),
而且名字必须是 eval。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。

使用场景:求传入对象的哈希值

public static class HashFunction extends ScalarFunction {// 接受任意类型输入,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}
}
// 注册函数
tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);
// 在 SQL 里调用注册好的函数
tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

表函数(Table Functions)

        表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系,类似于Hive中的UDTF(窗口TVF本质上就是一个表函数)。让输入表中的每一行,与它转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了扩展。在 Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来实现的。

        实现自定义的表函数,需要自定义类来继承抽象类 TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一个泛型参数T 的,这就是表函数返回数据的类型;而 eval()方法没有返回类型,内部也没有 return语句,是通过调用 collect()方法来发送想要输出的行数据的。

使用场景:对字段进行拆分,一行变多行。

// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。
@FunctionHint(output = @DataTypeHint("ROW"))
public static class SplitFunction extends TableFunction {public void eval(String str) {for (String s : str.split(" ")) {// 使用 collect()方法发送一行数据collect(Row.of(s, s.length()));}}
}
// 注册函数
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
// 重命名侧向表中的字段
tableEnv.sqlQuery("SELECT myField, newWord, newLength " +"FROM MyTable " +"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

聚合函数(Aggregate Functions)

        用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据
(也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换,类似于Hive中的UDAF。聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。
        自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数
,T 表示聚合输出的结果类型,ACC 则表示聚合的中间状态类型,所以要创建一个累加器。

使用场景:计算加权平均数

// 累加器类型定义
public static class WeightedAvgAccumulator {public long sum = 0; // 加权和public int count = 0; // 数据个数
}
// 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator
public static class WeightedAvg extends AggregateFunction {@Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator(); // 创建累加器}@Overridepublic Long getValue(WeightedAvgAccumulator acc) {if (acc.count == 0) {return null; // 防止除数为 0} else {return acc.sum / acc.count; // 计算平均值并返回}}// 累加计算方法,每来一行数据都会调用public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer 
iWeight) {acc.sum += iValue * iWeight;acc.count += iWeight;}
}// 注册自定义聚合函数
tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
// 调用函数计算加权平均值
Table result = tableEnv.sqlQuery("SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY student");

表聚合函数(Table Aggregate Functions)

          用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另 一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换
        自定义表聚合函数需要继承抽象类 TableAggregateFunction。TableAggregateFunction结构和原理与 AggregateFunction 非常类似,同样有两个泛型参数,用一个 ACC 类型的累加器(accumulator)来存储聚合的中间结果。
        表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输出。如果输入一条数据后,只是对结果表里一行或几行进行了更新(Update),这时我们重新计算整个表、全部输出显然就不够高效了。为了提高处理效率,TableAggregateFunction 还提供了一个 emitUpdateWithRetract()方法,它可以在结果表发生变化时,以“撤回”(retract)老数据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract()两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()。

使用场景:Top N 查询
// 聚合累加器的类型定义,包含最大的第一和第二两个数据
public static class Top2Accumulator {public Integer first;public Integer second;
}
// 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组
public static class Top2 extends TableAggregateFunction, 
Top2Accumulator> {@Overridepublic Top2Accumulator createAccumulator() {Top2Accumulator acc = new Top2Accumulator();acc.first = Integer.MIN_VALUE; // 为方便比较,初始值给最小值acc.second = Integer.MIN_VALUE;return acc;}// 每来一个数据调用一次,判断是否更新累加器public void accumulate(Top2Accumulator acc, Integer value) {
if (value > acc.first) {acc.second = acc.first;acc.first = value;} else if (value > acc.second) {acc.second = value;}}// 输出(数值,排名)的二元组,输出两行数据public void emitValue(Top2Accumulator acc, Collector> 
out) {if (acc.first != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second != Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}
}// 注册表聚合函数函数
tableEnv.createTemporarySystemFunction("Top2", Top2.class);
// 在 Table API 中调用函数
tableEnv.from("MyTable").groupBy($("myField")).flatAggregate(call("Top2", $("value")).as("value", "rank")).select($("myField"), $("value"), $("rank"));

        目前 SQL 中没有直接使用表聚合函数的方式,所以需要使用 Table API 的方式来调用。这里使用了 flatAggregate()方法,它就是专门用来调用表聚合函数的接口。

相关内容

热门资讯

小米亟待走出“等风来”的叙事逻... 5月26日盘后,小米集团发布了2026年第一季度的财报,营业收入录得991.4亿元,同比下降10.9...
聚焦直播带货、外卖等 市场监管... 新京报讯 据“市说新语”官微消息,近日,市场监管总局印发通知,部署各地市场监管部门自5月至12月开展...
原创 中... 5月13日,希腊总理米佐塔基斯在雅典出席活动时表示,霍尔木兹海峡通航持续受阻将导致通货膨胀,削弱经济...
邮储银行行长芦苇兼任公司首席合... 5月26日,邮储银行发布董事会决议公告,邮储银行行长芦苇自2026年5月26日起兼任邮储银行首席合规...
我愿意二次到店吗?小店主理人交... 来源:滚动播报 (来源:上观新闻) 咖啡店主理人可以去餐饮店体验一天,感受烟火气和客流管理;手工...
原创 深... 当政策暖风遇上资产配置需求,深圳楼市正上演一场“热度与信心齐飞”的戏码!上周(5.18-5.24),...
被封千万网红大蓝卷土重来:拉人... 蓝鲸新闻5月26日讯(记者 赵凯)“朋友圈散布经济恐慌言论制造焦虑,拉人头设多级返利,数百人入局、累...
抖音商城618前六日数据:消费... “清凉经济”热度高:抖音商城618首阶段空气循环扇订单量同比增长348% 作者 I 钱游 报道 I ...
金华有闲置贵金属想变现该怎么挑... 当下闲置物品处置、短期资金周转的需求日渐普遍,市面上的相关服务机构水平参差不齐,不少有黄金回收需求的...
千亿市值芯片企业完成IPO辅导... 【大河财立方消息】5月26日,新三板挂牌企业宸芯科技股份有限公司(证券简称:宸芯科技)公告,收到青岛...
NBBOSS R1全球首发 重... 5月26日,信人智能旗下全球首款企业家专属AI决策伙伴NBBOSS AI决策机器人R1正式全球首发。...
NFC果汁配料表“水”排第一?... 随着气温升高,果汁进入消费旺季。然而很多果汁产品的标注却让消费者感到困惑。比如:有的标注“纯果汁”,...
存储牛市与全民狂热:韩国股市泡... 2026年5月的韩国,正经历一场史无前例的资本狂欢。自2025年4月触底以来,KOSPI指数在18个...
下架,召回!双汇子公司猪肉抗生... 近日,黑龙江省市场监督管理局网站发布关于食品安全监督抽检信息的通告(2026年第7期)。 其中,望奎...
换帅潮席卷白酒圈 白酒本轮人事变动频次之高、画像之多元,几乎超过了过去任何一个周期。 5月19日,“河北王”老白干酒宣...
4月意大利起泡酒猛增122.5... 近日,海关总署公布了2026年4月葡萄酒进口数据。其中,起泡酒表现尤为突出,进口量同比增长35.8%...
华为“韬定律”提振港股半导体股... 财联社5月26日讯(编辑 胡家荣)半导体产业链个股集体走强。截至发稿,华虹半导体(01347.HK)...
历史不会重演,但会惊人相似:中... 金价疯涨别乱买!复刻2015年走势,普通人记住3个保命妙招 最近逛商场,最大的感受就是黄金柜台太热闹...
商品标签被指涉嫌性暗示,盒马道... 近日,盒马旗下一款粉木耳产品因标签设计引发争议,不少网友吐槽该商品标签低俗,涉嫌性暗示。 25日晚...