Flink学习:WaterMark
创始人
2025-05-29 04:05:01
0

WaterMark

  • 一、什么是水位线?
  • 二、案例分析
  • 三、如何生成水位线?
    • (一)、在SourceFunction中直接定义Timestamps和Watermarks
    • (二)、自定义生成Timstamps和Watermarks

一、什么是水位线?

  • 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,这种机制就是水位线
  • 水位线本质上是一个时间戳,且是动态变化的,会根据最大事件时间生成
watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
  • watermark时间戳是与窗口结束时间比较的,当watermark大于窗口结束时间时,意味着窗口结束,需要触发窗口计算
  • 举个例子,某条数据的事件时间为2023:03:16 9:00:00,它的下一条数据的事件时间为2023:03:16 9:06:00,窗口设置为滚动窗口为5分钟,延迟时间t设置为2分钟,此时窗口结束时间为2023:03:16 9:00:00,水位线是2023:03:16 9:06:00 - 2min = 2023:03:16 9:04:00,watermark < window endtime,这两条数据应该在同一个窗口内,下面是具体的例子

二、案例分析

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
  • 如上述代码所示,三条数据时间戳一样,也就是事件时间相同,水位线为自带的时间戳*1000L转成毫秒,也就是没有水位线,滚动窗口时间间隔为5min,这时计算结果应该是3条数据都在一个窗口中计算,最终会产生2条数据

在这里插入图片描述

  • 如下所示,把第三条时间戳增加300,也就是增加了5分钟,下面三条数据的真实日期分别为2017-11-26 9:0:0、2017-11-26
    9:0:0、2017-11-26 9:5:0
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线
  • 第三条数据正好晚了5分钟,此时前两条数据在一个窗口,第三条数据在一个窗口,最终应该产生三条数据,如下所示

在这里插入图片描述

  • 此时设置水位线为延迟一分钟,相当于第三条数据在第二条数据后第4分钟到的
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp + 100) //指定水位线
  • 此时又是第一种情况,三条数据应该都在一个窗口中,如下所示

在这里插入图片描述

三、如何生成水位线?

生成水位线分为两步:

  • 第一步需要指定eventTime,可以通过StreamExecutionEnvironment的TimeCharacteristic指定,还需要在Flink程序中指定event
    time时间戳在数据中的字段信息,在Flink程序中会通过指定字段抽取出对应的事件时间,该过程叫做Timestamps Assigning
  • 第二步就是创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略
  • 目前Flink支持两种方式指定Timestamps和生成WaterMarks,一种方式在DataStream Source算子接口的Source Function定义,另一种方式是通过自定义Timestamp Assigner和Watermark Generator生成

(一)、在SourceFunction中直接定义Timestamps和Watermarks

(二)、自定义生成Timstamps和Watermarks

自定义生成分为两种:

  • Periodic Watermarks:根据设定时间间隔周期性地生成Watermarks
  • Punctuated Watermarks:根据接入数据的数量生成

1、Periodic Watermarks

  • Periodic Watermark又分为两种:升序模式和固定时延间隔

1)、升序模式

  • 会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的watermarks,适用于事件按顺序生成

eg:
2)、使用固定时延间隔的Timestamp Assigner

  • 通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍到多长时间内的数据到达系统

eg:

2、Punctuated Watermarks

上一篇:图解GPT-2

下一篇:javah生成jni头文件

相关内容

热门资讯

FIBA期待杨瀚森表现 最新实... 北京时间6月25日消息,FIBA国际篮联公布了最新一期世界杯预选赛亚太区球队实力榜,中国男篮排在澳大...
收评:创业板指放量反弹涨2.8... 市场冲高回落后,再度震荡拉升。黄白线分化明显,权重股走势较强。量能明显放大,沪深两市成交额3.59万...
巨头财报引爆A股存储芯片板块,... 当地时间6月24日美股盘后, 美光科技(MU.US)公布截至5月31日的2026财年第三财季财报,业...
银行、消金公司助贷余额增速不得... 近日,中国证券报记者从多位业内人士处独家获悉,5月以来,多地金融监管部门对部分中小银行、消金公司下达...
朱鸿接任陈航,担任钉钉科技有限... 消费日报-今朝新闻讯 天眼查显示,6月23日,钉钉科技有限公司发生工商变更,陈航卸任法定代表人、董事...
3日累跌超20%,德创环保:公... 6月25日, 德创环保(603177.SH)公告,公司股票于2026年6月23日、6月24日和6月2...
北京发布2026年第七轮拟供商... 央广网北京6月25日消息(记者门庭婷)6月25日,北京市规划和自然资源委员会网站发布了2026年第七...
开放麦 | 启明创投胡奇:从A... “2026年,创投圈的浪潮再次翻涌:AI从技术概念走进产业深水区,硬科技创业从“小众赛道” 变成“主...
腾讯孙忠怀:在行业转身处 6月24日,2026腾讯视频年度发布在上海举行。腾讯公司副总裁、腾讯在线视频董事长孙忠怀以《在行业转...
加息,突变!美联储,重磅传来!... 美联储政策路径突生变数。 美国商务部经济分析局最新公布的数据显示,5月个人消费支出(PCE)物价指数...
6月合肥上门收金必看!5步避坑... 2026年6月,合肥黄金市场持续高位运行,不少市民翻出家里闲置的旧金饰、投资金条想变现,上门回收因为...
潮汕女富豪挂帅后加码液冷!祥鑫... 潮汕女强人,带着百亿公司加码液冷散热。 6月24日晚间,祥鑫科技(002965.SZ)公告称,公司董...
马斯克向太空要电,GobiX ... 一场关于「去哪里找电」的全球竞赛,正在朝两个方向展开。 作者|周永亮 编辑| 郑玄 「太空光伏是不是...
原料药行业陷入周期低谷 有药企... 每经记者|许立波 每经编辑|魏文艺 “过完年到现在,我们整个团队每个月都在出差,跑遍了亚非拉、欧美市...
家门口筛查白内障!永顺泽家镇暖... 大众卫生报·新湖南客户端6月25日讯(通讯员 彭雪姣)为切实解决辖区老年性白内障患者异地就医奔波、就...
终于等到!油价马上再大跌,这个... 点击添加图片描述(最多60个字) 编辑 各位车主朋友,好消息接二连三! 继6月18日油价大幅下调...
丈量出海新路 世界酒庄影响力指... 长期以来,全球酒庄评价体系由西方机构主导,且大多局限于单一酒种、单一评价维度,这一局面正逐渐被打破。...
峰瑞资本创始合伙人李丰:从资本... “2026年,创投圈的浪潮再次翻涌:AI从技术概念走进产业深水区,硬科技创业从“小众赛道” 变成“主...
原创 A... 迈向成熟,还有茁壮成长的机会。 作者 | 方璐 编辑丨于婞 来源 | 野马财经 2026年6月21日...
为企业解锁出海新通道!亚太中小... 6月24日下午,作为2026年APEC中小企业工商论坛的重要组成部分,亚太中小企业国际化合作发展论坛...