目录
(1)DIM层的设计依据是维度建模理论,该层存储维度模型的维度表。
(2)DIM层的数据存储在 HBase 表中。
DIM 层表是用于维度关联的,要通过主键去获取相关维度信息,这种场景下 K-V 类型数据库的效率较高。常见的 K-V 类型数据库有 Redis、HBase,而 Redis 的数据常驻内存,会给内存造成较大压力,因而选用 HBase 存储维度数据。
(3)DIM层表名的命名规范为dim_表名
Kafka---topic_db(包含所有的46张业务表)
过滤出所需要的维表数据
过滤条件:在代码中给定十几张维表的表名
问题:如果增加维表,需要修改代码-重新编译-打包-上传、重启任务
优化1:不修改代码、只重启任务
配置信息中保存需要的维表信息,配置信息只在程序启动的时候加载一次
优化2:不修改代码、不重启任务
方向:让程序在启动以后还可以获取配置信息中增加的内容
具体实施:
1) 定时任务:每隔一段时间加载一次配置信息
将定时任务写在Open方法
2) 监控配置信息:一旦配置信息增加了数据,可以立马获取到
(1) MySQLBinlog:FlinkCDC监控直接创建流
a.将配置信息处理成广播流:缺点 -> 如果配置信息过大,冗余太多
b.按照表名进行KeyBy处理:缺点 -> 有可能产生数据倾斜
(2) 文件:Flume->Kafka->Flink消费创建流
将数据写出到Phoenix、JdbcSink、自定义Sink
本层的任务是将业务数据直接写入到不同的 HBase 表中。那么如何让程序知道流中的哪些数据是维度数据?维度数据又应该写到 HBase 的哪些表中?为了解决这个问题,我们选择在 MySQL 中构建一张配置表,通过 Flink CDC 将配置表信息读取到程序中。
1)字段解析
我们将为配置表设计五个字段
- source_table:作为数据源的业务数据表名
- sink_table:作为数据目的地的 Phoenix 表名
- sink_columns:Phoenix 表字段
- sink_pk:Phoenix 表主键
- sink_extend:Phoenix 建表扩展,即建表时一些额外的配置语句
将 source_table 作为配置表的主键,可以通过它获取唯一的目标表名、字段、主键和建表扩展,从而得到完整的 Phoenix 建表语句。
数据格式:
{"before":null,"after":
{"source_table":"aa","sink_table":"bb","sink_columns":"cc","sink_pk":"id","sink_extend":"xxx"},"source":
{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":165251303
9549,"snapshot":"false","db":"gmall-211126-
config","sequence":null,"table":"table_process","server_id":0,"gtid":null,"file":"","pos":0
,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1652513039551,"transaction":null}
2)在Mysql中创建数据库建表并开启Binlog
(1)创建数据库 gmall_config ,注意:和 gmall 业务库区分开
[atguigu@hadoop102 db_log]$ mysql -uroot -p000000 -e"create database gmall_config charset
utf8 default collate utf8_general_ci"
(2)在 gmall_config 库中创建配置表 table_process
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(3)在MySQL配置文件中增加 gmall_config 开启Binlog
[axing@hadoop107 ~]$ sudo vim /etc/my.cnf

(4)为了方便测试,目前就插入两张表名数据,作为维度表

对Maxwell抓取的数据进行ETL,有用的部分保留,没用的过滤掉。
由于Maxwell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个维度表拆开处理。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张维度表表,就要修改配置重启计算程序。
所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。这种可以有三个方案实现:
一种是用Zookeeper存储,通过Watch感知数据变化;
另一种是用mysql数据库存储,周期性的同步;
再一种是用mysql数据库存储,使用广播流。
这里选择第三种方案,主要是MySQL对于配置数据初始化和维护管理,使用FlinkCDC读取配置信息表,将配置流作为广播流与主流进行连接。

