watermark = 进入Flink窗口的最大事件时间(maxEventTime) - 一定的延迟时间(t)
//这个延迟时间t是在程序当中配置的
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowcase class User(id:Int,name:String,age:Int,timestamp:Long)
object SqlTest {def main(args: Array[String]): Unit = {val streamEnv = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(streamEnv)//指定时间类型为事件时间streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658000)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线tEnv.registerDataStream("testTable",stream,'id, 'name,'age,'event_time.rowtime)val result = tEnv.sqlQuery("select id,sum(age) from testTable group by TUMBLE(event_time,INTERVAL '5' MINUTE),id")result.toRetractStream[Row].print()streamEnv.execute("windowTest")}
}
val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp * 1000L) //指定水位线
val stream = streamEnv.fromElements(User(1,"nie",22,1511658000),User(2,"hu",20,1511658000),User(2,"xiao",19,1511658300)).assignAscendingTimestamps(_.timestamp + 100) //指定水位线
生成水位线分为两步:
自定义生成分为两种:
1、Periodic Watermarks
1)、升序模式
eg:
2)、使用固定时延间隔的Timestamp Assigner
eg:
2、Punctuated Watermarks
上一篇:图解GPT-2
下一篇:javah生成jni头文件