jjzjj

FlinkSQL之Windowing TVF

再美不及姑娘你 2023-03-28 原文

Windowing TVF

在Flink1.13版本之后出现的替代之前的Group window的产物,官网描述其 is more powerful and effective

 //TVF 中的tumble滚动窗口
 //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在
 //特别注意!!!!
 //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段

sql实现TVF的tumble窗口实现

 package net.cyan.FlinkSql.TVF;
 
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import java.time.Duration;
 
 import static org.apache.flink.table.api.Expressions.$;
 
 public class Demo1_Window_TableAPI_Tumble {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的tumble滚动窗口
         //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在
         //特别注意!!!!
         //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 "sum(vc) sum_vc" +
                 " from table (tumble(table sensor,descriptor(et),interval '5' second ))" +
                 " group by window_start,window_end,id ")
                .execute()
                .print();
 
    }
 }

sql实现TVF的滑动窗口

 //TVF 中的hop滚动窗口
 //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在
 //first interval :滑动步长, second interval :窗口长度
 //特别注意!!!!
 // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错
 // 例如:滑动步长为2,窗口长度就不能为5,可以为6
 // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF;
 
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import java.time.Duration;
 
 import static org.apache.flink.table.api.Expressions.$;
 
 public class Demo2_Window_TVF_Hop {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的hop滚动窗口
         //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在
         //first interval :滑动步长, second interval :窗口长度
         //特别注意!!!!
         // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错
         // 例如:滑动步长为2,窗口长度就不能为5,可以为6
         // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 "sum(vc) sum_vc" +
                 " from table (hop(table sensor,descriptor(et),interval '2' second,interval '6' second ))" +
                 " group by window_start,window_end,id ")
                .execute()
                .print();
 
 
 
    }
 }

sql实现TVF的累计窗口

累计窗口的应用:

需求:每天每隔一个小时统计一次当天的pv(浏览量)

流的方式如何解决:

1、用滚动窗口, 窗口长度设为1h

2、每天的第一个窗口清除状态,后面的不清,进行状态的累加

或者

用滚动窗口,长度设置为2day

自定义触发器,每隔1小时对窗内的元素计算一次,不关闭窗口

 

sql的方式如何解决?

直接使用累计窗口cumulate

 //TVF 中的cumulate累计窗口
 //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在
 //tableName:表名
 //timecol:时间属性字段
 //step:累计步长,跟滑动步长类似
 //size:窗口长度
 //特别注意!!!!
 //1.累计窗口的步长与窗口长度同样是需要整数倍关系
 // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF;
 
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 import java.time.Duration;
 
 import static org.apache.flink.table.api.Expressions.$;
 
 public class Demo3_Window_TVF_cumulate {
     public static void main(String[] args) {
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         env.setParallelism(1);
         DataStream<WaterSensor> waterSensorStream =
                 env.fromElements(
                         new WaterSensor("sensor_1", 1000L, 10),
                         new WaterSensor("sensor_1", 2000L, 20),
                         new WaterSensor("sensor_2", 3000L, 30),
                         new WaterSensor("sensor_1", 4000L, 40),
                         new WaterSensor("sensor_1", 5000L, 50),
                         new WaterSensor("sensor_2", 6000L, 60))
                        .assignTimestampsAndWatermarks(
                                 WatermarkStrategy
                                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                        .withTimestampAssigner((ws, ts) -> ws.getTs())
 
                        );
         //创建table
         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());
         //创建表
         tabEnv.createTemporaryView("sensor",table);
         //执行sql
         //TVF 中的cumulate累计窗口
         //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在
         //tableName:表名
         //timecol:时间属性字段
         //step:累计步长,跟滑动步长类似
         //size:窗口长度
         //特别注意!!!!
         //1.累计窗口的步长与窗口长度同样是需要整数倍关系
         // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
         tabEnv.sqlQuery("select" +
                 " window_start,window_end,id," +
                 " sum(vc) sum_vc" +
                 " from table (cumulate(table sensor,descriptor(et),interval '2' second,interval '6' second)) " +
                 "group by window_start,window_end,id")
                .execute()
                .print();
    }
 }
 