维度数据保存到HBase的表中。
1)创建 KafkaUtil 工具类
和 Kafka 交互要用到 Flink 提供的 FlinkKafkaConsumer、FlinkKafkaProducer 类,为了提高模板代码的复用性,将其封装到 KafkaUtil 工具类中。
此处从 Kafka 读取数据,创建 getKafkaConsumer(String topic, String groupId) 方法
public class KafkaUtil {
static String BOOTSTRAP_SERVERS = "hadoop102:9092, hadoop103:9092, hadoop104:9092";
static String DEFAULT_TOPIC = "default_topic";
public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic,
new KafkaDeserializationSchema<String>() {
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if(record != null && record.value() != null) {
return new String(record.value());
}
return null;
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
}, prop);
return consumer;
}
}
2)主程序
package com.atguigu.app.dim;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.app.func.TableProcessFunction;
import com.atguigu.bean.TableProcess;
import com.atguigu.utils.MyKafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class DimApp {
public static void main(String[] args) throws Exception {
//1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);//生产环境,并行度应设置为kafka主题的分区数
/*
//生产环境下使用:
//1.1 开启checkpoint
env.enableCheckpointing(5*6000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10*6000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
//1.2 设置状态后端
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop107:8020/211126/ck");
System.setProperty("HADOOP_USER_NAME","atguigu");
*/
//2.读取kafka topic_db主题数据创建主流
String topic ="topic_db";
String groupId = "dim_app_211126";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
//3.过滤掉非JSON数据以及保留新增、变化以及初始化数据并将数据转换为JSON格式
SingleOutputStreamOperator<JSONObject> filterJsonDS = kafkaDS.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String value, Collector<JSONObject> collector) throws Exception {
try {
//将数据装换为JSON格式
JSONObject jsonObject = JSON.parseObject(value);
//获取数据中的操作类型字段
String type = jsonObject.getString("type");
//保留新增、变化、以及初始化数据
if ("insert".equals(type) || "update".equals(type) || "bootstrap-insert".equals(type)) {
collector.collect(jsonObject);
}
} catch (Exception e) {
System.out.println("脏数据:" + value);//或者写入侧输出流
}
}
});
//4.使用FlinkCDC读取mysql配置信息表创建配置流
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hadoop107")
.port(3306)
.username("root")
.password("000000")
.databaseList("gmall-config")
.tableList("gmall-config.table_process")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
DataStreamSource<String> mysqlSourceDS = env.fromSource(mySqlSource,
WatermarkStrategy.noWatermarks(),
"MysqlSource"
);
//5.将配置流处理为广播流
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = mysqlSourceDS.broadcast(mapStateDescriptor);
//6.连接主流和广播流
BroadcastConnectedStream<JSONObject, String> connectedStream = filterJsonDS.connect(broadcastStream);
//7.处理连接流,根据配置信息处理主流数据
SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(new TableProcessFunction(mapStateDescriptor));
//8.将数据写出到Phoenix
dimDS.print(">>>>>");
//9.启动任务
env.execute();
}
}
1)导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 如果不引入 flink-table 相关依赖,则会报错:
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.connector.base.source.reader.RecordEmitter
引入以下依赖可以解决这个问题(引入某些其它的 flink-table相关依赖也可)
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.13.0</version>
</dependency>
2)创建配置表实体类
package com.atguigu.gmall.realtime.bean;
import lombok.Data;
@Data
public class TableProcess {
//来源表
String sourceTable;
//输出表
String sinkTable;
//输出字段
String sinkColumns;
//主键字段
String sinkPk;
//建表扩展
String sinkExtend;
}
3)编写操作读取配置表形成广播流
// TODO 6. FlinkCDC 读取配置流并广播流
// 6.1 FlinkCDC 读取配置表信息
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("hadoop102")
.port(3306)
.databaseList("gmall_config") // set captured database
.tableList("gmall_config.table_process") // set captured table
.username("root")
.password("000000")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial())
.build();
// 6.2 封装为流
DataStreamSource<String> mysqlDSSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlSource");
// 6.3 广播配置流
MapStateDescriptor<String, TableProcess> tableConfigDescriptor = new MapStateDescriptor<String, TableProcess>("table-process-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastDS = mysqlDSSource.broadcast(tableConfigDescriptor);
// TODO 7. 连接流
BroadcastConnectedStream<JSONObject, String> connectedStream = filterDS.connect(broadcastDS);
4)定义一个项目中常用的配置常量类GmallConfig
package com.atguigu.gmall.realtime.common;
public class GmallConfig {
// Phoenix库名
public static final String HBASE_SCHEMA = "GMALL2022_REALTIME";
// Phoenix驱动
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
// Phoenix连接参数
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}
5)自定义函数MyBroadcastFunction

