jjzjj

Flink SQl 客户端-Catalog(hive的catalog是重点)

a-tao必须奥利给 2023-09-04 原文

1、启动一个flink的 集群

  • 可以使用flink独立集群
  • 也可以使用yarn-session.sh
# 启动一个flinkyarn-sesion集群
yarn-sesion.sh -d

2、启动sql-client

sql-client.sh

3、测试命令行

-- 创建source表
CREATE TABLE datagen (
 id STRING,
 name STRING,
 age INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '5', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
)
-- 执行sql
-- 可以直接在命令行中查看直接结果
-- 会实时打印结果
select age,count(1) as c from datagen  group by age;

-- 输出结果模式
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';


-- 在flinksql中执行insert into
CREATE TABLE age_num_mysql (
  age INT,
  num BIGINT,
  PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'age_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
)

-- 在数据库中创建表
CREATE TABLE `age_num` (
  `age` int NOT NULL,
  `num` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


-- 插入数据
-- insert into 的语句会提交到flink的集群中运行,和本地客户端就没有关系了
insert into age_num_mysql
select age,count(1) as num from datagen  group by age;

执行:select age,count(1) as c from datagen group by age;

结果如下

执行:insert into age_num_mysql
select age,count(1) as num from datagen group by age;

效果如下

4、sql-client.sh -i

-- 可以将通用的sql放在一个初始的sql文件中
--文件中可以写多个sql,
vim sql-client.sql

CREATE CATALOG mysql_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'bigdata17',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://master:3306'
);

use catalog mysql_catalog;

-- 启动sql-client 
sql-client.sh -i sql-client.sql

5、sql-client.sh -f

可以直接执行一个sql文件

-- 创建一个sql文件
vim age_num.sql
-- source 表
CREATE TABLE datagen (
 id STRING,
 name STRING,
 age INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '5', -- 每秒生成的数据行数据
 'fields.id.length' = '5', --字段长度限制
 'fields.name.length'='3',
 'fields.age.min' ='1', -- 最小值
 'fields.age.max'='100' -- 最大值
);

-- 多个sql使用分号分隔
-- sink表
CREATE TABLE age_num_mysql (
  age INT,
  num BIGINT,
  PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
   'table-name' = 'age_num', -- 需要手动到数据库中创建表
   'username' = 'root',
   'password' = '123456'
);

-- 插入数据
insert into age_num_mysql
select age,count(1) as num from datagen  group by age;

-- 启动
sql-client.sh -f age_num.sql

-- 效果和刚才的分步骤操作差不多,只不过这次是通过一个sql脚本文件一起操作

5、Catalog

catalog ---> database ---> table ---> 字段

catalog是flink用于保存元数据的一种机制

元数据(表结构)

1、GenericInMemoryCatalog(默认)

基于内存的catalog,元数据只在当前会话中起作用

也是flink默认的catalog

2、Jdbc Catalog 整库同步

只能在flink中直接读写数据库中的表

不能在JdbcCatalog中创建flink的表

-- 创建jdbc catalog
CREATE CATALOG mysql_catalog WITH(
    'type' = 'jdbc',
    'default-database' = 'bigdata',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://master:3306'
);

-- 查看当前所有的catalog;
show catalogs;

-- 切换catalog
use catalog mysql_catalog;

3、hive catalog(重点)

hive catalog 可以用于flink读取hvie中的表,可以用于在hive元数据中保存flink的

  • 配置Hadoop classpath

    vim /etc/profile
    # 放在PAth的后面
    export HADOOP_CLASSPATH=`hadoop classpath`
    source /etc/profile
  • 将flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar依赖包上传到flink的lib目录下

    nohup  hive --service metastore >> metastore.log 2>&1 &
  • 重启yarn-session集群

    # 查看yarn中正在运行的任务
    yarn application -list
    # 关闭yarn-session
    yarn application -kill application_1659099426082_0003
    # 启动yarn-session
    yarn-session.sh -d
  • 在sql-client中创建hive 的catalog

-- 进入sql客户端
sql-client.sh 

-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
    'type' = 'hive',
    'default-database' = 'bigdata17',
    'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;
  • 在flink中可以直接查询hive的表

    select * from students3;

将 hadoop-mapreduce-client-core-2.7.6.jar 放到 flink/lib 的目录下,然后kill掉已经启动的yarn任务,重新生成任务,再次执行结果如下:

  • 在flink中创建表,表的元数据可以保存到hive的元数据中

    flink将表的元数据保存在hive的元数据中,在hive中可以看到flink的表,但是不能对flink的表进行查询

    flink的元数据保存在hive中,元数据不会丢失

    CREATE TABLE datagen (
     id STRING,
     name STRING,
     age INT
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second' = '1', -- 每秒生成的数据行数据
     'fields.id.length' = '5', --字段长度限制
     'fields.name.length'='3',
     'fields.age.min' ='1', -- 最小值
     'fields.age.max'='100' -- 最大值
    );

4、使用hive的方言

spark sql默认就监控hive语法的

-- 指定hive方言
-- 开启hive的方言之后就不能使用flink自己的语法了
set table.sql-dialect=hive; 
--默认方言
set table.sql-dialect=default;

5、使用hive的函数

-- 加载hive模块,在fink中就可以直接使用hive的函数了
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- 查看所有的模块
SHOW MODULES;

flink 中不支持hive中的很多语法比如split

加载hive模块后,就能使用了如下:

有关Flink SQl 客户端-Catalog(hive的catalog是重点)的更多相关文章

  1. Hive SQL 五大经典面试题 - 2

    目录第1题连续问题分析:解法:第2题分组问题分析:解法:第3题间隔连续问题分析:解法:第4题打折日期交叉问题分析:解法:第5题同时在线问题分析:解法:第1题连续问题如下数据为蚂蚁森林中用户领取的减少碳排放量iddtlowcarbon10012021-12-1212310022021-12-124510012021-12-134310012021-12-134510012021-12-132310022021-12-144510012021-12-1423010022021-12-154510012021-12-1523.......找出连续3天及以上减少碳排放量在100以上的用户分析:遇到这类

  2. ruby - 在 TCPServer (Ruby) 中,我如何从客户端获取 IP/MAC? - 2

    我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!

  3. ruby-on-rails - 为什么我必须在使用客户验证器后重新加载 rspec 中的记录? - 2

    我有一个模型User,它在创建后的回调中创建了选项#Userhas_one:user_optionsafter_create:create_optionsprivatedefcreate_optionsUserOptions.create(user:self)end我对此有一些简单的Rspec覆盖:describe"newuser"doit"createsuser_optionsaftertheuseriscreated"douser=create(:user)user.user_options.shouldbe_kind_of(UserOptions)endend一切正常,直到我将自

  4. ruby - 如何获得带有 SSL 客户端证书的 HTTPS 请求以与 Ruby EventMachine 一起使用? - 2

    我正在尝试使用RubyEventMachine访问使用SSL证书身份验证的HTTPSWeb服务,但我没有让它工作。我编写了以下简单代码块来对其进行端到端测试:require'rubygems'require'em-http'EventMachine.rundourl='https://foobar.com/'ssl_opts={:private_key_file=>'/tmp/private.key',:cert_chain_file=>'/tmp/ca.pem',:verify_peer=>false}http=EventMachine::HttpRequest.new(url).g

  5. ruby-on-rails - 在 Ruby on Rails 应用程序中使用客户端 SSL - 2

    我正在为需要与API建立SSL连接的客户端开发应用程序。我得到了三个文件;一个信任根证书(.cer)文件、一个中间证书(.cer)文件和一个签名的响应文件。我得到的安装说明与IIS或Javakeytool程序有关;我正在用RubyonRails构建应用程序,所以这两种方法都不是一个选项(据我所知)。证书由运行API服务的组织自签名,看来我获得了客户端证书以相互验证https连接。我不确定如何使用我的应用程序中的证书连接和使用API签名响应文件的作用我读过"Usingaself-signedcertificate"和thisarticleonOpenSSLinRuby但两者似乎都不是很到

  6. ruby - 为什么这个启用 SSL 的 Ruby 服务器/客户端测试有效? - 2

    我正在努力在Ruby中创建启用SSL的服务器,以及与服务器一起使用的相应Ruby客户端。为了进行测试,我使用以下命令创建了自己的根CA证书。$:~/devel/ssl-test/ssl/CA$opensslgenrsa-outTestCA.key2048GeneratingRSAprivatekey,2048bitlongmodulus............+++...........................+++eis65537(0x10001)$:~/devel/ssl-test/ssl/CA$opensslreq-new-keyTestCA.key-outTestCA.

  7. 大数据之Hadoop数据仓库Hive - 2

    目录:一、简介二、HQL的执行流程三、索引四、索引案例五、Hive常用DDL操作六、Hive常用DML操作七、查询结果插入到表八、更新和删除操作九、查询结果写出到文件系统十、HiveCLI和Beeline命令行的基本使用十一、Hive配置一、简介Hive是一个构建在Hadoop之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类SQL查询功能,用于查询的SQL语句会被转化为MapReduce作业,然后提交到Hadoop上运行。特点:简单、容易上手(提供了类似sql的查询语言hql),使得精通sql但是不了解Java编程的人也能很好地进行大数据分析;灵活性高,可以自定义用户函数(UDF)和

  8. ruby-on-rails - 在处理电子邮件回复时,我怎样才能忽略任何电子邮件客户端细节和历史记录? - 2

    我有一个通过IMAP处理传入电子邮件的Rails应用程序。当前使用一种方法来搜索TMail对象的各个部分以查找给定的content_type:defself.search_parts_for_content_type(parts,content_type='text/html')parts.eachdo|part|ifpart.content_type==content_typereturnpart.bodyelseifpart.multipart?ifbody=self.search_parts_for_content_type(part.parts,content_type)ret

  9. ruby-on-rails - Ruby on Rails & Prawn PDF - 创建客户名单 - 2

    我正在尝试使用Prawn生成PDF报告,我可以通过传递单个ID轻松地让它对表演Action进行报告,但我想生成一个包含其中每条记录的报告。就像一个标准的railsscaffold索引页面。使用rails它看起来像这样:简单!但我不确定如何用Prawn做到这一点..类似于:defindex@customer=Customer.allrespond_todo|format|format.htmlPrawn::Document.generate("customer_list.pdf")do|pdf|pdf.text"#{@customer.id}"pdf.text"#{@customer.n

  10. ruby - 具有 HTTPS、SSL 客户端证书和 Keep-Alive 支持的 Ruby HTTP 库? - 2

    我正在尝试用Ruby编写一个HTTPS客户端。它将使用HTTPS连接到服务器,传递身份验证token(通过单独的登录过程获得)和SSL客户端证书。我正在使用rest-client执行以下操作:client=RestClient::Resource.new(url,:ssl_client_cert=>OpenSSL::X509::Certificate.new(File.read('./certificate/client-2048.pem')),:ssl_client_key=>OpenSSL::PKey::RSA.new(File.read('./certificate/client

随机推荐