jjzjj

FlinkSql之TableAPI详解

再美不及姑娘你 2023-03-28 原文

一、FlinkSql的概念

核心概念

Flink 的 Table APISQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.

动态表和连续查询

动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。

与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

TableAPI

首先需要导入依赖

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
     <version>${flink.version}</version>
     <scope>provided</scope>
 </dependency>
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-csv</artifactId>
     <version>${flink.version}</version>
 </dependency>
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-json</artifactId>
     <version>${flink.version}</version>
 </dependency>
 
 <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
 <dependency>
     <groupId>org.apache.commons</groupId>
     <artifactId>commons-compress</artifactId>
     <version>1.21</version>
 </dependency>
 /**
  * 使用TableAPI的基本套路:
  * 1.创建表的执行环境
  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
  * 3.对动态表进行查询
  * 4.把动态表转换为流
  */

这里需要注意的问题:

1.TableAPI 中将动态表转换为流时有两种方法

 DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);

toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句

 DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);

toRetractStream则可以使用

2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class

代码实现:

 package net.cyan.FlinkSql;
 
 import net.cyan.POJO.WaterSensor;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
 import static org.apache.flink.table.api.Expressions.$;
 
 /**
  * 使用TableAPI的基本套路:
  * 1.创建表的执行环境
  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
  * 3.对动态表进行查询
  * 4.把动态表转换为流
  */
 public class Demo1 {
     public static void main(String[] args) {
         Configuration configuration=new Configuration();
         configuration.setInteger("rest.port",3333);
         //创建执行环境
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
         env.setParallelism(1);
         //模拟数据
         DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(
                 new WaterSensor("S1", 1000L, 10),
                 new WaterSensor("S1", 1000L, 10),
                 new WaterSensor("S2", 2000L, 20),
                 new WaterSensor("S3", 3000L, 30),
                 new WaterSensor("S4", 4000L, 40),
                 new WaterSensor("S5", 5000L, 50)
        );
         //创建表的执行环境
         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
         //创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
         Table table = tableEnvironment.fromDataStream(WaterSensorSource);
         //对表进行查询
         //1、过时的查询书写
         Table result = table
                .where("id='S1'")
                .select("*");
         //2、不过时的书写
         Table result1 = table
 //               .where($("id").isEqual("S1"))
                .select($("id"), $("ts"), $("vc"));
         //3.聚合函数
         Table select = table
                .groupBy($("id"))
                .aggregate($("vc").sum().as("sum_vc"))
                .select($("id"), $("sum_vc"));
         //把动态表转换为流,使用到了之前创建的表运行环境
 
         SingleOutputStreamOperator<Row> tuple2DataStream = tableEnvironment
                .toRetractStream(select, Row.class)
                .filter(t -> t.f0)
                .map(t -> t.f1);
 //       DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
 //       DataStream<Row> rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class);
 //       rowDataStream.print();
 //       rowDataStream1.print();
         tuple2DataStream.print();
 
 
         try {
             //启动执行环境
             env.execute();
        } catch (Exception e) {
             e.printStackTrace();
        }
 
 
 
    }
 }

 

二、TableAPI读取文件

使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名

 package net.cyan.FlinkSql;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.Csv;
 import org.apache.flink.table.descriptors.FileSystem;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.types.DataType;
 
 import static org.apache.flink.table.api.Expressions.$;
 
 public class Demo2_readWriteText {
     public static void main(String[] args) {
         //创建执行环境
 //       Configuration configuration = new Configuration();
 //       configuration.setInteger("rest.port", 3333);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);
         //创建查询的数据结果封装类型
         Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());
         talEnv
                .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径
                .withFormat(new Csv()) //读取文件的数据格式
                .withSchema(schema) //读取出来的数据格式
                .createTemporaryTable("sensor");//定义结果表名
 
         //进行查询
         Table select = talEnv.from("sensor")
                .where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));
 
 
         //将查询结果写入到新文件中
         //同样建立一个动态表连接
         talEnv
                .connect(new FileSystem().path("input/b.txt"))  //写入路径
                .withFormat(new Csv()) //写入文件的数据格式
                .withSchema(schema) //写入的数据格式
                .createTemporaryTable("abc");//定义写入表名
         //进行写入操作
 
         select.executeInsert("abc");
 
 //       try {
 //           //启动执行环境
 //           env.execute();
 //       } catch (Exception e) {
 //           e.printStackTrace();
 //       }
 
    }
 }

 

三、TableAPI 读取、写入Kakfa

基本流程

1>需要创建表的运行环境

 //创建表的运行环境
 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