(1)定义类MyBroadcastFunction
package com.atguigu.gmall.realtime.app.func;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class MyBroadcastFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private MapStateDescriptor<String, TableProcess> tableConfigDescriptor;
public MyBroadcastFunction(MapStateDescriptor<String, TableProcess> tableConfigDescriptor) {
this.tableConfigDescriptor = tableConfigDescriptor;
}
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {
}
@Override
public void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {
}
}
(2)自定义函数MyBroadcastFunction-open
// 定义Phoenix的连接
private Connection conn;
@Override
public void open(Configuration parameter) throws Exception {
super.open(parameter);
Class.forName(GmallConfig.PHOENIX_DRIVER);
conn = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
(3)自定义函数MyBroadcastFunction-processBroadcastElement
@Override
public void processBroadcastElement(String jsonStr, Context context, Collector<JSONObject> out) throws Exception {
JSONObject jsonObj = JSON.parseObject(jsonStr);
BroadcastState<String, TableProcess> tableConfigState = context.getBroadcastState(tableConfigDescriptor);
String op = jsonObj.getString("op");
if ("d".equals(op)) {
TableProcess before = jsonObj.getObject("before", TableProcess.class);
String sourceTable = before.getSourceTable();
tableConfigState.remove(sourceTable);
} else {
TableProcess config = jsonObj.getObject("after", TableProcess.class);
String sourceTable = config.getSourceTable();
String sinkTable = config.getSinkTable();
String sinkColumns = config.getSinkColumns();
String sinkPk = config.getSinkPk();
String sinkExtend = config.getSinkExtend();
tableConfigState.put(sourceTable, config);
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}
}
(4)自定义函数MyBroadcastFunction-checkTable
在 Phoenix 建表之前要先创建命名空间 GMALL2022_REALTIM。
0: jdbc:phoenix:> create schema GMALL2022_REALTIME;
checkTable() 方法如下
/**
* Phoenix 建表函数
*
* @param sinkTable 目标表名 eg. test
* @param sinkColumns 目标表字段 eg. id,name,sex
* @param sinkPk 目标表主键 eg. id
* @param sinkExtend 目标表建表扩展字段 eg. ""
* eg. create table if not exists mydb.test(id varchar primary key, name varchar, sex varchar)...
*/
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
// 封装建表 SQL
StringBuilder sql = new StringBuilder();
sql.append("create table if not exists " + GmallConfig.HBASE_SCHEMA
+ "." + sinkTable + "(\n");
String[] columnArr = sinkColumns.split(",");
// 为主键及扩展字段赋默认值
if (sinkPk == null) {
sinkPk = "id";
}
if (sinkExtend == null) {
sinkExtend = "";
}
// 遍历添加字段信息
for (int i = 0; i < columnArr.length; i++) {
sql.append(columnArr[i] + " varchar");
// 判断当前字段是否为主键
if (sinkPk.equals(columnArr[i])) {
sql.append(" primary key");
}
// 如果当前字段不是最后一个字段,则追加","
if (i < columnArr.length - 1) {
sql.append(",\n");
}
}
sql.append(")");
sql.append(sinkExtend);
String createStatement = sql.toString();
// 为数据库操作对象赋默认值,执行建表 SQL
PreparedStatement preparedSt = null;
try {
preparedSt = conn.prepareStatement(createStatement);
preparedSt.execute();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
System.out.println("建表语句\n" + createStatement + "\n执行异常");
} finally {
if (preparedSt != null) {
try {
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
(5)自定义函数MyBroadcastFunction-processElement()
@Override
public void processElement(JSONObject jsonObj, ReadOnlyContext readOnlyContext, Collector<JSONObject> out) throws Exception {
ReadOnlyBroadcastState<String, TableProcess> tableConfigState = readOnlyContext.getBroadcastState(tableConfigDescriptor);
// 获取配置信息
String sourceTable = jsonObj.getString("table");
TableProcess tableConfig = tableConfigState.get(sourceTable);
if (tableConfig != null) {
JSONObject data = jsonObj.getJSONObject("data");
String sinkTable = tableConfig.getSinkTable();
// 根据 sinkColumns 过滤数据
String sinkColumns = tableConfig.getSinkColumns();
filterColumns(data, sinkColumns);
// 将目标表名加入到主流数据中
data.put("sinkTable", sinkTable);
out.collect(data);
}
}
(6)自定义函数MyBroadcastFunction-filterColumns(),校验字段,过滤掉多余的字段
private void filterColumns(JSONObject data, String sinkColumns) {
Set<Map.Entry<String, Object>> dataEntries = data.entrySet();
dataEntries.removeIf(r -> !sinkColumns.contains(r.getKey()));
}
(7)主程序DimSinkApp中调用MyBroadcastFunction提取维度数据
// TODO 8. 处理维度表数据
SingleOutputStreamOperator<JSONObject> dimDS = connectedStream.process(
new MyBroadcastFunction(tableConfigDescriptor)
);
1)程序流程分析

DimSink 继承了RickSinkFunction,这个function得分两条时间线:
一条是任务启动时执行open操作(图中紫线),我们可以把连接的初始化工作放在此处一次性执行;
另一条是随着每条数据的到达反复执行invoke()(图中黑线),在这里面我们要实现数据的保存,主要策略就是根据数据组合成sql提交给hbase。
2)创建 PhoenixUtil 工具类,在其中创建insertValues()方法
package com.atguigu.gmall.realtime.util;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.common.GmallConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
public class PhoenixUtil {
/**
* Phoenix 表数据导入方法
*
* @param conn 连接对象
* @param sinkTable 写入数据的 Phoenix 目标表名
* @param data 待写入的数据
*/
public static void insertValues(Connection conn, String sinkTable, JSONObject data) {
// 获取字段名
Set<String> columns = data.keySet();
// 获取字段对应的值
Collection<Object> values = data.values();
// 拼接字段名
String columnStr = StringUtils.join(columns, ",");
// 拼接字段值
String valueStr = StringUtils.join(values, "','");
// 拼接插入语句
String sql = "upsert into " + GmallConfig.HBASE_SCHEMA
+ "." + sinkTable + "(" +
columnStr + ") values ('" + valueStr + "')";
// 为数据库操作对象赋默认值
PreparedStatement preparedSt = null;
// 执行 SQL
try {
preparedSt = conn.prepareStatement(sql);
preparedSt.execute();
// 提交事务
conn.commit();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象获取或执行异常");
} finally {
if (preparedSt != null) {
try {
preparedSt.close();
} catch (SQLException sqlException) {
sqlException.printStackTrace();
throw new RuntimeException("数据库操作对象释放异常");
}
}
}
}
}
3)MyPhoenixSink
自定义 SinkFunction 子类 MyPhoenixSink,在其中调用 Phoenix 工具类的 insertValues(String sinkTable, JSONObject data) 方法,将维度数据写出到 Phoenix 的维度表中。为了提升效率,减少频繁创建销毁连接带来的性能损耗,创建连接池。
(1)添加德鲁伊连接池依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.16</version>
</dependency>
(2)连接池创建工具类
package com.atguigu.gmall.realtime.util;
import com.alibaba.druid.pool.DruidDataSource;
public class DruidDSUtil {
private static DruidDataSource druidDataSource;
public static DruidDataSource createDataSource() {
// 创建连接池
druidDataSource = new DruidDataSource();
// 设置驱动全类名
druidDataSource.setDriverClassName(GmallConfig.PHOENIX_DRIVER);
// 设置连接 url
druidDataSource.setUrl(GmallConfig.PHOENIX_SERVER);
// 设置初始化连接池时池中连接的数量
druidDataSource.setInitialSize(5);
// 设置同时活跃的最大连接数
druidDataSource.setMaxActive(20);
// 设置空闲时的最小连接数,必须介于 0 和最大连接数之间,默认为 0
druidDataSource.setMinIdle(1);
// 设置没有空余连接时的等待时间,超时抛出异常,-1 表示一直等待
druidDataSource.setMaxWait(-1);
// 验证连接是否可用使用的 SQL 语句
druidDataSource.setValidationQuery("select 1");
// 指明连接是否被空闲连接回收器(如果有)进行检验,如果检测失败,则连接将被从池中去除
// 注意,默认值为 true,如果没有设置 validationQuery,则报错
// testWhileIdle is true, validationQuery not set
druidDataSource.setTestWhileIdle(true);
// 借出连接时,是否测试,设置为 false,不测试,否则很影响性能
druidDataSource.setTestOnBorrow(false);
// 归还连接时,是否测试
druidDataSource.setTestOnReturn(false);
// 设置空闲连接回收器每隔 30s 运行一次
druidDataSource.setTimeBetweenEvictionRunsMillis(30 * 1000L);
// 设置池中连接空闲 30min 被回收,默认值即为 30 min
druidDataSource.setMinEvictableIdleTimeMillis(30 * 60 * 1000L);
return druidDataSource;
}
}
(3)MyPhoenixSink 函数
package com.atguigu.gmall.realtime.app.func;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.util.DruidDSUtil;
import com.atguigu.gmall.realtime.util.PhoenixUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.SQLException;
public class MyPhoenixSink extends RichSinkFunction<JSONObject> {
private DruidDataSource druidDataSource;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 创建连接池
druidDataSource = DruidDSUtil.createDataSource();
}
@Override
public void invoke(JSONObject jsonObj, Context context) throws Exception {
// 获取目标表表名
String sinkTable = jsonObj.getString("sinkTable");
// 获取 id 字段的值
String id = jsonObj.getString("id");
// 清除 JSON 对象中的 sinkTable 字段
// 以便可将该对象直接用于 HBase 表的数据写入
jsonObj.remove("sinkTable");
// 获取连接对象
DruidPooledConnection conn = druidDataSource.getConnection();
try {
PhoenixUtil.insertValues(conn, sinkTable, jsonObj);
} catch (Exception e) {
System.out.println("维度数据写入异常");
e.printStackTrace();
} finally {
try {
// 归还数据库连接对象
conn.close();
} catch (SQLException sqlException) {
System.out.println("数据库连接对象归还异常");
sqlException.printStackTrace();
}
}
}
}
4)主程序 DimSinkApp 中调用 MyPhoenixSink
// TODO 9. 将数据写入 Phoenix 表
dimDS.addSink(new MyPhoenixSink());
6)测试
(1)启动HDFS、ZK、Kafka、Maxwell、HBase
(2)运行 IDEA 中的 DimSinkApp
(3)执行 mysql_to_kafka_init.sh 脚本
mysql_to_kafka_init.sh all
(4)通过phoenix查看hbase的schema以及表情况
附:整个流程的步骤以及所需要的进程
数据流:web/app -> nginx -> 业务服务器 -> Mysql(binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Phoenix
程序:Mock -> Mysql(binlog) -> Maxwell -> Kafka(Zk) -> DimApp(FlinkCDC/Mysql) -> Phoenix(HBase/ZK/HDFS)
/**
* 需要启动的进程:
* dfs -> zookeeper -> kafka -> maxwell -> hbase -> phoenix(客户端):bin/sqlline.py
*/
我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当
我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm
我们的git存储库中目前有一个Gemfile。但是,有一个gem我只在我的环境中本地使用(我的团队不使用它)。为了使用它,我必须将它添加到我们的Gemfile中,但每次我checkout到我们的master/dev主分支时,由于与跟踪的gemfile冲突,我必须删除它。我想要的是类似Gemfile.local的东西,它将继承从Gemfile导入的gems,但也允许在那里导入新的gems以供使用只有我的机器。此文件将在.gitignore中被忽略。这可能吗? 最佳答案 设置BUNDLE_GEMFILE环境变量:BUNDLE_GEMFI
这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub
我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
在应用开发中,有时候我们需要获取系统的设备信息,用于数据上报和行为分析。那在鸿蒙系统中,我们应该怎么去获取设备的系统信息呢,比如说获取手机的系统版本号、手机的制造商、手机型号等数据。1、获取方式这里分为两种情况,一种是设备信息的获取,一种是系统信息的获取。1.1、获取设备信息获取设备信息,鸿蒙的SDK包为我们提供了DeviceInfo类,通过该类的一些静态方法,可以获取设备信息,DeviceInfo类的包路径为:ohos.system.DeviceInfo.具体的方法如下:ModifierandTypeMethodDescriptionstatic StringgetAbiList()Obt
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现
了解Rails缓存如何工作的人可以真正帮助我。这是嵌套在Rails::Initializer.runblock中的代码:config.after_initializedoSomeClass.const_set'SOME_CONST','SOME_VAL'end现在,如果我运行script/server并发出请求,一切都很好。然而,在我的Rails应用程序的第二个请求中,一切都因单元化常量错误而变得糟糕。在生产模式下,我可以成功发出第二个请求,这意味着常量仍然存在。我已通过将以上内容更改为以下内容来解决问题:config.after_initializedorequire'some_cl
我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它: