Flink学习:WaterMark
创始人
2025-05-29 04:05:01
0

WaterMark

  • 一、什么是水位线?
  • 二、案例分析
  • 三、如何生成水位线?
    • (一)、在SourceFunction中直接定义Timestamps和Watermarks
    • (二)、自定义生成Timstamps和Watermarks

一、什么是水位线?

  • 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,这种机制就是水位线
  • 水位线本质上是一个时间戳,且是动态变化的,会根据最大事件时间生成
watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
  • watermark时间戳是与窗口结束时间比较的,当watermark大于窗口结束时间时,意味着窗口结束,需要触发窗口计算
  • 举个例子,某条数据的事件时间为2023:03:16 9:00:00,它的下一条数据的事件时间为2023:03:16 9:06:00,窗口设置为滚动窗口为5分钟,延迟时间t设置为2分钟,此时窗口结束时间为2023:03:16 9:00:00,水位线是2023:03:16 9:06:00 - 2min = 2023:03:16 9:04:00,watermark < window endtime,这两条数据应该在同一个窗口内,下面是具体的例子

二、案例分析

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
  • 如上述代码所示,三条数据时间戳一样,也就是事件时间相同,水位线为自带的时间戳*1000L转成毫秒,也就是没有水位线,滚动窗口时间间隔为5min,这时计算结果应该是3条数据都在一个窗口中计算,最终会产生2条数据

在这里插入图片描述

  • 如下所示,把第三条时间戳增加300,也就是增加了5分钟,下面三条数据的真实日期分别为2017-11-26 9:0:0、2017-11-26
    9:0:0、2017-11-26 9:5:0
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线
  • 第三条数据正好晚了5分钟,此时前两条数据在一个窗口,第三条数据在一个窗口,最终应该产生三条数据,如下所示

在这里插入图片描述

  • 此时设置水位线为延迟一分钟,相当于第三条数据在第二条数据后第4分钟到的
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp + 100) //指定水位线
  • 此时又是第一种情况,三条数据应该都在一个窗口中,如下所示

在这里插入图片描述

三、如何生成水位线?

生成水位线分为两步:

  • 第一步需要指定eventTime,可以通过StreamExecutionEnvironment的TimeCharacteristic指定,还需要在Flink程序中指定event
    time时间戳在数据中的字段信息,在Flink程序中会通过指定字段抽取出对应的事件时间,该过程叫做Timestamps Assigning
  • 第二步就是创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略
  • 目前Flink支持两种方式指定Timestamps和生成WaterMarks,一种方式在DataStream Source算子接口的Source Function定义,另一种方式是通过自定义Timestamp Assigner和Watermark Generator生成

(一)、在SourceFunction中直接定义Timestamps和Watermarks

(二)、自定义生成Timstamps和Watermarks

自定义生成分为两种:

  • Periodic Watermarks:根据设定时间间隔周期性地生成Watermarks
  • Punctuated Watermarks:根据接入数据的数量生成

1、Periodic Watermarks

  • Periodic Watermark又分为两种:升序模式和固定时延间隔

1)、升序模式

  • 会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的watermarks,适用于事件按顺序生成

eg:
2)、使用固定时延间隔的Timestamp Assigner

  • 通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍到多长时间内的数据到达系统

eg:

2、Punctuated Watermarks

上一篇:图解GPT-2

下一篇:javah生成jni头文件

相关内容

热门资讯

银价推涨光伏组件报价,下游企业... 来源:第一财经 受成本端银价上涨影响,本周光伏组件价格再次上调。据行业机构Infolink Cons...
黄金史诗级暴跌,原因可能与一纸... 当地时间1月30日,随着美联储前理事凯文·沃什(Kevin Warsh)正式被美国总统特朗普提名为下...
深圳国资七亿下场扫货白石洲? 来源:市场资讯 (来源:深圳房产在线) 最近看到,近日一则消息引发关注,就是今年1月发生一宗白石洲大...
国投智能2025业绩承压 AI... 来源:财联社 财联社1月30日讯(记者 方彦博)2025年,AI应用的商业化落地是众多AI企业面临的...
原创 男... 在爱情的海洋中,星座的波涛有时能揭示出隐藏的情感暗流。当男人在愤怒的风暴中显露出四种迹象时,或许他并...
农业银行董事长谷澍会见英格兰银... 来源:市场资讯 来源:中国农业银行 1月29日,农业银行董事长谷澍会见了英格兰银行副行长兼英国审慎监...
“易中天”,业绩大爆发!需求增... “易中天”2025年度业绩持续爆发! 1月30日晚间,中际旭创发布2025年度业绩预告,预计2025...
双平台战略提速:仙乐健康谋“A... 中国营养健康食品行业的龙头企业仙乐健康,在1月30日向市场投下了一枚重磅消息:公司已正式向香港联交所...
左季庆染指淳厚基金股权纷争为谁... 2026年1月6日,证监会一纸批复核准上海长宁国有资产经营投资有限公司(下称“长宁国资”)成为淳厚基...
上市即巅峰?拉芳家化首度亏损,... 为什么消费端对“拉芳”爱不起来了? 作者 | 方璐 编辑丨于婞 来源 | 野马财经 拉芳家化(603...
原创 黄... 1月31日晚间,英伟达CEO黄仁勋现身中国台湾台北市砖窑古早味怀旧餐厅,宴请了35位与英伟达合作的供...
山西太钢不锈钢股份有限公司 2... 来源:证券日报 证券代码:000825 证券简称:太钢不锈 公告编号:2026-001 本公司及董...
把自己的银行贷款出借给别人,有... 新京报讯(记者张静姝 通讯员邸越洋)因贷款出借后未被归还,原告牛女士将被告杨甲、杨乙诉至法院,要求二...
金价暴跌,刚买的金饰能退吗?有... 黄金价格大跌,多品牌设置退货手续费。 在过去两三天,现货黄金价格经历了“过山车”般的行情,受金价下跌...
预计赚超2500万!“豆腐大王... 图片来源:图虫创意 在经历了一年亏损后,“豆腐大王”祖名股份(003030.SZ)成功实现扭亏为盈。...
特朗普提名“自己人”沃什执掌美... 据新华社报道,当地时间1月30日,美国总统特朗普通过社交媒体宣布,提名美国联邦储备委员会前理事凯文·...
爱芯元智将上市:连年大额亏损,... 撰稿|多客 来源|贝多商业&贝多财经 1月30日,爱芯元智半导体股份有限公司(下称“爱芯元智”,HK...
一夜之间,10只A股拉响警报:... 【导读】深康佳A等10家公司昨夜拉响退市警报 中国基金报记者 夏天 1月30日晚间,A股市场迎来一波...
谁在操控淳厚基金?左季庆为谁趟... 2026年1月6日,证监会一纸批复核准上海长宁国有资产经营投资有限公司(下称“长宁国资”)成为淳厚基...
工商银行党委副书记、行长刘珺会... 人民财讯1月31日电,1月29日,工商银行党委副书记、行长刘珺会见来访的上海电气集团党委书记、董事长...