2>创建查询出的数据写出结构

 //创建表结构
 Schema schema=new Schema()
        .field("id",DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("vc",DataTypes.INT());

3> 创建kafka连接

 //创建kafka连接
 tabEnv.connect(
         new Kafka()
        .version("universal")// 版本号
        .property("bootstrap.servers","hadoop102:9092")//地址
        .property("group.id","cy")//消费者组
        .topic("first")//消费主题
 
  )
        .withFormat(new Json())//写入的格式
        .withSchema(schema)
        .createTemporaryTable("a");//临时表

4> 进行查询

 //创建表
 Table select = tabEnv.from("a").select("*");

5> 创建写入kafka连接

 //创建写入主题
 tabEnv.connect(
         new Kafka()
                .version("universal")// 版本号
                .property("bootstrap.servers","hadoop102:9092")//地址
                .topic("first1")//消费主题
                .sinkPartitionerRoundRobin()//随机分区
 
 )
        .withFormat(new Json())//写入的格式
        .withSchema(schema)
        .createTemporaryTable("c");

6> 写入

 //写入
 select.executeInsert("c");

完整代码如下

 package net.cyan.FlinkSql;
 
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.descriptors.Json;
 import org.apache.flink.table.descriptors.Kafka;
 import org.apache.flink.table.descriptors.Schema;
 
 public class Demo5_readWriteKafka {
     public static void main(String[] args) {
        //创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //创建表的运行环境
         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
         //创建表结构
         Schema schema=new Schema()
                .field("id",DataTypes.STRING())
                .field("ts",DataTypes.BIGINT())
                .field("vc",DataTypes.INT());
         //创建kafka连接
         tabEnv.connect(
                 new Kafka()
                .version("universal")// 版本号
                .property("bootstrap.servers","hadoop102:9092")//地址
                .property("group.id","cy")//消费者组
                .topic("first")//消费主题
 
          )
                .withFormat(new Json())//写入的格式
                .withSchema(schema)
                .createTemporaryTable("a");
         //创建表
         Table select = tabEnv.from("a").select("*");
         //创建写入主题
         tabEnv.connect(
                 new Kafka()
                        .version("universal")// 版本号
                        .property("bootstrap.servers","hadoop102:9092")//地址
                        .topic("first1")//消费主题
                        .sinkPartitionerRoundRobin()//随即分区
 
        )
                .withFormat(new Json())//写入的格式
                .withSchema(schema)
                .createTemporaryTable("c");
 
         //写入
         select.executeInsert("c");
 
 
    }
 }
 

有关FlinkSql之TableAPI详解的更多相关文章

  1. 物联网MQTT协议详解 - 2

    一、什么是MQTT协议MessageQueuingTelemetryTransport:消息队列遥测传输协议。是一种基于客户端-服务端的发布/订阅模式。与HTTP一样,基于TCP/IP协议之上的通讯协议,提供有序、无损、双向连接,由IBM(蓝色巨人)发布。原理:(1)MQTT协议身份和消息格式有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payload)两部分Topic,可以理解为消息的类型,订阅者订阅(Su

  2. Tcl脚本入门笔记详解(一) - 2

    TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是

  3. 【详解】Docker安装Elasticsearch7.16.1集群 - 2

    开门见山|拉取镜像dockerpullelasticsearch:7.16.1|配置存放的目录#存放配置文件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/config#存放数据的文件夹mkdir-p/opt/docker/elasticsearch/node-1/data#存放运行日志的文件夹mkdir-p/opt/docker/elasticsearch/node-1/log#存放IK分词插件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/plugins若你使用了moba,直接右键新建即可如上图所示依次类推创建

  4. 【Elasticsearch基础】Elasticsearch索引、文档以及映射操作详解 - 2

    文章目录概念索引相关操作创建索引更新副本查看索引删除索引索引的打开与关闭收缩索引索引别名查询索引别名文档相关操作新建文档查询文档更新文档删除文档映射相关操作查询文档映射创建静态映射创建索引并添加映射概念es中有三个概念要清楚,分别为索引、映射和文档(不用死记硬背,大概有个印象就可以)索引可理解为MySQL数据库;映射可理解为MySQL的表结构;文档可理解为MySQL表中的每行数据静态映射和动态映射上面已经介绍了,映射可理解为MySQL的表结构,在MySQL中,向表中插入数据是需要先创建表结构的;但在es中不必这样,可以直接插入文档,es可以根据插入的文档(数据),动态的创建映射(表结构),这就

  5. 最强Http缓存策略之强缓存和协商缓存的详解与应用实例 - 2

    HTTP缓存是指浏览器或者代理服务器将已经请求过的资源保存到本地,以便下次请求时能够直接从缓存中获取资源,从而减少网络请求次数,提高网页的加载速度和用户体验。缓存分为强缓存和协商缓存两种模式。一.强缓存强缓存是指浏览器直接从本地缓存中获取资源,而不需要向web服务器发出网络请求。这是因为浏览器在第一次请求资源时,服务器会在响应头中添加相关缓存的响应头,以表明该资源的缓存策略。常见的强缓存响应头如下所述:Cache-ControlCache-Control响应头是用于控制强制缓存和协商缓存的缓存策略。该响应头中的指令如下:max-age:指定该资源在本地缓存的最长有效时间,以秒为单位。例如:Ca

  6. IDEA 2022 创建 Spring Boot 项目详解 - 2

    如何用IDEA2022创建并初始化一个SpringBoot项目?目录如何用IDEA2022创建并初始化一个SpringBoot项目?0. 环境说明1.  创建SpringBoot项目 2.编写初始化代码0. 环境说明IDEA2022.3.1JDK1.8SpringBoot1.  创建SpringBoot项目        打开IDEA,选择NewProject创建项目。        填写项目名称、项目构建方式、jdk版本,按需要修改项目文件路径等信息。        选择springboot版本以及需要的包,此处只选择了springweb。        此处需特别注意,若你使用的是jdk1

  7. 详解Unity中的粒子系统Particle System (二) - 2

    前言上一篇我们简要讲述了粒子系统是什么,如何添加,以及基本模块的介绍,以及对于曲线和颜色编辑器的讲解。从本篇开始,我们将按照模块结构讲解下去,本篇主要讲粒子系统的主模块,该模块主要是控制粒子的初始状态和全局属性的,以下是关于该模块的介绍,请大家指正。目录前言本系列提要一、粒子系统主模块1.阅读前注意事项2.参考图3.参数讲解DurationLoopingPrewarmStartDelayStartLifetimeStartSpeed3DStartSizeStartSize3DStartRotationStartRotationFlipRotationStartColorGravityModif

  8. VMware虚拟机与本地主机进行磁盘共享(详解) - 2

    VMware虚拟机与本地主机进行磁盘共享前提虚拟机版本为Windows10(专业版,不是可能有问题)本地主机为家庭版或学生版(此版本会有问题,但有替代方式)最好是专业版VMware操作1.关闭防火墙,全部关闭。2.打开电脑属性3.点击共享-》高级共享-》权限4.如果没有everyone,就添加权限选择完全控制,然后应用确定。5.打开cmd输入lusrmgr.msc(只有专业版可以打开)如果不是专业版,可以跳过这一步。点击用户-》administrator密码要复杂密码,否则不行。推荐admaiN@1234类型的密码。设置完密码,点击属性,将禁用解开。6.如果虚拟机的windows不是专业版,可

  9. ElasticSearch之 ik分词器详解 - 2

    IK分词器本文分为简介、安装、使用三个角度进行讲解。简介倒排索引众所周知,ES是一个及其强大的搜索引擎,那么它为什么搜索效率极高呢,当然和他的存储方式脱离不了关系,ES采取的是倒排索引,就是反向索引;常见索引结构几乎都是通过key找value,例如Map;倒排索引的优势就是有效利用Value,将多个含有相同Value的值存储至同一位置。分词器为了配合倒排索引,分词器也就诞生了,只有合理的利用Value,才会让倒排索引更加高效,如果一整个Value不进行任何操作直接进行存储,那么Value和key毫无区别。分词器Analyzer通常会对Value进行操作:一、字符过滤,过滤掉html标签;二、分

  10. Educational Codeforces Round 146 (Rated for Div. 2)(B,E详解) - 2

    题外话:抑郁场,开局一小时只出A,死活想不来B,最后因为D题出锅ura才保住可怜的分。但咱本来就写不到DB-LongLegs(数论)本题题解法一学自同样抑郁的知乎作者幽血魅影的题解,有讲解原理。法二来着知乎巨佬cup-pyy(大佬说《不难发现》呜呜)题意三种操作:向上走mmm步向右走mmm步给自己一次走的步数加111,即使得m=m+1m=m+1m=m+1问从(0,0)(0,0)(0,0)走到(a,b)(a,b)(a,b)的最小操作次数,值得注意的是操作三不可逆。解析假设我们最终一步的大小增长到mmm,那么在这个过程中我能以[1,m][1,m][1,m](当步数增长到该数时)之间的任何数字向上或

随机推荐