加入收藏 | 设为首页 | 会员中心 | 我要投稿 钦州站长网 (https://www.0777zz.cn/)- 智能办公、数据计算、云存储网关、负载均衡、设备管理!
当前位置: 首页 > 大数据 > 正文

一篇文章带你深入明白FlinkSQL中的窗口

发布时间:2021-06-04 13:26:47 所属栏目:大数据 来源:互联网
导读:一、分组窗口(Group Windows) 分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别
一、分组窗口(Group Windows) 分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。 Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由 as 子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在 group by 子句中,像常规的分组字段一样引用。例子:
val table = input 
.window([w: GroupWindow] as 'w) 
.groupBy('w, 'a) 
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) 
Table API 提供了一组具有特定语义的预定义 Window 类,这些类会被转换为底层DataStream 或 DataSet 的窗口操作。
Table API 支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding和 会话(Session)。
1.1 滚动窗口
滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法:
over:定义窗口长度
on:用来分组(按时间间隔)或者排序(按行数)的时间字段
as:别名,必须出现在后面的 groupBy 中
实现案例
1.需求
设置滚动窗口为10秒钟统计id出现的次数。
2.数据准备
sensor_1,1547718199,35.8 
sensor_6,1547718201,15.4 
sensor_7,1547718202,6.7 
sensor_10,1547718205,38.1 
sensor_1,1547718206,32 
sensor_1,1547718208,36.2 
sensor_1,1547718210,29.7 
sensor_1,1547718213,30.9 
3.代码实现
package windows 
 
import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.api.windowing.time.Time 
import org.apache.flink.table.api.scala._ 
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble} 
import org.apache.flink.types.Row 
 
/** 
 * @Package Windows 
 * @File :FlinkSQLTumBlingTie.java 
 * @author 大数据老哥 
 * @date 2020/12/25 21:58 
 * @version V1.0 
 *          设置滚动窗口 
 */ 
object FlinkSQLTumBlingTie { 
  def main(args: Array[String]): Unit = { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setParallelism(1) 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
 
    val settings = EnvironmentSettings.newInstance() 
      .useBlinkPlanner() 
      .inStreamingMode() 
      .build() 
    val tableEnv = StreamTableEnvironment.create(env, settings) 
 
    // 读取数据 
    val inputPath = "./data/sensor.txt" 
    val inputStream = env.readTextFile(inputPath) 
    
 
    // 先转换成样例类类型(简单转换操作) 
    val dataStream = inputStream 
      .map(data => { 
        val arr = data.split(",") 
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble) 
      }) 
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) { 
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L 
      }) 
 
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts) 
    // 注册表 
    tableEnv.createTemporaryView("sensor", sensorTable) 
    // table 实现 
    val resultTable = sensorTable 
      .window(Tumble over 10.seconds on 'ts as 'tw) // 每10秒统计一次,滚动时间窗口 
      .groupBy('id, 'tw) 
      .select('id, 'id.count, 'tw.end) 
    //sql 实现 
    val sqlTable = tableEnv.sqlQuery( 
      """ 
        |select 
        |id, 
        |count(id) , 
        |tumble_end(ts,interval '10' second) 
        |from sensor 
        |group by 
        |id, 
        |tumble(ts,interval '10' second) 
        |""".stripMargin) 
 
    /*** 
     * .window(Tumble over 10.minutes on 'rowtime as 'w) (事件时间字段 rowtime) 
     * .window(Tumble over 10.minutes on 'proctime as 'w)(处理时间字段 proctime) 
     * .window(Tumble over 10.minutes on 'proctime as 'w) (类似于计数窗口,按处理时间排序,10 行一组) 
     */ 
    resultTable.toAppendStream[Row].print("talbe") 
    sqlTable.toRetractStream[Row].print("sqlTable") 
     
    env.execute("FlinkSQLTumBlingTie") 
  } 
 
  case class SensorReading(id: String, timestamp: Long, temperature: Double) 
 

(编辑:钦州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读