Flink学习:WaterMark
创始人
2025-05-29 04:05:01
0

WaterMark

  • 一、什么是水位线?
  • 二、案例分析
  • 三、如何生成水位线?
    • (一)、在SourceFunction中直接定义Timestamps和Watermarks
    • (二)、自定义生成Timstamps和Watermarks

一、什么是水位线?

  • 通常情况下,由于网络或系统等外部因素影响,事件数据往往不能及时传输至Flink系统中,导致数据乱序或者延迟到达等问题,因此,需要有一种机制能够控制数据处理的过程和进度,这种机制就是水位线
  • 水位线本质上是一个时间戳,且是动态变化的,会根据最大事件时间生成
watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
  • watermark时间戳是与窗口结束时间比较的,当watermark大于窗口结束时间时,意味着窗口结束,需要触发窗口计算
  • 举个例子,某条数据的事件时间为2023:03:16 9:00:00,它的下一条数据的事件时间为2023:03:16 9:06:00,窗口设置为滚动窗口为5分钟,延迟时间t设置为2分钟,此时窗口结束时间为2023:03:16 9:00:00,水位线是2023:03:16 9:06:00 - 2min = 2023:03:16 9:04:00,watermark < window endtime,这两条数据应该在同一个窗口内,下面是具体的例子

二、案例分析

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
  • 如上述代码所示,三条数据时间戳一样,也就是事件时间相同,水位线为自带的时间戳*1000L转成毫秒,也就是没有水位线,滚动窗口时间间隔为5min,这时计算结果应该是3条数据都在一个窗口中计算,最终会产生2条数据

在这里插入图片描述

  • 如下所示,把第三条时间戳增加300,也就是增加了5分钟,下面三条数据的真实日期分别为2017-11-26 9:0:0、2017-11-26
    9:0:0、2017-11-26 9:5:0
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线
  • 第三条数据正好晚了5分钟,此时前两条数据在一个窗口,第三条数据在一个窗口,最终应该产生三条数据,如下所示

在这里插入图片描述

  • 此时设置水位线为延迟一分钟,相当于第三条数据在第二条数据后第4分钟到的
    val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp + 100) //指定水位线
  • 此时又是第一种情况,三条数据应该都在一个窗口中,如下所示

在这里插入图片描述

三、如何生成水位线?

生成水位线分为两步:

  • 第一步需要指定eventTime,可以通过StreamExecutionEnvironment的TimeCharacteristic指定,还需要在Flink程序中指定event
    time时间戳在数据中的字段信息,在Flink程序中会通过指定字段抽取出对应的事件时间,该过程叫做Timestamps Assigning
  • 第二步就是创建相应的Watermarks,需要用户定义根据Timestamps计算出Watermarks的生成策略
  • 目前Flink支持两种方式指定Timestamps和生成WaterMarks,一种方式在DataStream Source算子接口的Source Function定义,另一种方式是通过自定义Timestamp Assigner和Watermark Generator生成

(一)、在SourceFunction中直接定义Timestamps和Watermarks

(二)、自定义生成Timstamps和Watermarks

自定义生成分为两种:

  • Periodic Watermarks:根据设定时间间隔周期性地生成Watermarks
  • Punctuated Watermarks:根据接入数据的数量生成

1、Periodic Watermarks

  • Periodic Watermark又分为两种:升序模式和固定时延间隔

1)、升序模式

  • 会将数据中的Timestamp根据指定字段提取,并用当前的Timestamp作为最新的watermarks,适用于事件按顺序生成

eg:
2)、使用固定时延间隔的Timestamp Assigner

  • 通过设定固定的时间间隔来指定Watermark落后于Timestamp的区间长度,也就是最长容忍到多长时间内的数据到达系统

eg:

2、Punctuated Watermarks

上一篇:图解GPT-2

下一篇:javah生成jni头文件

相关内容

热门资讯

证监会再次修订《上市公司治理准... 7月25日晚上,证监会发布通知对《上市公司治理准则(修订征求意见稿)》公开征求意见。本次修订目的是为...
从“闭眼买”到“不敢信”,山姆... 蓝鲨导读:中国零售崛起 作者 | 张二河 编辑 | 卢旭成 山姆会员店,因上架了一款韩国品牌陷入舆论...
原创 欧... 近日,欧盟委员会突然宣布了一则令人震惊的消息:7月25日,该委员会正式对中国太阳能玻璃发起第二次双反...
7月28日财经早餐:欧美达成贸... 来源:市场资讯 汇通财经APP讯——周一(北京时间7月28日),现货黄金交投于3335.78美元/盎...
科技早报 | 贝索斯完成大规模... 贝索斯完成一轮大规模亚马逊股票出售,套现57亿美元 亚马逊公司当地时间7月25日提交至美国证券交易...
股市必读:紫光股份(00093... 截至2025年7月25日收盘,紫光股份(000938)报收于25.04元,上涨0.64%,换手率1....
15%!美国与欧盟达成贸易协议... 据央视新闻报道,当地时间7月27日,美国总统特朗普表示,美国已与欧盟达成贸易协议,对欧盟输美商品征收...
早新闻|央行4000亿元MLF... 宏观热点 央行、农业农村部印发《关于加强金融服务农村改革 推进乡村全面振兴的意见》 近日,中国人...
21专访|细胞存储,《繁花》爷... 21世纪经济报道记者 赵云帆 上海报道 “我是一个真正意义上的创业者”,年过古稀的瞿建国,在采访中如...
华熙生物赵燕谈胶原蛋白乱象:科... 21世纪经济报道记者雷晨 北京报道 近年来,重组胶原蛋白成为医美和护肤领域的热门概念,市场宣传中不乏...
富春染织完成董事会选举换届 开... 7月25日晚间,富春染织公告显示,当日,公司2025年第一次临时股东会和富春染织第四届第一次董事会在...
圣湘生物:两款产品取得医疗器械... 每经AI快讯,圣湘生物(SH 688289,收盘价:22.94元)7月27日晚间发布公告称,圣湘生物...
10年期国债收益率升至1.73... 近期债券市场出现显著调整,多重因素交织推动收益率持续上行。权益市场强势表现与大宗商品价格上涨形成合力...
当对手都在做下沉 蜜雪冰城旗下... [ 今年5月,蜜雪集团跟巴西签署40亿元人民币的采购意向大单,其中大多数是咖啡豆。 ] 当星巴克、瑞...
新手必看!股指期货交易规则基础... 股指期货交易规则,看似复杂抽象,实则与我们的日常生活有着奇妙的共通之处。它就像一场精心编排的生活交响...
王登发履新茅台技开公司“一把手... 一则微信公众号发布的信息,披露了茅台集团旗下的技术开发公司“一把手”已换人。 近日,南都湾财社-酒水...
特斯拉机器人V3量产版亮相!马... 快科技7月27日消息,特斯拉的Optimus人形机器人V3量产版终于要来了!马斯克在最近的财报电话会...
原创 中... 在金融全球化的浪潮中,中国资本市场始终勇立潮头,不断探索前行。7月26日,中国资本市场学会成立大会暨...
报告:我国经济增长保持韧性 下... 央广网北京7月27日消息(记者 樊瑞)近日,中国金融四十人论坛(CF40论坛)发布《2025年第二季...
超6300亿元!A股银行“分红... 7月25日,成都银行完成权益分派股权登记,将于7月28日发放现金红利,这标志着A股上市银行2024年...