一篇文章带你深入明白FlinkSQL中的窗口
发布时间:2021-06-04 13:26:47 所属栏目:大数据 来源:互联网
导读:一、分组窗口(Group Windows) 分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别
一、分组窗口(Group Windows) 分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。例子:
val table = input
.window([w: GroupWindow] as 'w)
.groupBy('w, 'a)
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count)
Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层DataStream 或 DataSet 的窗口操作。
Table API 支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding和 会话(Session)。
1.1 滚动窗口
滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法:
over:定义窗口长度
on:用来分组(按时间间隔)或者排序(按行数)的时间字段
as:别名,必须出现在后面的 groupBy 中
实现案例
1.需求
设置滚动窗口为10秒钟统计id出现的次数。
2.数据准备
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9
3.代码实现
package windows
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row
/**
* @Package Windows
* @File :FlinkSQLTumBlingTie.java
* @author 大数据老哥
* @date 2020/12/25 21:58
* @version V1.0
* 设置滚动窗口
*/
object FlinkSQLTumBlingTie {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(env, settings)
// 读取数据
val inputPath = "./data/sensor.txt"
val inputStream = env.readTextFile(inputPath)
// 先转换成样例类类型(简单转换操作)
val dataStream = inputStream
.map(data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts)
// 注册表
tableEnv.createTemporaryView("sensor", sensorTable)
// table 实现
val resultTable = sensorTable
.window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒统计一次,滚动时间窗口
.groupBy('id, 'tw)
.select('id, 'id.count, 'tw.end)
//sql 实现
val sqlTable = tableEnv.sqlQuery(
"""
|select
|id,
|count(id) ,
|tumble_end(ts,interval '10' second)
|from sensor
|group by
|id,
|tumble(ts,interval '10' second)
|""".stripMargin)
/***
* .window(Tumble over 10.minutes on 'rowtime as 'w) (事件时间字段 rowtime)
* .window(Tumble over 10.minutes on 'proctime as 'w)(处理时间字段 proctime)
* .window(Tumble over 10.minutes on 'proctime as 'w) (类似于计数窗口,按处理时间排序,10 行一组)
*/
resultTable.toAppendStream[Row].print("talbe")
sqlTable.toRetractStream[Row].print("sqlTable")
env.execute("FlinkSQLTumBlingTie")
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)
}
![]() (编辑:钦州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |