# 启动一个flinkyarn-sesion集群
yarn-sesion.sh -d
sql-client.sh

-- 创建source表
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
)
-- 执行sql
-- 可以直接在命令行中查看直接结果
-- 会实时打印结果
select age,count(1) as c from datagen group by age;
-- 输出结果模式
SET 'sql-client.execution.result-mode' = 'table';
SET 'sql-client.execution.result-mode' = 'changelog';
SET 'sql-client.execution.result-mode' = 'tableau';
-- 在flinksql中执行insert into
CREATE TABLE age_num_mysql (
age INT,
num BIGINT,
PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'age_num', -- 需要手动到数据库中创建表
'username' = 'root',
'password' = '123456'
)
-- 在数据库中创建表
CREATE TABLE `age_num` (
`age` int NOT NULL,
`num` bigint(20) DEFAULT NULL,
PRIMARY KEY (`age`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 插入数据
-- insert into 的语句会提交到flink的集群中运行,和本地客户端就没有关系了
insert into age_num_mysql
select age,count(1) as num from datagen group by age;
执行:select age,count(1) as c from datagen group by age;
结果如下

执行:insert into age_num_mysql
select age,count(1) as num from datagen group by age;效果如下

-- 可以将通用的sql放在一个初始的sql文件中
--文件中可以写多个sql,
vim sql-client.sql
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata17',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
use catalog mysql_catalog;
-- 启动sql-client
sql-client.sh -i sql-client.sql

可以直接执行一个sql文件
-- 创建一个sql文件
vim age_num.sql
-- source 表
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '5', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
);
-- 多个sql使用分号分隔
-- sink表
CREATE TABLE age_num_mysql (
age INT,
num BIGINT,
PRIMARY KEY (age) NOT ENFORCED -- 按照主键更新数据
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'age_num', -- 需要手动到数据库中创建表
'username' = 'root',
'password' = '123456'
);
-- 插入数据
insert into age_num_mysql
select age,count(1) as num from datagen group by age;
-- 启动
sql-client.sh -f age_num.sql
-- 效果和刚才的分步骤操作差不多,只不过这次是通过一个sql脚本文件一起操作
catalog ---> database ---> table ---> 字段
catalog是flink用于保存元数据的一种机制
元数据(表结构)
基于内存的catalog,元数据只在当前会话中起作用
也是flink默认的catalog
只能在flink中直接读写数据库中的表
不能在JdbcCatalog中创建flink的表
-- 创建jdbc catalog
CREATE CATALOG mysql_catalog WITH(
'type' = 'jdbc',
'default-database' = 'bigdata',
'username' = 'root',
'password' = '123456',
'base-url' = 'jdbc:mysql://master:3306'
);
-- 查看当前所有的catalog;
show catalogs;
-- 切换catalog
use catalog mysql_catalog;
hive catalog 可以用于flink读取hvie中的表,可以用于在hive元数据中保存flink的
配置Hadoop classpath
vim /etc/profile
# 放在PAth的后面
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile 将flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar依赖包上传到flink的lib目录下
nohup hive --service metastore >> metastore.log 2>&1 & 重启yarn-session集群
# 查看yarn中正在运行的任务
yarn application -list
# 关闭yarn-session
yarn application -kill application_1659099426082_0003
# 启动yarn-session
yarn-session.sh -d 在sql-client中创建hive 的catalog
-- 进入sql客户端
sql-client.sh
-- 创建hive catalog
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'default-database' = 'bigdata17',
'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG hive_catalog;
在flink中可以直接查询hive的表
select * from students3; 
将 hadoop-mapreduce-client-core-2.7.6.jar 放到 flink/lib 的目录下,然后kill掉已经启动的yarn任务,重新生成任务,再次执行结果如下:

在flink中创建表,表的元数据可以保存到hive的元数据中
flink将表的元数据保存在hive的元数据中,在hive中可以看到flink的表,但是不能对flink的表进行查询
flink的元数据保存在hive中,元数据不会丢失
CREATE TABLE datagen (
id STRING,
name STRING,
age INT
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1', -- 每秒生成的数据行数据
'fields.id.length' = '5', --字段长度限制
'fields.name.length'='3',
'fields.age.min' ='1', -- 最小值
'fields.age.max'='100' -- 最大值
); 
spark sql默认就监控hive语法的
-- 指定hive方言
-- 开启hive的方言之后就不能使用flink自己的语法了
set table.sql-dialect=hive;
--默认方言
set table.sql-dialect=default;
-- 加载hive模块,在fink中就可以直接使用hive的函数了
LOAD MODULE hive WITH ('hive-version' = '1.2.1');
-- 查看所有的模块
SHOW MODULES;
flink 中不支持hive中的很多语法比如split
加载hive模块后,就能使用了如下:


目录第1题连续问题分析:解法:第2题分组问题分析:解法:第3题间隔连续问题分析:解法:第4题打折日期交叉问题分析:解法:第5题同时在线问题分析:解法:第1题连续问题如下数据为蚂蚁森林中用户领取的减少碳排放量iddtlowcarbon10012021-12-1212310022021-12-124510012021-12-134310012021-12-134510012021-12-132310022021-12-144510012021-12-1423010022021-12-154510012021-12-1523.......找出连续3天及以上减少碳排放量在100以上的用户分析:遇到这类
我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!
我有一个模型User,它在创建后的回调中创建了选项#Userhas_one:user_optionsafter_create:create_optionsprivatedefcreate_optionsUserOptions.create(user:self)end我对此有一些简单的Rspec覆盖:describe"newuser"doit"createsuser_optionsaftertheuseriscreated"douser=create(:user)user.user_options.shouldbe_kind_of(UserOptions)endend一切正常,直到我将自
我正在尝试使用RubyEventMachine访问使用SSL证书身份验证的HTTPSWeb服务,但我没有让它工作。我编写了以下简单代码块来对其进行端到端测试:require'rubygems'require'em-http'EventMachine.rundourl='https://foobar.com/'ssl_opts={:private_key_file=>'/tmp/private.key',:cert_chain_file=>'/tmp/ca.pem',:verify_peer=>false}http=EventMachine::HttpRequest.new(url).g
我正在为需要与API建立SSL连接的客户端开发应用程序。我得到了三个文件;一个信任根证书(.cer)文件、一个中间证书(.cer)文件和一个签名的响应文件。我得到的安装说明与IIS或Javakeytool程序有关;我正在用RubyonRails构建应用程序,所以这两种方法都不是一个选项(据我所知)。证书由运行API服务的组织自签名,看来我获得了客户端证书以相互验证https连接。我不确定如何使用我的应用程序中的证书连接和使用API签名响应文件的作用我读过"Usingaself-signedcertificate"和thisarticleonOpenSSLinRuby但两者似乎都不是很到
我正在努力在Ruby中创建启用SSL的服务器,以及与服务器一起使用的相应Ruby客户端。为了进行测试,我使用以下命令创建了自己的根CA证书。$:~/devel/ssl-test/ssl/CA$opensslgenrsa-outTestCA.key2048GeneratingRSAprivatekey,2048bitlongmodulus............+++...........................+++eis65537(0x10001)$:~/devel/ssl-test/ssl/CA$opensslreq-new-keyTestCA.key-outTestCA.
目录:一、简介二、HQL的执行流程三、索引四、索引案例五、Hive常用DDL操作六、Hive常用DML操作七、查询结果插入到表八、更新和删除操作九、查询结果写出到文件系统十、HiveCLI和Beeline命令行的基本使用十一、Hive配置一、简介Hive是一个构建在Hadoop之上的数据仓库,它可以将结构化的数据文件映射成表,并提供类SQL查询功能,用于查询的SQL语句会被转化为MapReduce作业,然后提交到Hadoop上运行。特点:简单、容易上手(提供了类似sql的查询语言hql),使得精通sql但是不了解Java编程的人也能很好地进行大数据分析;灵活性高,可以自定义用户函数(UDF)和
我有一个通过IMAP处理传入电子邮件的Rails应用程序。当前使用一种方法来搜索TMail对象的各个部分以查找给定的content_type:defself.search_parts_for_content_type(parts,content_type='text/html')parts.eachdo|part|ifpart.content_type==content_typereturnpart.bodyelseifpart.multipart?ifbody=self.search_parts_for_content_type(part.parts,content_type)ret
我正在尝试使用Prawn生成PDF报告,我可以通过传递单个ID轻松地让它对表演Action进行报告,但我想生成一个包含其中每条记录的报告。就像一个标准的railsscaffold索引页面。使用rails它看起来像这样:简单!但我不确定如何用Prawn做到这一点..类似于:defindex@customer=Customer.allrespond_todo|format|format.htmlPrawn::Document.generate("customer_list.pdf")do|pdf|pdf.text"#{@customer.id}"pdf.text"#{@customer.n
我正在尝试用Ruby编写一个HTTPS客户端。它将使用HTTPS连接到服务器,传递身份验证token(通过单独的登录过程获得)和SSL客户端证书。我正在使用rest-client执行以下操作:client=RestClient::Resource.new(url,:ssl_client_cert=>OpenSSL::X509::Certificate.new(File.read('./certificate/client-2048.pem')),:ssl_client_key=>OpenSSL::PKey::RSA.new(File.read('./certificate/client