有关FlinkSQL之Windowing TVF的更多相关文章

  1. flink学习35:flinkSQL查询mysql - 2

    总览:   importorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimportorg.apache.flink.table.api.EnvironmentSettingsimportorg.apache.flink.table.api.bridge.scala.{StreamTableEnvironment,tableConversions}objectsqlQueryTable{ defmain(args:Array[St

  2. FlinkSQL 14.5 CDC实现同步oracle11G 数据到mysql - 2

    下载oracle1.实现功能oracle11G--->flinksql---->mysql2.版本组件版本flinkflink-1.14.5-bin-scala_2.12.tgzflinkcdcflink-sql-connector-oracle-cdc-2.2.1.jaroracleoracle11G3.docker安装oracle11G3.1拉去oracle11G镜像dockerpullregistry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g3.2查看镜像[root@basenode~]#dockerimagesREPOSITORYTAGIM

  3. FlinkSQL消费Kafka写入Hive表 - 2

    环境版本:hadoop-3.1.0hive-3.1.2flink-1.13.2一、开发Maven引入依赖项:org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-table-planner-blink_${scala.binary.version}${flink.version}org.apache.flinkflink-table-api-java-bridge_2.1

  4. FlinkSQL消费Kafka写入Hive表 - 2

    环境版本:hadoop-3.1.0hive-3.1.2flink-1.13.2一、开发Maven引入依赖项:org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-table-planner-blink_${scala.binary.version}${flink.version}org.apache.flinkflink-table-api-java-bridge_2.1

  5. Flink:FlinkSql解析嵌套Json - 2

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下1,数据是网上摘抄,但包含里常用的大部分格式{   "afterColumns":{      "created":"1589186680",      "extra":{         "canGiving":false      },      "parameter":[1,2,3,4]   },   "beforeColumns":null,   "tableVersion":{      "binlogFile":null,      "bin

  6. Flink:FlinkSql解析嵌套Json - 2

    日常开发中都是用的简便json格式,但是偶尔也会遇到嵌套json的时候,因此在用flinksql的时候就有点麻烦,下面用简单例子简单定义处理下1,数据是网上摘抄,但包含里常用的大部分格式{   "afterColumns":{      "created":"1589186680",      "extra":{         "canGiving":false      },      "parameter":[1,2,3,4]   },   "beforeColumns":null,   "tableVersion":{      "binlogFile":null,      "bin

  7. flink学习33:flinkSQL连接mysql,查询插入数据 - 2

    总览1.生成运行时env2.生成表环境3.接上数据流,数据流数据生成表4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册5.查询表,可以根据注册表名查询6.插入表,可以根据生成的flink表进行数据插入完整案例:importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api.bridge.scala._importorg.apache.flink.table.api._importorg.apache.flink

  8. flink学习33:flinkSQL连接mysql,查询插入数据 - 2

    总览1.生成运行时env2.生成表环境3.接上数据流,数据流数据生成表4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册5.查询表,可以根据注册表名查询6.插入表,可以根据生成的flink表进行数据插入完整案例:importorg.apache.flink.streaming.api.scala._importorg.apache.flink.table.api.bridge.scala._importorg.apache.flink.table.api._importorg.apache.flink

  9. 华为云FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践 - 2

    背景说明随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学Java的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。SQL是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink作为流批一体的计算引擎,致力于提供一套SQL支持全部应用场景,FlinkSQL的实现也完全遵循ANSISQL标准。之前,用户可能需要编写上百行业务代码,使用SQL后,可能只需要几行SQL就可以轻松搞定。本文介绍如何使用华为FusionInsightMRSFlinkServer服务进行界面化的Fli

  10. 华为云FusionInsight MRS FlinkSQL 复杂嵌套Json解析最佳实践 - 2

    背景说明随着流计算的发展,挑战不再仅限于数据量和计算量,业务变得越来越复杂,开发者可能是资深的大数据从业者、初学Java的爱好者,或是不懂代码的数据分析者。如何提高开发者的效率,降低流计算的门槛,对推广实时计算非常重要。SQL是数据处理中使用最广泛的语言,它允许用户简明扼要地展示其业务逻辑。Flink作为流批一体的计算引擎,致力于提供一套SQL支持全部应用场景,FlinkSQL的实现也完全遵循ANSISQL标准。之前,用户可能需要编写上百行业务代码,使用SQL后,可能只需要几行SQL就可以轻松搞定。本文介绍如何使用华为FusionInsightMRSFlinkServer服务进行界面化的Fli

随机推荐