赞
踩
打开三台虚拟机
再test.java下创建一个测试类
com.atguigu.Test
写入以下内容
package com.atguigu; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.utils.MyKafkaUtil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; public class Test { public static void main(String[] args) throws Exception { //1.获取执行环境,设置并行度,开启CK,设置状态后端(HDFS) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //为Kafka主题的分区数 env.setParallelism(1); //1.1 设置状态后端 // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:50070/gmall/dwd_log/ck")); // //1.2 开启CK // env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000L); //修改用户名 System.setProperty("HADOOP_USER_NAME", "atguigu"); //2.读取Kafka ods_base_log 主题数据 FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource("test", "test"); DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3.将每行数据转换为JsonObject SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(new MapFunction<String, JSONObject>() { @Override public JSONObject map(String s) throws Exception { System.out.println(5/0); return JSON.parseObject(s); } }); //4.打印数据 kafkaDS.print("Kafka>>>>>>>"); jsonObjDS.print("JSON>>>>>>>"); //5.执行任务 env.execute(); } }
启动zookeeper,kafka,hadoop集群
启动一个生产者,并输入一个数据
[root@hadoop103 kafka]# bin/kafka-console-producer.sh --broker-list hadoop103:9092 --topic test
>{"id":"1001"}
之后执行程序
得到以下报错
Caused by: java.lang.ArithmeticException: / by zero
将1.1和1.2的注释打开
重新运行程序
DWD层测试
日志数据DWD层测试所需进程
mock—>nginx—>springBoot—>zookeeper—>flinkApp(HDFS)—>Kafka(三个消费者)
启动三个kafka消费者
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic dwd_display_log
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic dwd_start_log
[root@hadoop103 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic dwd_page_log
之后启动mock
之后在其余窗口看到数据出现,则成功。
业务数据的变化,我们可以通过MaxWell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到HBase,将事实数据写回Kafka作为业务数据的DWD层。
不再将维度数据放在流中
对MaxWell抓取数据进行ETL,有用的部分保留,没用的过滤掉(如空值)
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表既是事实表在某种情况下也是维度表。
在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
Ø 一种是用Zookeeper存储,通过Watch感知数据变化。
Ø 另一种是用mysql数据库存储,周期性的同步。
这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便,虽然周期性操作时效性差一点,但是配置变化并不频繁。
所以就有了如下图:
业务数据保存到Kafka的主题中
维度数据保存到HBase的表中
package com.atguigu.app.dwd; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.utils.MyKafkaUtil; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; public class DbBassApp { public static void main(String[] args) throws Exception { //1、获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //1.1 设置状态后端 // env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall/dwd_log/ck")); // //1.2 开启CK // env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE); // env.getCheckpointConfig().setCheckpointTimeout(60000L); //2、读取Kafka数据 FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource("ods_base_db_m", "ods_db_group"); DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3、将每行数据转换为JSON对象 SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject); //4、过滤 SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(new FilterFunction<JSONObject>() { @Override public boolean filter(JSONObject jsonObject) throws Exception { //获取data字段 String data = jsonObject.getString("data"); return data != null && data.length() > 0;//只要data不为null且长度大于0,就保留数据 } }); //打印测试 filterDS.print(); //5、分流,ProcessFunction //6、去除分流输出将数据写入Kafka活Phoenix //7、执行任务 env.execute(); } }
运行,在虚拟机上启动maxwell和mock(rt_db目录下的)
需要看到明显的变化的话,可以直接进入数据库,添加一行数据
之后控制台会输出
在数据库创建gmall-realtime-200821
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
在pom文件中添加以下依赖
<!--lomback插件依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <!--commons-beanutils是Apache开源组织提供的用于操作JAVA BEAN的工具包。 使用commons-beanutils,我们可以很方便的对bean对象的属性进行操作--> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.9.3</version> </dependency> <!--Guava工程包含了若干被Google的Java项目广泛依赖的核心库,方便开发--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency>
在bean包下创建TableProcess 类
package com.atguigu.bean; public class TableProcess { //动态分流Sink常量 public static final String SINK_TYPE_HBASE = "HBASE"; public static final String SINK_TYPE_KAFKA = "KAFKA"; public static final String SINK_TYPE_CK = "CLICKHOUSE"; //来源表 String sourceTable; //操作类型 insert,update,delete String operateType; //输出类型 hbase kafka String sinkType; //输出表(主题) String sinkTable; //输出字段 String sinkColumns; //主键字段 String sinkPk; //建表扩展 String sinkExtend; }
在utils包下创建MySQLUtil类
package com.atguigu.utils; import com.atguigu.bean.TableProcess; import com.google.common.base.CaseFormat; import org.apache.commons.beanutils.BeanUtils; import java.sql.*; import java.util.ArrayList; import java.util.List; /** * ORM * Object Relation Mapping * 对象 关系 映射 */ public class MySQLUtil { public static <T> List<T> queryList(String sql, Class<T> cls, boolean underScoreToCamel) { Connection connection = null; PreparedStatement preparedStatement = null; ResultSet resultSet = null; try { //1.注册驱动 Class.forName("com.mysql.jdbc.Driver"); //2.获取连接 connection = DriverManager.getConnection("jdbc:mysql://hadoop103:3306/gmall-realtime-200821", "root", "123456" ); //3.编译SQL,并给占位符赋值 preparedStatement = connection.prepareStatement(sql); //4.执行查询 resultSet = preparedStatement.executeQuery(); //6.解析查询结果 ArrayList<T> list = new ArrayList<>(); //取出列的元数据 ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); while (resultSet.next()) { //封装JavaBean并加入集合 T t = cls.newInstance(); for (int i = 1; i <= columnCount; i++) { //获取列名 String columnName = metaData.getColumnName(i); if (underScoreToCamel) { columnName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName); } Object object = resultSet.getObject(i); //给JavaBean对象赋值 BeanUtils.setProperty(t, columnName, object); } list.add(t); } //返回结果 return list; } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("查询配置信息失败!!!"); } finally { //7.释放资源 if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { e.printStackTrace(); } } if (preparedStatement != null) { try { preparedStatement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static void main(String[] args) { List<TableProcess> tableProcesses = queryList("select * from table_process", TableProcess.class, true); System.out.println(tableProcesses); } }
HBase是一种分布式、可扩展、支持海量数据存储的NoSQL数据库。
逻辑上Hbase的数据模型与关系型数据库很相似,将所有数据存储在一张表中,有行有列,但是从底层物理存储结构**(k-v)**来看,HBase更像是一个multi-dimensional map。
HBase逻辑结构:
创建全局索引时,会在HBase中建立一张新表,也就是说索引数据和数据表是放在不同表中的,因此全局索引适用于多读少写的业务场景。
本地索引适用于少读多写的场景。
当查询语句能够获取索引时就不需要使用原表。
程序流程分析
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CLVdlNs4-1628993986181)(D:\实时数仓\图片12.png)]
TableProcessFunction是一个自定义算子,主要包括三条时间线任务
Ø 图中紫线,这个时间线与数据流入无关,只要任务启动就会执行。主要的任务方法是open()这个方法在任务启动时就会执行。他的主要工作就是初始化一些连接,开启周期调度。
Ø 图中绿线,这个时间线也与数据流入无关,只要周期调度启动,会自动周期性执行。主要的任务是同步配置表(tableProcessMap)。通过在open()方法中加入timer实现。同时还有个附带任务就是如果发现不存在数据表,要根据配置自动创建数据库表。
Ø 图中黑线,这个时间线就是随着数据的流入持续发生,这部分的任务就是根据同步到内存的tableProcessMap,来为流入的数据进行标识,同时清理掉没用的字段。
引入Phoenix依赖
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
在common包下创建类GmallConfig作为Phoenix的配置常量类
package com.atguigu.common;
public class GmallConfig {
//Phoenix库名
public static final String HBASE_SCHEMA="GMALL200821_REALTIME";
//Phoenix驱动
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix连接参数
public static final String PHOENIX_SERVER="jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
}
package com.atguigu.app.func; import com.alibaba.fastjson.JSONObject; import com.atguigu.bean.TableProcess; import com.atguigu.common.GmallConfig; import com.atguigu.utils.MySQLUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; public class DbSplitProcessFunction extends ProcessFunction<JSONObject, JSONObject> { //定义属性 private OutputTag<JSONObject> outputTag; //定义配置信息的Map private HashMap<String, TableProcess> tableProcessHashMap; //定义Set用于记录当前Phoenix中已经存在的表 private HashSet<String> existTables; //定义Phoenix的连接 private Connection connection =null; @Override public void open(Configuration parameters) throws Exception { //初始化配置信息的Map tableProcessHashMap = new HashMap<>(); //初始化已经存在的表的set existTables = new HashSet<>(); //初始化Phoenix的连接 Class.forName(GmallConfig.PHOENIX_DRIVER); connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER); //读取配置信息 refreshMeta(); //开启定时调度任务,周期性执行读取配置信息方法 Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { refreshMeta(); } },5000,5000);//开始时等待5s后执行run方法,之后每隔5秒执行一次 } /** * 周期性调度的方法 * 1.去取MySQ中的配置信息 * 2.将查询结果封装为Map,以便于后续每条数据获取 * 3.检查Phoenix中是否存在该表,如不存在,则在Phoenix中创建该表 */ private void refreshMeta() { //1.读取MySQL中的配置信息 List<TableProcess> tableProcesses = MySQLUtil.queryList("select * from table_process" , TableProcess.class, true); //2.将查询结果封装为Map,以便于后续每条数据的获取 for (TableProcess tableProcess : tableProcesses){ //获取源表 String sourceTable = tableProcess.getSourceTable(); //获取操作类型 String operateType = tableProcess.getOperateType(); String key = sourceTable+":"+operateType; tableProcessHashMap.put(key,tableProcess); //3.检查Phoneix中是否存在该表,如果不存在则建表 if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())){ //校验Phoenix中是否已经存在该表 boolean noExist = existTables.add(tableProcess.getSinkTable());//判断是否存在,可以添加(表不存在)返回true if (noExist){ checkTable(tableProcess.getSinkTable(), tableProcess.getSinkColumns(), tableProcess.getSinkPk(), tableProcess.getSinkExtend()); } } } //校验 if (tableProcessHashMap == null || tableProcessHashMap.size()==0){ throw new RuntimeException("读取Mysq配置信息失败!"); } } /** * Phoneix 建表 * @param sinkTable 表名 test * @param sinkColumns 表名字段 id,name,sex * @param sinkPk 表主键 id * @param sinkExtend 表扩展字段 "" * */ private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) { //给主键以及扩展字段赋默认值 if (sinkTable == null){ sinkTable = "id"; } if (sinkExtend == null){ sinkExtend=""; } //封装建表SQL StringBuilder createSql = new StringBuilder(" create table if not exist ").append(sinkTable).append(GmallConfig.HBASE_SCHEMA).append(".").append(sinkTable).append("("); //遍历添加字段信息 String[] fields = sinkColumns.split(","); for (int i = 0; i < fields.length; i++) { //取出字段 String field = fields[i]; //判断当前字段是否是主键 if (sinkPk.equals(field)){ createSql.append(field).append(" varchar primary key ");//是主键,那就作为主键 }else { createSql.append(field).append(" varchar "); } //如果当前字段不是最后一个字段,则追加“,” if (i < fields.length - 1) { createSql.append(","); } } createSql.append(")"); createSql.append(sinkExtend); System.out.println(createSql); PreparedStatement preparedStatement = null; //执行SQL try { preparedStatement = connection.prepareStatement(createSql.toString()); preparedStatement.execute(); } catch (SQLException throwables) { throwables.printStackTrace(); throw new RuntimeException("创建Phoenix表"+sinkTable+"失败!"); }finally { if (preparedStatement != null){ try { preparedStatement.close(); } catch (SQLException throwables) { throwables.printStackTrace(); } } } } @Override public void processElement(JSONObject jsonObject, Context context, Collector<JSONObject> collector) throws Exception { collector.collect(jsonObject); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。