
Flink Real-Time Data Warehouse Project
Sample data.

Start-up log:

```json
{"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"start":{"entry":"icon","loading_time":13312,"open_ad_id":13,"open_ad_ms":9203,"open_ad_skip_ms":8503},"ts":1690869978000}
```

Page / display (exposure) logs:

```json
{"actions":[{"action_id":"get_coupon","item":"3","item_type":"coupon_id","ts":1690869987153}],"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"displays":[{"display_type":"recommend","item":"2","item_type":"sku_id","order":1,"pos_id":1},{"display_type":"promotion","item":"9","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"promotion","item":"6","item_type":"sku_id","order":3,"pos_id":5},{"display_type":"promotion","item":"10","item_type":"sku_id","order":4,"pos_id":5},{"display_type":"query","item":"9","item_type":"sku_id","order":5,"pos_id":4}],"page":{"during_time":18307,"item":"10","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"activity"},"ts":1690869978000}
{"actions":[{"action_id":"cart_minus_num","item":"2","item_type":"sku_id","ts":1690869984446}],"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":12892,"last_page_id":"good_detail","page_id":"cart"},"ts":1690869978000}
{"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1690869978000}
```
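The two record shapes can be told apart by their top-level keys: a start-up log carries a `start` object, while page/display logs carry a `page` object instead. A minimal sketch of that classification (plain Java with naive string checks standing in for a real JSON parser, just to illustrate the distinction):

```java
public class LogTypeSketch {
    // Naive check: a start-up log contains a top-level "start" object,
    // a page log carries "page" instead. Real code should parse the JSON.
    static String classify(String logLine) {
        if (logLine.contains("\"start\":{")) {
            return "start";
        } else if (logLine.contains("\"page\":{")) {
            return "page";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        String startLog = "{\"common\":{\"mid\":\"mid_7\"},\"start\":{\"entry\":\"icon\"},\"ts\":1690869978000}";
        String pageLog = "{\"common\":{\"mid\":\"mid_7\"},\"page\":{\"page_id\":\"trade\"},\"ts\":1690869978000}";
        System.out.println(classify(startLog)); // start
        System.out.println(classify(pageLog));  // page
    }
}
```

The same presence-of-`start` test is what the DWD splitting job uses later to route records to side outputs.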

017 - Collection module - Log data collection: create the SpringBoot project & test with parameters

019 - Collection module - Log data collection: write data to disk & to Kafka (local test)

Start ZooKeeper:

```shell
bin/zkServer.sh start
```

Start Kafka:

```shell
bin/kafka-server-start.sh config/server9092.properties
```

Start a console consumer on "ods_base_log":

```shell
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic ods_base_log
```

Start GmallLoggerApplication.java, then start the mock log generator:

```shell
java -jar gmall2020-mock-log-2020-12-18.jar
```
In logback.xml, add a logger that writes one package's logs separately, so the console is not flooded with framework startup logs:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="/opt/modules/gmall-flink/rt_applog/logs"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- Print this one package's logs separately -->
    <logger name="com.atguigu.gmalllogger.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile"/>
        <appender-ref ref="console"/>
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console"/>
    </root>
</configuration>
```

```shell
java -jar gmall2020-mock-log-2020-12-18.jar
```

The console now shows the data:

The Kafka topic ods_base_log now has the data:

020 - Collection module - Log data collection: write data to disk & to Kafka (single-machine test)

Start ZooKeeper:

```shell
bin/zkServer.sh start
```

Start Kafka:

```shell
bin/kafka-server-start.sh config/server9092.properties
```

Start a console consumer on "ods_base_log":

```shell
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic ods_base_log
```

Start the applications:

```shell
java -jar gmall2020-mock-log-2020-12-18.jar
java -jar gmall-logger.jar
```

 

029 - Collection module - Business data collection with FlinkCDC: DataStream API test

```java
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithCustomerDeserialization {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build a SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                // Without tableList, all tables of the listed databases are consumed; the format is db.table
                .tableList("gmall-210325-flink.base_trademark")
                .deserializer(new StringDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
```

 

 

030 - Collection module - FlinkCDC DataStream API: set checkpoints, package, and start the cluster

032 - Collection module - Business data collection with FlinkCDC: FlinkSQL API test

```java
package com.atguigu;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSQL {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the source table with DDL
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " tm_name STRING, " +
                " logo_url STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = '127.0.0.1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '123456', " +
                " 'database-name' = 'gmall-210325-flink', " +
                " 'table-name' = 'base_trademark' " +
                ")");

        // 3. Query the data
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        // 4. Convert the dynamic table into a retract stream
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        // 5. Start the job
        env.execute("FlinkCDCWithSQL");
    }
}
```

Checkpoint settings:

- enableCheckpointing: interval between the starts of two consecutive checkpoints; 5 min in production.
- setCheckpointTimeout: checkpoint timeout, e.g. 10000 ms; size it to how long state actually takes to persist in production (if a snapshot takes about 5 s, set roughly 10 s).
- setMaxConcurrentCheckpoints: 2 — how many checkpoints may be in flight at the same time.
- setMinPauseBetweenCheckpoints: 3000 ms — minimum gap between the end of one checkpoint and the start of the next.
- setRestartStrategy: (3, 5 s) — restart at most 3 times, 5 s apart; if all 3 attempts fail, the job fails. Old versions need this set explicitly (e.g. Flink 1.10 defaults to Integer.MAX_VALUE restart attempts, so the job would restart forever); newer versions ship a reasonable default.
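Put together, the settings above map to the following Flink API calls. This is a configuration sketch, not production-tuned values; it assumes a `StreamExecutionEnvironment` named `env` is already in scope:

```java
// Checkpoint configuration sketch for the settings discussed above.
env.enableCheckpointing(5 * 60 * 1000L);                        // interval between checkpoint starts (5 min in production)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(10 * 1000L);     // abort a checkpoint that takes longer than 10 s
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);       // at most 2 checkpoints in flight
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L); // 3 s between the end of one and the start of the next
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L)); // 3 attempts, 5 s apart
```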

Switching to a custom deserializer:

```java
package com.atguigu;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCWithCustomerDeserialization {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build a SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                // Without tableList, all tables of the listed databases are consumed; the format is db.table
                .tableList("gmall-210325-flink.base_trademark")
                .deserializer(new CustomerDeserialization())   // custom deserializer
                //.deserializer(new StringDebeziumDeserializationSchema()) // default deserializer
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
```

 

```java
package com.atguigu;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * Output format:
     * {
     *   "database": "",
     *   "tableName": "",
     *   "before": {"id":"","tm_name":"" ...},
     *   "after": {"id":"","tm_name":"" ...},
     *   "type": "c u d",
     *   //"ts": 156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. JSON object that holds the final result
        JSONObject result = new JSONObject();

        // 2. Extract the database and table name from the record topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        // 3. Extract the "before" data
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        // 4. Extract the "after" data
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // 5. Extract the operation type: CREATE / UPDATE / DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        // 6. Assemble the JSON object
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // 7. Emit the record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
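The operation mapping in step 5 (Debezium reports operations such as CREATE/UPDATE/DELETE; the project renames "create" to "insert") can be exercised in isolation. This sketch uses a plain string as a stand-in for `Envelope.Operation`:

```java
public class OpTypeSketch {
    // Mirrors step 5 of CustomerDeserialization: lowercase the Debezium
    // operation name and rename "create" to "insert".
    static String toType(String operationName) {
        String type = operationName.toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(toType("CREATE")); // insert
        System.out.println(toType("UPDATE")); // update
        System.out.println(toType("DELETE")); // delete
    }
}
```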
Comparing the two FlinkCDC APIs:

- DataStream:
  - Pros: can capture multiple databases and multiple tables at once.
  - Cons: needs a custom deserializer (which is also what makes it flexible).
- FlinkSQL:
  - Pros: no custom deserializer needed.
  - Cons: one table per source table (with FlinkCDC's DataStream API the record fields can instead be passed into a bean).

035 - Collection module - Business data collection with FlinkCDC: DataStream API custom deserializer, code test

Before deserialization:

```
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1693292078, file=mysql-bin.000071, pos=528, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall-210325-flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Key:STRUCT}, value=Struct{after=Struct{id=12,tm_name=test,logo_url=test},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1693292078000,db=gmall-210325-flink,table=base_trademark,server_id=1,file=mysql-bin.000071,pos=691,row=0,thread=6},op=c,ts_ms=1693292078631}, valueSchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1693292093, file=mysql-bin.000071, pos=843, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall-210325-flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Key:STRUCT}, value=Struct{before=Struct{id=12,tm_name=test,logo_url=test},after=Struct{id=12,tm_name=test,logo_url=tes},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1693292093000,db=gmall-210325-flink,table=base_trademark,server_id=1,file=mysql-bin.000071,pos=1006,row=0,thread=6},op=u,ts_ms=1693292093136}, valueSchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1693292104, file=mysql-bin.000071, pos=1179, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.gmall-210325-flink.base_trademark', kafkaPartition=null, key=Struct{id=12}, keySchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Key:STRUCT}, value=Struct{before=Struct{id=12,tm_name=test,logo_url=tes},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1693292104000,db=gmall-210325-flink,table=base_trademark,server_id=1,file=mysql-bin.000071,pos=1342,row=0,thread=6},op=d,ts_ms=1693292104741}, valueSchema=Schema{mysql_binlog_source.gmall_210325_flink.base_trademark.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
```

After deserialization:

```json
{"database":"gmall-210325-flink","before":{},"after":{"tm_name":"test","logo_url":"test","id":12},"type":"insert","tableName":"base_trademark"}
{"database":"gmall-210325-flink","before":{"tm_name":"test","logo_url":"test","id":12},"after":{"tm_name":"test","logo_url":"tes","id":12},"type":"update","tableName":"base_trademark"}
{"database":"gmall-210325-flink","before":{"tm_name":"test","logo_url":"tes","id":12},"after":{},"type":"delete","tableName":"base_trademark"}
```
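The `database` and `tableName` fields in the deserialized JSON above are cut out of the record topic, which has the form `<server-name>.<database>.<table>`. The split logic from step 2 of the deserializer can be verified standalone:

```java
public class TopicSplitSketch {
    // Debezium record topics look like: <server-name>.<database>.<table>
    static String[] dbAndTable(String topic) {
        String[] fields = topic.split("\\.");
        return new String[]{fields[1], fields[2]};
    }

    public static void main(String[] args) {
        String[] parts = dbAndTable("mysql_binlog_source.gmall-210325-flink.base_trademark");
        System.out.println(parts[0] + " / " + parts[1]);
        // prints: gmall-210325-flink / base_trademark
    }
}
```

Note that `split` takes a regex, hence the escaped dot (`"\\."`).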

041 - Collection module - Business data collection: read MySQL data and write it to Kafka, test

Start ZooKeeper:

```shell
bin/zkServer.sh start
# check its status
bin/zkServer.sh status
```

Start Kafka, create the topic, and start a consumer:

```shell
# create the topic
bin/kafka-topics.sh --create --zookeeper bigdata-training01.erongda.com:2181/kafka --replication-factor 2 --partitions 3 --topic ods_base_db
# start a console consumer
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic ods_base_db
```
```java
package com.atguigu.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 Checkpoint & state backend settings (disabled for local testing)
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        // 2. Build a SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));

        // 4. Start the job
        env.execute("FlinkCDC");
    }
}
```

 

045 - DWD & DIM - Behavior data: converting records to JSON objects

```java
public class FlinkCDCWithCustomerDeserialization {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build a SourceFunction with FlinkCDC and read the data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-210325-flink")
                // Without tableList, all tables of the listed databases are consumed; the format is db.table
                .tableList("gmall-210325-flink.base_trademark")
                .deserializer(new CustomerDeserialization())   // custom deserializer
                //.deserializer(new StringDebeziumDeserializationSchema()) // default deserializer
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDCWithCustomerDeserialization");
    }
}
```

048 - DWD & DIM - Behavior data: test

Start Kafka:

```shell
bin/kafka-server-start.sh config/server9092.properties
```

Start consumers on the three DWD topics:

```shell
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_start_log
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_page_log
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_display_log
```

Start a producer on ods_base_log:

```shell
bin/kafka-console-producer.sh --broker-list bigdata-training01.erongda.com:9092 --topic ods_base_log
```

Sample data. Start-up log:

```json
{"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"start":{"entry":"icon","loading_time":13312,"open_ad_id":13,"open_ad_ms":9203,"open_ad_skip_ms":8503},"ts":1690869978000}
```

Page / display (exposure) logs:

```json
{"actions":[{"action_id":"get_coupon","item":"3","item_type":"coupon_id","ts":1690869987153}],"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"displays":[{"display_type":"recommend","item":"2","item_type":"sku_id","order":1,"pos_id":1},{"display_type":"promotion","item":"9","item_type":"sku_id","order":2,"pos_id":5},{"display_type":"promotion","item":"6","item_type":"sku_id","order":3,"pos_id":5},{"display_type":"promotion","item":"10","item_type":"sku_id","order":4,"pos_id":5},{"display_type":"query","item":"9","item_type":"sku_id","order":5,"pos_id":4}],"page":{"during_time":18307,"item":"10","item_type":"sku_id","last_page_id":"good_list","page_id":"good_detail","source_type":"activity"},"ts":1690869978000}
{"actions":[{"action_id":"cart_minus_num","item":"2","item_type":"sku_id","ts":1690869984446}],"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":12892,"last_page_id":"good_detail","page_id":"cart"},"ts":1690869978000}
{"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","last_page_id":"cart","page_id":"trade"},"ts":1690869978000}
```
```java
//Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
//Programs:  mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> Kafka
public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        //TODO 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Checkpoint & state backend settings (disabled for local testing)
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //TODO 2. Consume the ods_base_log topic
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_210325";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3. Convert each line into a JSON object
        OutputTag<String> outputTag = new OutputTag<String>("Dirty") {
        };
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // On a parse error, route the record to the dirty-data side output
                    ctx.output(outputTag, value);
                }
            }
        });
        // Print dirty data
        jsonObjDS.getSideOutput(outputTag).print("Dirty>>>>>>>>>>>");

        //TODO 4. New/old visitor check (keyed state)
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS
                .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
                .map(new RichMapFunction<JSONObject, JSONObject>() {
                    private ValueState<String> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<String>("value-state", String.class));
                    }

                    @Override
                    public JSONObject map(JSONObject value) throws Exception {
                        // Read the "is_new" flag; only records claiming to be new need checking
                        String isNew = value.getJSONObject("common").getString("is_new");
                        if ("1".equals(isNew)) {
                            String state = valueState.value();
                            if (state != null) {
                                // Device seen before: correct the flag
                                value.getJSONObject("common").put("is_new", "0");
                            } else {
                                valueState.update("1");
                            }
                        }
                        return value;
                    }
                });

        //TODO 5. Split the stream: page -> main stream, start -> side output, display -> side output
        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // Check for the start-up log field
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    // Start-up log: side output
                    ctx.output(startTag, value.toJSONString());
                } else {
                    // Page log: main stream
                    out.collect(value.toJSONString());
                    // Extract the display (exposure) records
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        String pageId = value.getJSONObject("page").getString("page_id");
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            // Attach the page id to each display record
                            display.put("page_id", pageId);
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }
                }
            }
        });

        //TODO 6. Extract the side outputs
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);

        //TODO 7. Print the three streams and write them to the corresponding Kafka topics
        startDS.print("Start>>>>>>>>>>>");
        pageDS.print("Page>>>>>>>>>>>");
        displayDS.print("Display>>>>>>>>>>>>");
        startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
        pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
        displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));

        //TODO 8. Start the job
        env.execute("BaseLogApp");
    }
}
```
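The new/old visitor check in TODO 4 keys the stream by `mid` and keeps one piece of state per device. Outside Flink, the same decision logic can be simulated with a `HashMap` standing in for the keyed `ValueState` (an assumption made purely for this sketch):

```java
import java.util.HashMap;
import java.util.Map;

public class IsNewFlagSketch {
    // Simulates the per-mid keyed ValueState with a map from mid to a marker.
    private final Map<String, String> state = new HashMap<>();

    // Returns the corrected is_new flag for one event.
    String correct(String mid, String isNew) {
        if ("1".equals(isNew)) {
            if (state.get(mid) != null) {
                return "0";          // state exists: device seen before, not new
            }
            state.put(mid, "1");     // first event for this device: keep "1"
        }
        return isNew;                // records already marked old pass through
    }

    public static void main(String[] args) {
        IsNewFlagSketch app = new IsNewFlagSketch();
        System.out.println(app.correct("mid_7", "1")); // 1  (first time seen)
        System.out.println(app.correct("mid_7", "1")); // 0  (already seen)
        System.out.println(app.correct("mid_8", "0")); // 0  (passes through)
    }
}
```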

 

 

054 - DWD & DIM - Business data: enable binlog on the config table and test

Enable binlog by editing /etc/my.cnf:

```ini
[mysqld]
# log_bin
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
# monitor multiple databases
binlog-do-db=gmall-210325-flink
binlog-do-db=gmall-210325-realtime
```

063 - DWD & DIM - Business data: code and test

067 - DWD & DIM - Business data: end-to-end test, completed

Start ZooKeeper:

```shell
bin/zkServer.sh start
```

Start HDFS:

```shell
sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
```

Start HBase:

```shell
bin/hbase-daemon.sh start master
bin/hbase-daemon.sh start regionserver
```

Start Kafka:

```shell
bin/kafka-server-start.sh config/server9092.properties
```

Start a console consumer on "ods_base_db":

```shell
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic ods_base_db
```

Start the programs: BaseDBApp.java and ods/FlinkCDC.java.

Data flow: web/app -> nginx -> SpringBoot -> Mysql -> FlinkApp -> Kafka(ods) -> FlinkApp -> Kafka(dwd)/Phoenix(dim)
Programs:  mockDb -> Mysql -> FlinkCDC -> Kafka(ZK) -> BaseDBApp -> Kafka/Phoenix(hbase,zk,hdfs)
Note: to enable the mapping between HBase namespaces and Phoenix schemas, the program must load this configuration file. On the Linux servers, add the same two properties to the hbase-site.xml of both HBase and Phoenix, and sync the file with xsync.

```xml
<property>
    <name>phoenix.schema.isNamespaceMappingEnabled</name>
    <value>true</value>
</property>
<property>
    <name>phoenix.schema.mapSystemTablesToNamespace</name>
    <value>true</value>
</property>
```

 

After starting Phoenix, create the schema:

```sql
create schema GMALL210_REALTIME;
```

Tests:

1. Are the Phoenix tables created at startup and after config changes?

    1.1 Tables are created when BaseDBApp starts.

    1.2 Inserting, updating, or deleting rows in gmall-210325-flink.table_process also triggers table creation.

2. Are records whose sink type is hbase written correctly?

3. Are records whose sink type is kafka written correctly?

```shell
bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_order_info
```

078 - DWM layer - Unique visitors (UV): code test

  1. Start ZooKeeper:
  2. bin/zkServer.sh start
  3. Start Kafka:
  4. bin/kafka-server-start.sh config/server9092.properties
  5. Start a producer:
  6. bin/kafka-console-producer.sh --broker-list bigdata-training01.erongda.com:9092 --topic dwd_page_log
  7. Start a consumer:
  8. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_unique_visit
  9. Start the log generators:
  10. java -jar gmall2020-mock-log-2020-12-18.jar
  11. java -jar gmall-logger.jar
  12. Start the programs:
  13. BaseLogApp.java
  14. UniqueVisitApp.java
  15. Test data:
  16. With last_page_id removed:
  17. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid7","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690869978000}
  18. With a different mid:
  19. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_8","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690869978000}
  20. //Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
  21. //Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> kafka
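The UV filter exercised by the two test records above can be reduced to plain Java: per-mid state remembers the last visit date, and a record passes only when it is an entry page (no last_page_id) and its mid has not yet been seen that day. This is a simplified stand-in for UniqueVisitApp's keyed ValueState with a 24-hour TTL:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class UvFilterSketch {
    private final Map<String, String> lastVisitDate = new HashMap<>(); // mid -> yyyy-MM-dd
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    // lastPageId == null means this page view is an entry page
    public boolean isUv(String mid, String lastPageId, long ts) {
        if (lastPageId != null) return false;      // mid-session page, never a new UV
        String date = sdf.format(new Date(ts));
        String seen = lastVisitDate.get(mid);
        if (date.equals(seen)) return false;       // this mid was already counted today
        lastVisitDate.put(mid, date);              // remember, like ValueState with a TTL
        return true;
    }

    public static void main(String[] args) {
        UvFilterSketch f = new UvFilterSketch();
        long ts = 1690869978000L;
        System.out.println(f.isUv("mid_7", null, ts));      // true: first entry of mid_7
        System.out.println(f.isUv("mid_7", null, ts + 1));  // false: same mid, same day
        System.out.println(f.isUv("mid_8", null, ts));      // true: a different mid
    }
}
```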

 

Self-test:

 Generate behavior data for testing:   java -jar gmall2020-mock-log-2020-12-18.jar

081-DWM Layer - Bounce Detail: Code Test

  1. Start ZooKeeper:
  2. bin/zkServer.sh start
  3. Start Kafka:
  4. bin/kafka-server-start.sh config/server9092.properties
  5. Start a producer:
  6. bin/kafka-console-producer.sh --broker-list bigdata-training01.erongda.com:9092 --topic dwd_page_log
  7. Start a consumer:
  8. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_user_jump_detail
  9. Start:
  10. UserJumpDetailApp.java
  11. Test data (last_page_id removed):
  12. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_8","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690889970000}
  13. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_8","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690889975000}
  14. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_8","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690889979000}
  15. {"common":{"ar":"310000","ba":"Redmi","ch":"xiaomi","is_new":"1","md":"Redmi k30","mid":"mid_8","os":"Android 11.0","uid":"23","vc":"v2.1.111"},"page":{"during_time":10393,"item":"1,9","item_type":"sku_ids","page_id":"trade"},"ts":1690889992000}
  16. //Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
  17. //Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> BaseLogApp -> kafka -> UserJumpDetailApp -> Kafka
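Underneath, the bounce detection is a CEP pattern: an entry page (no last_page_id) that is followed within 10 s either by another entry page or by nothing at all counts as a bounce. A minimal sequential sketch of that rule, assuming the events of one mid are sorted by ts:

```java
import java.util.ArrayList;
import java.util.List;

public class JumpSketch {
    static final long TIMEOUT_MS = 10_000;  // mirrors the pattern's within(Time.seconds(10))

    // Each event is {ts, isEntry}: isEntry == 1 when last_page_id is absent.
    // Returns the ts of every event judged to be a bounce.
    static List<Long> detectJumps(List<long[]> events) {
        List<Long> jumps = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            long[] cur = events.get(i);
            if (cur[1] != 1) continue;                                    // not an entry page
            if (i + 1 >= events.size()) { jumps.add(cur[0]); continue; }  // no follow-up: timeout
            long[] next = events.get(i + 1);
            boolean followUpInTime = next[0] - cur[0] <= TIMEOUT_MS && next[1] == 0;
            if (!followUpInTime) jumps.add(cur[0]);  // next is another entry page, or too late
        }
        return jumps;
    }

    public static void main(String[] args) {
        List<long[]> events = new ArrayList<>();
        // the four test records above: all entry pages (last_page_id removed)
        events.add(new long[]{1690889970000L, 1});
        events.add(new long[]{1690889975000L, 1});
        events.add(new long[]{1690889979000L, 1});
        events.add(new long[]{1690889992000L, 1});
        System.out.println(detectJumps(events)); // every one of them bounces
    }
}
```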

 

082-DWM Layer - Bounce Detail: Test

  1. Start ZooKeeper:
  2. bin/zkServer.sh start
  3. Start Kafka:
  4. bin/kafka-server-start.sh config/server9092.properties
  5. Start a consumer:
  6. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_user_jump_detail
  7. Start the log generators:
  8. java -jar gmall2020-mock-log-2020-12-18.jar
  9. java -jar gmall-logger.jar
  10. Start the programs:
  11. BaseLogApp.java
  12. UserJumpDetailApp.java

091-DWM Layer - Order Wide Table: Code Test (passed)

  1. Start ZooKeeper:
  2. bin/zkServer.sh start
  3. Start HDFS:
  4. sbin/hadoop-daemon.sh start namenode
  5. sbin/hadoop-daemon.sh start datanode
  6. Start HBase:
  7. bin/hbase-daemon.sh start master
  8. bin/hbase-daemon.sh start regionserver
  9. Start Kafka:
  10. bin/kafka-server-start.sh config/server9092.properties
  11. Start consumers on the relevant topics:
  12. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic ods_base_db
  13. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_order_detail
  14. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwd_order_info
  15. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_order_wide
  16. Start the programs:
  17. BaseDBApp.java
  18. ods/FlinkCDC.java
  19. OrderWideApp.java
  20. Note: mock.clear=1 empties the database on every run, which makes testing easier
  21. Data flow: web/app -> nginx -> SpringBoot -> Mysql -> FlinkApp -> Kafka(ods) -> FlinkApp -> Kafka/Phoenix(dwd-dim) -> FlinkApp(redis) -> Kafka(dwm)
  22. Programs: MockDb -> Mysql -> FlinkCDC -> Kafka(ZK) -> BaseDbApp -> Kafka/Phoenix(zk/hdfs/hbase) -> OrderWideApp(Redis) -> Kafka

Tests:

1. Create order data manually

2. Check that no data is lost

3. Sample records

BaseDBApp:

Kafka>>>>>>>>:2> {"sinkTable":"dwd_order_detail","database":"gmall-210325-flink","before":{},"after":{"sku_num":"2","create_time":"2023-06-12 16:33:42","sku_id":20,"order_price":2899.00,"source_type":"2401","sku_name":"小米电视E65X 65英寸 全面屏 4K超高清HDR 蓝牙遥控内置小爱 2+8GB AI人工智能液晶网络平板电视 L65M5-EA","id":79949,"order_id":26689,"split_total_amount":5798.00},"type":"insert","tableName":"order_detail"}

FlinkCDC:

{"database":"gmall-210325-flink","before":{},"after":{"sku_num":"2","create_time":"2023-06-12 16:33:42","sku_id":20,"order_price":2899.00,"source_type":"2401","img_url":

"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-0kIGAWtMyAAGxs6Q350k510.jpg","sku_name":"小米电视E65X 65英寸 全面屏 4K超高清HDR 蓝牙遥控内置小爱 2+8GB AI人工智能液晶网络平板电视 L65M5-EA","id":79949,"order_id":26689,"split_total_amount":5798.00},"type":"insert","tableName":"order_detail"}

OrderWideApp:
orderWideWithNoDimDS>>>>>>>>>> OrderWide(detail_id=79949, order_id=26689, sku_id=20, order_price=2899.00, sku_num=2, sku_name=小米电视E65X 65英寸 全面屏 4K超高清HDR 蓝牙遥控内置小爱 2+8GB AI人工智能液晶网络平板电视 L65M5-EA, province_id=19, order_status=1001, user_id=316, total_amount=10490.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=10484.00, feight_fee=6.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=5798.00, expire_time=null, create_time=2023-06-12 16:33:42, operate_time=null, create_date=2023-06-12, create_hour=16, province_name=null, province_area_code=null, province_iso_code=null, province_3166_2_code=null, user_age=null, user_gender=null, spu_id=null, tm_id=null, category3_id=null, spu_name=null, tm_name=null, category3_name=null)

orderWideWithCategory3DS>>>>>>>>>>>> OrderWide(detail_id=79949, order_id=26689, sku_id=20, order_price=2899.00, sku_num=2, sku_name=小米电视E65X 65英寸 全面屏 4K超高清HDR 蓝牙遥控内置小爱 2+8GB AI人工智能液晶网络平板电视 L65M5-EA, province_id=19, order_status=1001, user_id=316, total_amount=10490.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=10484.00, feight_fee=6.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=5798.00, expire_time=null, create_time=2023-06-12 16:33:42, operate_time=null, create_date=2023-06-12, create_hour=16, province_name=甘肃, province_area_code=620000, province_iso_code=CN-62, province_3166_2_code=CN-GS, user_age=50, user_gender=F, spu_id=6, tm_id=5, category3_id=86, spu_name=小米电视 内置小爱 智能网络液晶平板教育电视, tm_name=小米, category3_name=平板电视)

095-DWM Layer - Order Wide Table: Dimension Join, JDBCUtil Test

  1. Run JdbcUtil's main method:

public static void main(String[] args) throws Exception {
    Class.forName(GmallConfig.PHOENIX_DRIVER);
    Connection connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    List<JSONObject> queryList = queryList(connection,
            "select * from GMALL210325_REALTIME.DIM_USER_INFO",
            JSONObject.class,
            true);
    for (JSONObject jsonObject : queryList) {
        System.out.println(jsonObject);
    }
    connection.close();
}
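queryList maps result-set columns onto target-object properties; the boolean argument (true above) is presumably the underscore-to-camelCase switch, which the course code implements with Guava's CaseFormat. A dependency-free sketch of that conversion (the method name here is illustrative):

```java
public class ColumnNameSketch {
    // "ORDER_ID" -> "orderId", mirroring CaseFormat.LOWER_UNDERSCORE.to(LOWER_CAMEL, name)
    static String toCamelCase(String column) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : column.toLowerCase().toCharArray()) {
            if (c == '_') { upperNext = true; continue; }  // drop the underscore
            sb.append(upperNext ? Character.toUpperCase(c) : c);
            upperNext = false;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamelCase("ORDER_ID"));      // orderId
        System.out.println(toCamelCase("sku_name"));      // skuName
        System.out.println(toCamelCase("category3_id"));  // category3Id
    }
}
```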
Verify that the MySQL, ClickHouse, and log data are consistent:

 100-DWM Layer - Order Wide Table: Dimension Join, Optimization 1 (Cache-Aside): Code Test

package com.atguigu.utils;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.common.GmallConfig;
import redis.clients.jedis.Jedis;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;

public class DimUtil {

    public static JSONObject getDimInfo(Connection connection, String tableName, String id) throws Exception {
        // Check Redis before querying Phoenix
        Jedis jedis = RedisUtil.getJedis();
        // Key layout: DIM:DIM_USER_INFO:143
        String redisKey = "DIM:" + tableName + ":" + id;
        String dimInfoJsonStr = jedis.get(redisKey);
        if (dimInfoJsonStr != null) {
            // Reset the expiration time
            jedis.expire(redisKey, 24 * 60 * 60);
            // Return the connection to the pool
            jedis.close();
            // Return the cached result
            return JSONObject.parseObject(dimInfoJsonStr);
        }

        // Build the query, e.g. select * from db.tn where id='18';
        String querySql = "select * from " + GmallConfig.HBASE_SCHEMA + "." + tableName +
                " where id='" + id + "'";

        // Query Phoenix
        List<JSONObject> queryList = JdbcUtil.queryList(connection, querySql, JSONObject.class, false);
        JSONObject dimInfoJson = queryList.get(0);

        // Write the data into Redis before returning it
        jedis.set(redisKey, dimInfoJson.toJSONString());
        jedis.expire(redisKey, 24 * 60 * 60);
        jedis.close();

        // Return the result
        return dimInfoJson;
    }

    public static void delRedisDimInfo(String tableName, String id) {
        Jedis jedis = RedisUtil.getJedis();
        String redisKey = "DIM:" + tableName + ":" + id;
        jedis.del(redisKey);
        jedis.close();
    }

    public static void main(String[] args) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        Connection connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);

        long start = System.currentTimeMillis();
        //System.out.println(getDimInfo(connection, "DIM_BASE_TRADEMARK", "15 "));
        System.out.println(getDimInfo(connection, "DIM_USER_INFO", "143"));
        long end = System.currentTimeMillis();
        System.out.println(getDimInfo(connection, "DIM_USER_INFO", "143"));
        long end2 = System.currentTimeMillis();
        System.out.println(getDimInfo(connection, "DIM_USER_INFO", "143"));
        long end3 = System.currentTimeMillis();

        System.out.println(end - start);   // first call: Phoenix round trip
        System.out.println(end2 - end);    // second call: Redis cache hit
        System.out.println(end3 - end2);   // third call: Redis cache hit
        connection.close();
    }
}
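The timing printout at the end of main is the point of the cache-aside pattern: the first lookup pays the Phoenix round trip, later lookups are served from Redis. The same pattern reduced to pure Java, with a HashMap standing in for Redis and a loader function standing in for Phoenix (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class CacheAsideSketch {
    private final Map<String, String> cache = new HashMap<>();  // stands in for Redis
    private int backendHits = 0;                                // counts "Phoenix" queries

    public String get(String key, Function<String, String> loader) {
        String cached = cache.get(key);
        if (cached != null) return cached;    // cache hit: no backend round trip
        backendHits++;
        String value = loader.apply(key);     // cache miss: query the backend
        cache.put(key, value);                // populate before returning (cache-aside)
        return value;
    }

    public int getBackendHits() { return backendHits; }

    public static void main(String[] args) {
        CacheAsideSketch dim = new CacheAsideSketch();
        Function<String, String> phoenix = k -> "{\"id\":\"" + k + "\"}";
        dim.get("DIM:DIM_USER_INFO:143", phoenix);  // miss -> backend
        dim.get("DIM:DIM_USER_INFO:143", phoenix);  // hit
        dim.get("DIM:DIM_USER_INFO:143", phoenix);  // hit
        System.out.println("backend hits: " + dim.getBackendHits()); // 1
    }
}
```

delRedisDimInfo plays the invalidation role: when a dimension row changes, the stale cache entry is evicted instead of being served until its TTL expires.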

107-DWM Layer - Order Wide Table: Dimension Join, Optimization 2 (Async I/O): Coding (test passed)

  1. Start ZooKeeper:
  2. bin/zkServer.sh start
  3. Start HDFS:
  4. sbin/hadoop-daemon.sh start namenode
  5. sbin/hadoop-daemon.sh start datanode
  6. Start HBase:
  7. bin/hbase-daemon.sh start master
  8. bin/hbase-daemon.sh start regionserver
  9. Start the programs:
  10. BaseDBApp.java
  11. ods/FlinkCDC.java
  12. OrderWideApp.java
  13. Generate business data:
  14. gmall2020-mock-db-2020-11-27.jar
  15. select count(*) from `gmall-210325-flink`.order_detail

 

109-DWM Layer - Order Wide Table: Final Test

bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_order_wide

113-尚硅谷-Flink Real-Time Data Warehouse - DWM Layer - Payment Wide Table: Code Test

  1. Start a Kafka consumer:
  2. bin/kafka-console-consumer.sh --bootstrap-server bigdata-training01.erongda.com:9092 --from-beginning --topic dwm_payment_wide
  3. Start the programs:
  4. BaseDBApp.java
  5. ods/FlinkCDC.java
  6. OrderWideApp.java
  7. PaymentWideApp.java
  8. Verify the expected count in MySQL:
  9. SELECT count(*)
  10. FROM payment_info p
  11. JOIN order_detail o ON p.order_id = o.order_id;
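PaymentWideApp joins payments with order-wide records on order_id using an event-time interval join. Since a payment normally arrives within 15 minutes of the order (the sample order above has expire_time = create_time + 15 min), the join window can be asymmetric. A pure-Java sketch of the join condition under that 15-minute assumption; in Flink this would be expressed with intervalJoin(...).between(...):

```java
public class PaymentIntervalJoinSketch {
    static final long FIFTEEN_MIN_MS = 15 * 60 * 1000L;

    // A payment matches an order-wide record when it is created
    // no earlier than the order and no later than 15 minutes after it.
    static boolean joinable(long orderCreateTs, long paymentCreateTs) {
        return paymentCreateTs >= orderCreateTs
                && paymentCreateTs <= orderCreateTs + FIFTEEN_MIN_MS;
    }

    public static void main(String[] args) {
        long orderTs = 1686563993000L;  // illustrative order create_time
        System.out.println(joinable(orderTs, orderTs + 20_000));          // true: paid 20 s later
        System.out.println(joinable(orderTs, orderTs + 16 * 60 * 1000));  // false: past the window
        System.out.println(joinable(orderTs, orderTs - 1_000));           // false: before the order
    }
}
```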

 Verify that the data is consistent:

 Sample data:

  1. Note: the records were joined successfully, and the corresponding dimension data was retrieved as well
  2. OrderWideApp:
  3. orderWideWithCategory3DS>>>>>>>>>>>> OrderWide(detail_id=80399, order_id=26873, sku_id=16, order_price=4488.00, sku_num=1, sku_name=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机, province_id=16, order_status=1001, user_id=3209, total_amount=13959.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=13945.00, feight_fee=14.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=4488.00, expire_time=null, create_time=2023-06-12 19:59:53, operate_time=null, create_date=2023-06-12, create_hour=19, province_name=吉林, province_area_code=220000, province_iso_code=CN-22, province_3166_2_code=CN-JL, user_age=18, user_gender=F, spu_id=4, tm_id=3, category3_id=61, spu_name=HUAWEI P40, tm_name=华为, category3_name=手机)
  4. orderWideWithCategory3DS>>>>>>>>>>>> OrderWide(detail_id=80404, order_id=26874, sku_id=31, order_price=69.00, sku_num=1, sku_name=CAREMiLLE珂曼奶油小方口红 雾面滋润保湿持久丝缎唇膏 M03赤茶, province_id=32, order_status=1001, user_id=3539, total_amount=20366.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=20361.00, feight_fee=5.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=69.00, expire_time=null, create_time=2023-06-12 19:59:53, operate_time=null, create_date=2023-06-12, create_hour=19, province_name=贵州, province_area_code=520000, province_iso_code=CN-52, province_3166_2_code=CN-GZ, user_age=24, user_gender=M, spu_id=10, tm_id=9, category3_id=477, spu_name=CAREMiLLE珂曼奶油小方口红 雾面滋润保湿持久丝缎唇膏, tm_name=CAREMiLLE, category3_name=唇部)
  5. PaymentWideApp:
  6. >>>>>>>>>> PaymentWide(payment_id=18002, subject=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机等4件商品, payment_type=1101, payment_create_time=2023-06-12 19:59:53, callback_time=2023-06-12 20:00:13, detail_id=80399, order_id=26873, sku_id=16, order_price=4488.00, sku_num=1, sku_name=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机, province_id=16, order_status=1001, user_id=3209, total_amount=13959.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=13945.00, feight_fee=14.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=4488.00, order_create_time=2023-06-12 19:59:53, province_name=吉林, province_area_code=220000, province_iso_code=CN-22, province_3166_2_code=CN-JL, user_age=18, user_gender=F, spu_id=4, tm_id=3, category3_id=61, spu_name=HUAWEI P40, tm_name=华为, category3_name=手机)
  7. >>>>>>>>>> PaymentWide(payment_id=17996, subject=TCL 85Q6 85英寸 巨幕私人影院电视 4K超高清 AI智慧屏 全景全面屏 MEMC运动防抖 2+16GB 液晶平板电视机等8件商品, payment_type=1102, payment_create_time=2023-06-12 19:59:53, callback_time=2023-06-12 20:00:13, detail_id=80382, order_id=26864, sku_id=16, order_price=4488.00, sku_num=2, sku_name=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB亮黑色全网通5G手机, province_id=8, order_status=1001, user_id=1008, total_amount=45106.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=45090.00, feight_fee=16.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=8976.00, order_create_time=2023-06-12 19:59:53, province_name=浙江, province_area_code=330000, province_iso_code=CN-33, province_3166_2_code=CN-ZJ, user_age=56, user_gender=F, spu_id=4, tm_id=3, category3_id=61, spu_name=HUAWEI P40, tm_name=华为, category3_name=手机)

120-Flink Real-Time Data Warehouse - DWS Layer - Visitor Topic Wide Table: Print Test

  1. Start the programs:
  2. BaseLogApp.java
  3. UniqueVisitApp.java
  4. UserJumpDetailApp.java
  5. VisitorStatsApp.java
  6. Start the log generators:
  7. java -jar gmall2020-mock-log-2020-12-18.jar
  8. java -jar gmall-logger.jar

 

  1. UserJumpDetailApp:
  2. {"common":{"ar":"310000","uid":"1","os":"iOS 13.2.3","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_20","vc":"v2.1.134","ba":"iPhone"},"page":{"page_id":"home","during_time":2584},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":4,"order":1},{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":4,"order":2},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"query","item":"7","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"promotion","item":"3","item_type":"sku_id","pos_id":1,"order":5},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":1,"order":6},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":7},{"display_type":"query","item":"4","item_type":"sku_id","pos_id":4,"order":8}],"ts":1690880391000}
  3. {"common":{"ar":"110000","uid":"31","os":"Android 11.0","ch":"huawei","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_20","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"home","during_time":19457},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":5,"order":1},{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":5,"order":2},{"display_type":"promotion","item":"3","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":4,"order":6},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":2,"order":7},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":3,"order":8},{"display_type":"promotion","item":"8","item_type":"sku_id","pos_id":5,"order":9},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":4,"order":10},{"display_type":"promotion","item":"7","item_type":"sku_id","pos_id":4,"order":11}],"ts":1690880394000}
  4. VisitorStatsApp:
  5. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30, vc=v2.0.1, ch=Appstore, ar=310000, is_new=0, uv_ct=1, pv_ct=9, sv_ct=1, uj_ct=0, dur_sum=116391, ts=1690880362000)
  6. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30, vc=v2.1.132, ch=xiaomi, ar=110000, is_new=0, uv_ct=1, pv_ct=2, sv_ct=1, uj_ct=0, dur_sum=16767, ts=1690880364000)
  7. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30, vc=v2.1.134, ch=oppo, ar=310000, is_new=0, uv_ct=1, pv_ct=9, sv_ct=1, uj_ct=0, dur_sum=111592, ts=1690880365000)
  8. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30, vc=v2.1.134, ch=Appstore, ar=420000, is_new=0, uv_ct=0, pv_ct=2, sv_ct=1, uj_ct=0, dur_sum=29586, ts=1690880369000)
  9. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30, vc=v2.1.132, ch=Appstore, ar=420000, is_new=0, uv_ct=1, pv_ct=2, sv_ct=1, uj_ct=0, dur_sum=8265, ts=1690880369000)
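The stt/edt values in the VisitorStats output above come from 10-second tumbling windows; a window's start is simply the timestamp rounded down to the window size. A quick check of that arithmetic against the first record (ts=1690880362000, window 16:59:20 to 16:59:30):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class TumblingWindowSketch {
    static final long WINDOW_MS = 10_000;  // TumblingEventTimeWindows.of(Time.seconds(10))

    static long windowStart(long ts) { return ts - (ts % WINDOW_MS); }
    static long windowEnd(long ts)   { return windowStart(ts) + WINDOW_MS; }

    public static void main(String[] args) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        sdf.setTimeZone(TimeZone.getTimeZone("GMT+8"));  // the log timestamps are CST
        long ts = 1690880362000L;  // from the first VisitorStats record above
        System.out.println("stt=" + sdf.format(new Date(windowStart(ts)))
                + ", edt=" + sdf.format(new Date(windowEnd(ts))));
        // prints stt=2023-08-01 16:59:20, edt=2023-08-01 16:59:30
    }
}
```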

Why uj_ct is always 0: the bounce records are only emitted after UserJumpDetailApp's 10-second CEP timeout, so by the time they reach VisitorStatsApp the corresponding 10-second window has already fired.

 Solutions:
     Option 1: use processing time instead of event time. Not ideal: re-consuming the same data no longer yields the same result (not idempotent).
     Option 2: raise the watermark out-of-orderness to 11 seconds, at the cost of some timeliness.

VisitorStatsApp.java:

//TODO 5. Extract timestamps and generate watermarks
SingleOutputStreamOperator<VisitorStats> visitorStatsWithWMDS = unionDS.assignTimestampsAndWatermarks(
        WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(11))
                .withTimestampAssigner(new SerializableTimestampAssigner<VisitorStats>() {
                    @Override
                    public long extractTimestamp(VisitorStats element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));
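Why 11 seconds works: with forBoundedOutOfOrderness(d), the emitted watermark is maxTimestamp - d - 1 ms, so a window [start, end) only fires once an event with timestamp at least end + d has been seen. Setting d to 11 s keeps the window open long enough for the 10-second-delayed bounce records to land in it. The watermark formula, checked in plain Java:

```java
import java.time.Duration;

public class WatermarkSketch {
    // mirrors Flink's BoundedOutOfOrdernessWatermarks: watermark = maxTs - outOfOrderness - 1
    static long watermark(long maxTs, Duration outOfOrderness) {
        return maxTs - outOfOrderness.toMillis() - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1690880370000L;            // end of window [16:59:20, 16:59:30)
        Duration bound = Duration.ofSeconds(11);

        // an event 10 s into the next window still does NOT close this window...
        System.out.println(watermark(windowEnd + 10_000, bound) >= windowEnd);  // false
        // ...only an event at least 11 s (+1 ms) past the end advances the watermark far enough
        System.out.println(watermark(windowEnd + 11_001, bound) >= windowEnd);  // true
    }
}
```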

143-DWS Layer - Visitor Topic: ClickHouseUtil (test passed)

  1. Start ClickHouse:
  2. docker exec -it clickhouse-server /bin/bash
  3. clickhouse-client
  4. Create the ClickHouse table:

create table visitor_stats_210325 (
    stt DateTime,
    edt DateTime,
    vc String,
    ch String,
    ar String,
    is_new String,
    uv_ct UInt64,
    pv_ct UInt64,
    sv_ct UInt64,
    uj_ct UInt64,
    dur_sum UInt64,
    ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, is_new, vc, ch, ar);

  5. Start the programs:
  6. BaseLogApp.java
  7. UniqueVisitApp.java
  8. UserJumpDetailApp.java
  9. VisitorStatsApp.java

Generate log data (under /opt/modules/gmall-flink/rt_applog):

 sudo java -jar gmall-logger.jar 

 sudo java -jar gmall2020-mock-log-2020-12-18.jar 

 select count(*) from visitor_stats_210325;

Note: rows are inserted in batches of 5, so the count is a multiple of 5 (here, 55 rows)
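The multiples-of-5 behavior comes from ClickHouseUtil's batched JDBC sink: rows are buffered with addBatch and only executed once the batch size (5) is reached, so the count visible in ClickHouse grows in steps of the batch size. A minimal stand-in showing that flush pattern (the flush here is just a counter, not a real JDBC executeBatch):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSinkSketch {
    static final int BATCH_SIZE = 5;

    private final List<String> buffer = new ArrayList<>();
    private int flushedRows = 0;   // rows actually visible downstream

    void invoke(String row) {
        buffer.add(row);                       // like preparedStatement.addBatch(...)
        if (buffer.size() >= BATCH_SIZE) {     // like executeBatch() every BATCH_SIZE rows
            flushedRows += buffer.size();
            buffer.clear();
        }
    }

    int visibleRows() { return flushedRows; }

    public static void main(String[] args) {
        BatchSinkSketch sink = new BatchSinkSketch();
        for (int i = 0; i < 57; i++) sink.invoke("row-" + i);
        // 57 rows written, but only 55 flushed (11 full batches of 5)
        System.out.println(sink.visibleRows()); // 55
    }
}
```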

 

  1. Sample data:
  2. UniqueVisitApp:
  3. {"common":{"ar":"310000","uid":"42","os":"Android 10.0","ch":"web","is_new":"0","md":"Oneplus 7","mid":"mid_6","vc":"v2.1.134","ba":"Oneplus"},"page":{"page_id":"home","during_time":14678},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":5,"order":1},{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":5,"order":2},{"display_type":"query","item":"3","item_type":"sku_id","pos_id":1,"order":3},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":4,"order":4},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":4,"order":5},{"display_type":"recommend","item":"9","item_type":"sku_id","pos_id":3,"order":6},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":3,"order":7}],"ts":1690880847000}
  4. {"common":{"ar":"110000","uid":"5","os":"Android 11.0","ch":"oppo","is_new":"0","md":"Sumsung Galaxy S20","mid":"mid_15","vc":"v2.1.134","ba":"Sumsung"},"page":{"page_id":"home","during_time":2184},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":1,"order":1},{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":1,"order":2},{"display_type":"promotion","item":"6","item_type":"sku_id","pos_id":5,"order":3},{"display_type":"promotion","item":"9","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"7","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"promotion","item":"2","item_type":"sku_id","pos_id":5,"order":6},{"display_type":"promotion","item":"5","item_type":"sku_id","pos_id":5,"order":7},{"display_type":"recommend","item":"2","item_type":"sku_id","pos_id":5,"order":8},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":4,"order":9},{"display_type":"promotion","item":"9","item_type":"sku_id","pos_id":3,"order":10},{"display_type":"query","item":"3","item_type":"sku_id","pos_id":1,"order":11},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":1,"order":12}],"ts":1690880874000}
  5. UserJumpDetailApp:
  6. {"common":{"ar":"370000","uid":"20","os":"Android 11.0","ch":"web","is_new":"0","md":"Xiaomi Mix2 ","mid":"mid_16","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"home","during_time":9663},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":4,"order":1},{"display_type":"recommend","item":"10","item_type":"sku_id","pos_id":2,"order":2},{"display_type":"promotion","item":"7","item_type":"sku_id","pos_id":3,"order":3},{"display_type":"query","item":"3","item_type":"sku_id","pos_id":5,"order":4},{"display_type":"query","item":"3","item_type":"sku_id","pos_id":1,"order":5},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":5,"order":6},{"display_type":"recommend","item":"8","item_type":"sku_id","pos_id":2,"order":7}],"ts":1690880883000}
  7. {"common":{"ar":"110000","uid":"1","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_9","vc":"v2.1.111","ba":"iPhone"},"page":{"page_id":"home","during_time":16573},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":4,"order":1},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":4,"order":2},{"display_type":"promotion","item":"7","item_type":"sku_id","pos_id":1,"order":3},{"display_type":"promotion","item":"8","item_type":"sku_id","pos_id":3,"order":4},{"display_type":"promotion","item":"8","item_type":"sku_id","pos_id":5,"order":5},{"display_type":"query","item":"3","item_type":"sku_id","pos_id":3,"order":6},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":1,"order":7},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":3,"order":8},{"display_type":"recommend","item":"6","item_type":"sku_id","pos_id":2,"order":9},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":1,"order":10},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":4,"order":11}],"ts":1690880894000}
  8. {"common":{"ar":"110000","uid":"33","os":"Android 11.0","ch":"oppo","is_new":"0","md":"vivo iqoo3","mid":"mid_8","vc":"v2.1.132","ba":"vivo"},"page":{"page_id":"home","during_time":2347},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":1,"order":1},{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":1,"order":2},{"display_type":"query","item":"6","item_type":"sku_id","pos_id":2,"order":3},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":3,"order":4},{"display_type":"promotion","item":"7","item_type":"sku_id","pos_id":1,"order":5},{"display_type":"recommend","item":"2","item_type":"sku_id","pos_id":5,"order":6},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":2,"order":7},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":3,"order":8},{"display_type":"query","item":"4","item_type":"sku_id","pos_id":4,"order":9},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":4,"order":10},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":4,"order":11},{"display_type":"query","item":"10","item_type":"sku_id","pos_id":1,"order":12}],"ts":1690880914000}
  9. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:57:10, edt=2023-08-01 16:57:20, vc=v2.1.132, ch=vivo, ar=310000, is_new=0, uv_ct=1, pv_ct=0, sv_ct=0, uj_ct=0, dur_sum=0, ts=1690880232000)
  10. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:57:10, edt=2023-08-01 16:57:20, vc=v2.1.134, ch=oppo, ar=530000, is_new=0, uv_ct=1, pv_ct=0, sv_ct=0, uj_ct=0, dur_sum=0, ts=1690880233000)
  11. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 17:07:20, edt=2023-08-01 17:07:30, vc=v2.1.134, ch=web, ar=310000, is_new=0, uv_ct=1, pv_ct=0, sv_ct=0, uj_ct=0, dur_sum=0, ts=1690880847000)

 120.

  1. Start the programs:
  2. BaseLogApp.java
  3. VisitorStatsApp.java
  4. UniqueVisitApp.java
  5. UserJumpDetailApp.java
  6. Start the log generators:
  7. java -jar gmall2020-mock-log-2020-12-18.jar
  8. java -jar gmall-logger.jar

 

  1. Examples:
  2. UserJumpDetailApp:
  3. {"common":{"ar":"530000","uid":"50","os":"Android 11.0","ch":"oppo","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_3","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"home","during_time":19370},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":5,"order":1},{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":5,"order":2},{"display_type":"query","item":"8","item_type":"sku_id","pos_id":5,"order":3},{"display_type":"query","item":"2","item_type":"sku_id","pos_id":3,"order":4},{"display_type":"promotion","item":"8","item_type":"sku_id","pos_id":4,"order":5},{"display_type":"promotion","item":"6","item_type":"sku_id","pos_id":3,"order":6},{"display_type":"promotion","item":"5","item_type":"sku_id","pos_id":1,"order":7}],"ts":1690879690000}
  4. {"common":{"ar":"110000","uid":"24","os":"Android 11.0","ch":"oppo","is_new":"0","md":"Sumsung Galaxy S20","mid":"mid_19","vc":"v2.1.134","ba":"Sumsung"},"page":{"page_id":"home","during_time":12777},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":4,"order":1},{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":4,"order":2},{"display_type":"query","item":"4","item_type":"sku_id","pos_id":3,"order":3},{"display_type":"recommend","item":"8","item_type":"sku_id","pos_id":2,"order":4},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":2,"order":5},{"display_type":"promotion","item":"6","item_type":"sku_id","pos_id":4,"order":6}],"ts":1690879770000}
  5. {"common":{"ar":"440000","uid":"20","os":"Android 11.0","ch":"vivo","is_new":"0","md":"vivo iqoo3","mid":"mid_7","vc":"v2.1.132","ba":"vivo"},"page":{"page_id":"home","during_time":13454},"displays":[{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":1,"order":1},{"display_type":"activity","item":"2","item_type":"activity_id","pos_id":1,"order":2},{"display_type":"query","item":"4","item_type":"sku_id","pos_id":3,"order":3},{"display_type":"query","item":"2","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"query","item":"1","item_type":"sku_id","pos_id":3,"order":5},{"display_type":"promotion","item":"3","item_type":"sku_id","pos_id":1,"order":6},{"display_type":"promotion","item":"9","item_type":"sku_id","pos_id":2,"order":7},{"display_type":"recommend","item":"5","item_type":"sku_id","pos_id":5,"order":8},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":5,"order":9}],"ts":1690879766000}
  6. VisitorStatsApp:
  7. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:49:30, edt=2023-08-01 16:49:40, vc=v2.1.134, ch=Appstore, ar=370000, is_new=0, uv_ct=0, pv_ct=2, sv_ct=1, uj_ct=0, dur_sum=6736, ts=1690879775000)
  8. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:49:30, edt=2023-08-01 16:49:40, vc=v2.1.134, ch=vivo, ar=110000, is_new=0, uv_ct=0, pv_ct=2, sv_ct=1, uj_ct=0, dur_sum=30120, ts=1690879774000)
  9. >>>>>>>>>>>> VisitorStats(stt=2023-08-01 16:49:30, edt=2023-08-01 16:49:40, vc=v2.1.134, ch=oppo, ar=110000, is_new=0, uv_ct=0, pv_ct=1, sv_ct=1, uj_ct=1, dur_sum=12777, ts=1690879770000)

152-Flink Real-Time Data Warehouse - DWS Layer - Product Topic: End-to-End Test

  1. Start the jobs:
  2. BaseDBApp.java
  3. ods/FlinkCDC.java
  4. BaseLogApp.java
  5. OrderWideApp.java
  6. PaymentWideApp.java
  7. ProductStatsApp.java
  8. Create the ClickHouse table:

create table product_stats_210325 (
    stt DateTime,
    edt DateTime,
    sku_id UInt64,
    sku_name String,
    sku_price Decimal64(2),
    spu_id UInt64,
    spu_name String,
    tm_id UInt64,
    tm_name String,
    category3_id UInt64,
    category3_name String,
    display_ct UInt64,
    click_ct UInt64,
    favor_ct UInt64,
    cart_ct UInt64,
    order_sku_num UInt64,
    order_amount Decimal64(2),
    order_ct UInt64,
    payment_amount Decimal64(2),
    paid_order_ct UInt64,
    refund_order_ct UInt64,
    refund_amount Decimal64(2),
    comment_ct UInt64,
    good_comment_ct UInt64,
    ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, sku_id);

  9. Generate data (note: set all dates to the current day):
  10. java -jar gmall2020-mock-log-2020-12-18.jar
  11. java -jar gmall-logger.jar
  12. java -jar gmall2020-mock-db-2020-11-27.jar

  1. FlinkCDC.java
  2. {"database":"gmall-210325-flink","before":{},"after":{"create_time":"2023-08-30 17:49:43","user_id":1017,"appraise":"1201","comment_txt":"评论内容:42324344452486777298996128427291877464398933868315","sku_id":15,"id":1696822496305336323,"spu_id":4,"order_id":27720},"type":"insert","tableName":"comment_info"}
  3. {"database":"gmall-210325-flink","before":{},"after":{"create_time":"2023-08-30 17:49:43","user_id":686,"appraise":"1201","comment_txt":"评论内容:23348134682275467263161282892337962443344368354913","sku_id":20,"id":1696822496309530626,"spu_id":6,"order_id":27724},"type":"insert","tableName":"comment_info"}
  4. {"database":"gmall-210325-flink","before":{},"after":{"create_time":"2023-08-30 17:49:43","user_id":3004,"appraise":"1204","comment_txt":"评论内容:49324881795631648686613552784978122797446563289775","sku_id":29,"id":1696822496309530627,"spu_id":10,"order_id":27724},"type":"insert","tableName":"comment_info"}
  5. BaseDBApp.java
  6. Kafka>>>>>>>>> {"sinkTable":"dwd_order_info_update","database":"gmall-210325-flink","before":{"delivery_address":"第11大街第21号楼8单元158门","consignee":"沈蕊","create_time":"2023-08-30 17:49:42","order_comment":"描述654765","expire_time":"2023-08-30 18:04:42","original_total_amount":20992.00,"coupon_reduce_amount":0.00,"order_status":"1002","out_trade_no":"131946498159351","total_amount":21005.00,"user_id":3151,"img_url":"http:img.gmall.com/196714.jpg","province_id":23,"feight_fee":13.00,"consignee_tel":"13100154190","trade_body":"Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机等4件商品","id":27729,"activity_reduce_amount":0.00,"operate_time":"2023-08-30 17:49:42"},"after":{"delivery_address":"第11大街第21号楼8单元158门","consignee":"沈蕊","create_time":"2023-08-30 17:49:42","order_comment":"描述654765","expire_time":"2023-08-30 18:04:42","original_total_amount":20992.00,"coupon_reduce_amount":0.00,"order_status":"1004","out_trade_no":"131946498159351","total_amount":21005.00,"user_id":3151,"img_url":"http:img.gmall.com/196714.jpg","province_id":23,"feight_fee":13.00,"consignee_tel":"13100154190","trade_body":"Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机等4件商品","id":27729,"activity_reduce_amount":0.00,"operate_time":"2023-08-30 17:49:43"},"type":"update","tableName":"order_info"}
  7. BaseLogApp.java
  8. Display>>>>>>>>>>>>> {"display_type":"recommend","page_id":"home","item":"8","item_type":"sku_id","pos_id":2,"order":11}
  9. Page>>>>>>>>>>>> {"common":{"ar":"110000","uid":"35","os":"Android 11.0","ch":"xiaomi","is_new":"0","md":"Xiaomi Mix2 ","mid":"mid_15","vc":"v2.1.134","ba":"Xiaomi"},"page":{"page_id":"good_detail","item":"1","during_time":16024,"item_type":"sku_id","last_page_id":"home","source_type":"query"},"displays":[{"display_type":"promotion","item":"6","item_type":"sku_id","pos_id":4, "order":1},{"display_type":"promotion","item":"3","item_type":"sku_id","pos_id":1,"order":2},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":4,"order":3},{"display_type":"promotion","item":"8","item_type":"sku_id","pos_id":4,"order":4},{"display_type":"query","item":"9","item_type":"sku_id","pos_id":3, "order":5},{"display_type":"query","item":"2","item_type":"sku_id","pos_id":5,"order":6}],"actions":[{"item":"1","action_id":"get_coupon","item_type":"coupon_id","ts":1693388957012}],"ts":1693388949000}
  10. Display>>>>>>>>>>>>> {"display_type":"promotion","page_id":"good_detail","item":"6","item_type":"sku_id","pos_id":4,"order":1}
  11. Display>>>>>>>>>>>>> {"display_type":"promotion","page_id":"good_detail","item":"3","item_type":"sku_id","pos_id":1,"order":2}
  12. Display>>>>>>>>>>>>> {"display_type":"query","page_id":"good_detail","item":"5","item_type":"sku_id","pos_id":4,"order":3}
  13. Display>>>>>>>>>>>>> {"display_type":"promotion","page_id":"good_detail","item":"8","item_type":"sku_id","pos_id":4,"order":4}
  14. Display>>>>>>>>>>>>> {"display_type":"query","page_id":"good_detail","item":"9","item_type":"sku_id","pos_id":3,"order":5}
  15. Display>>>>>>>>>>>>> {"display_type":"query","page_id":"good_detail","item":"2","item_type":"sku_id","pos_id":5,"order":6}
  16. Start>>>>>>>>>>>> {"common":{"ar":"500000","uid":"39","os":"Android 11.0","ch":"xiaomi","is_new":"0","md":"Huawei P30","mid":"mid_4","vc":"v2.1.134","ba":"Huawei"},"start":{"entry":"icon","open_ad_skip_ms":0,"open_ad_ms":3325,"loading_time":3394,"open_ad_id":3},"ts":1693388949000}
  17. OrderWideApp.java
  18. orderWideWithCategory3DS>>>>>>>>>>>> OrderWide(detail_id=82477, order_id=27728, sku_id=15, order_price=4488.00, sku_num=1, sku_name=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB冰霜银全网通5G手机, province_id=13, order_status=1001, user_id=3018, total_amount=12712.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=12696.00, feight_fee=16.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=4488.00, expire_time=null, create_time=2023-08-30 17:49:42, operate_time=null, create_date=2023-08-30, create_hour=17, province_name=重庆, province_area_code=500000, province_iso_code=CN-50, province_3166_2_code=CN-CQ, user_age=37, user_gender=F, spu_id=4, tm_id=3, category3_id=61, spu_name=HUAWEI P40, tm_name=华为, category3_name=手机)
  19. orderWideWithCategory3DS>>>>>>>>>>>> OrderWide(detail_id=82479, order_id=27729, sku_id=12, order_price=9197.00, sku_num=2, sku_name=Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机, province_id=23, order_status=1001, user_id=3151, total_amount=21005.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=20992.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=18394.00, expire_time=null, create_time=2023-08-30 17:49:42, operate_time=null, create_date=2023-08-30, create_hour=17, province_name=河南, province_area_code=410000, province_iso_code=CN-41, province_3166_2_code=CN-HA, user_age=22, user_gender=M, spu_id=3, tm_id=2, category3_id=61, spu_name=Apple iPhone 12, tm_name=苹果, category3_name=手机)
  20. PaymentWideApp.java
  21. >>>>>>>>>> PaymentWide(payment_id=18606, subject=金沙河面条 原味银丝挂面 龙须面 方便速食拉面 清汤面 900g等3件商品, payment_type=1102, payment_create_time=2023-08-30 17:49:42, callback_time=2023-08-30 17:50:02, detail_id=82477, order_id=27728, sku_id=15, order_price=4488.00, sku_num=1, sku_name=华为 HUAWEI P40 麒麟990 5G SoC芯片 5000万超感知徕卡三摄 30倍数字变焦 8GB+128GB冰霜银全网通5G手机, province_id=13, order_status=1001, user_id=3018, total_amount=12712.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=12696.00, feight_fee=16.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=4488.00, order_create_time=2023-08-30 17:49:42, province_name=重庆, province_area_code=500000, province_iso_code=CN-50, province_3166_2_code=CN-CQ, user_age=37, user_gender=F, spu_id=4, tm_id=3, category3_id=61, spu_name=HUAWEI P40, tm_name=华为, category3_name=手机)
  22. >>>>>>>>>> PaymentWide(payment_id=18607, subject=Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机等4件商品, payment_type=1102, payment_create_time=2023-08-30 17:49:42, callback_time=2023-08-30 17:50:02, detail_id=82479, order_id=27729, sku_id=12, order_price=9197.00, sku_num=2, sku_name=Apple iPhone 12 (A2404) 128GB 黑色 支持移动联通电信5G 双卡双待手机, province_id=23, order_status=1001, user_id=3151, total_amount=21005.00, activity_reduce_amount=0.00, coupon_reduce_amount=0.00, original_total_amount=20992.00, feight_fee=13.00, split_feight_fee=null, split_activity_amount=null, split_coupon_amount=null, split_total_amount=18394.00, order_create_time=2023-08-30 17:49:42, province_name=河南, province_area_code=410000, province_iso_code=CN-41, province_3166_2_code=CN-HA, user_age=22, user_gender=M, spu_id=3, tm_id=2, category3_id=61, spu_name=Apple iPhone 12, tm_name=苹果, category3_name=手机)
  23. ProductStatsApp.java
  24. ProductStats(stt=2023-08-30 17:49:00, edt=2023-08-30 17:49:10, sku_id=2, sku_name=小米10 至尊纪念版 双模5G 骁龙865 120HZ高刷新率 120倍长焦镜头 120W快充 12GB+256GB 陶瓷黑 游戏手机, sku_price=6999, spu_id=1, spu_name=小米10, tm_id=5, tm_name=小米, category3_id=61, category3_name=手机, display_ct=27, click_ct=4, favor_ct=0, cart_ct=0, order_sku_num=0, order_amount=0, order_ct=0, payment_amount=0, paid_order_ct=0, refund_order_ct=0, refund_amount=0, comment_ct=0, good_comment_ct=0, orderIdSet=[], paidOrderIdSet=[], refundOrderIdSet=[], ts=1693388941000)
  25. ProductStats(stt=2023-08-30 17:49:00, edt=2023-08-30 17:49:10, sku_id=8, sku_name=Apple iPhone 12 (A2404) 64GB 黑色 支持移动联通电信5G 双卡双待手机, sku_price=8197, spu_id=3, spu_name=Apple iPhone 12, tm_id=2, tm_name=苹果, category3_id=61, category3_name=手机, display_ct=36, click_ct=0, favor_ct=0, cart_ct=0, order_sku_num=0, order_amount=0, order_ct=0, payment_amount=0, paid_order_ct=0, refund_order_ct=0, refund_amount=0, comment_ct=0, good_comment_ct=0, orderIdSet=[], paidOrderIdSet=[], refundOrderIdSet=[], ts=1693388940000)
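In the BaseLogApp output above, each Page log is flattened into one Display record per element of its `displays` array, with the page's `page_id` copied onto every record. A minimal sketch of that split, using plain `Map`s instead of the project's JSON objects (an assumption made here for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DisplaySplitSketch {

    // Flatten one page log: emit one display record per element of "displays",
    // enriched with the page_id of the page it appeared on.
    @SuppressWarnings("unchecked")
    public static List<Map<String, Object>> splitDisplays(Map<String, Object> pageLog) {
        List<Map<String, Object>> out = new ArrayList<>();
        Map<String, Object> page = (Map<String, Object>) pageLog.get("page");
        List<Map<String, Object>> displays = (List<Map<String, Object>>) pageLog.get("displays");
        if (displays == null) {
            return out; // page logs without exposure data produce no display records
        }
        Object pageId = (page == null) ? null : page.get("page_id");
        for (Map<String, Object> d : displays) {
            Map<String, Object> record = new LinkedHashMap<>(d);
            record.put("page_id", pageId);
            out.add(record);
        }
        return out;
    }
}
```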

 

157-DWS Layer - Product Topic: Writing Data to ClickHouse & Testing

  1. Start Kafka:
  2. bin/kafka-server-start.sh config/server9092.properties
  3. Start ClickHouse:
  4. docker exec -it clickhouse-server /bin/bash
  5. clickhouse-client
  6. Start Redis:
  7. docker exec -it redis redis-cli
  8. Start the applications:
  9. BaseDBApp.java
  10. ods/FlinkCDC.java
  11. OrderWideApp.java
  12. ProvinceStatsSqlApp.java
  13. create table province_stats_210325 (
  14. stt DateTime,
  15. edt DateTime,
  16. province_id UInt64,
  17. province_name String,
  18. area_code String,
  19. iso_code String,
  20. iso_3166_2 String,
  21. order_amount Decimal64(2),
  22. order_count UInt64,
  23. ts UInt64
  24. ) engine = ReplacingMergeTree(ts)
  25. partition by toYYYYMMDD(stt)
  26. order by (stt, edt, province_id);
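ProvinceStatsSqlApp writes its aggregates into this table through a generic JDBC-style sink, whose prepared statement is just the table name plus one `?` placeholder per column. A hypothetical helper that builds such a statement (the name `buildInsertSql` and this reflection-free variant are assumptions, not the project's exact ClickHouseUtil):

```java
public class InsertSqlSketch {

    // Build "insert into <table> values(?,?,...)" with one placeholder per column,
    // suitable as the SQL for a prepared-statement JDBC sink.
    public static String buildInsertSql(String table, int columnCount) {
        StringBuilder sb = new StringBuilder("insert into ").append(table).append(" values(");
        for (int i = 0; i < columnCount; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append("?");
        }
        return sb.append(")").toString();
    }
}
```

For the `province_stats_210325` DDL above, `buildInsertSql("province_stats_210325", 10)` produces a 10-placeholder insert matching the table's 10 columns.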


164-DWS Layer - Keyword Topic: Code Testing

  1. Start Kafka:
  2. bin/kafka-server-start.sh config/server9092.properties
  3. Start ClickHouse:
  4. docker exec -it clickhouse-server /bin/bash
  5. clickhouse-client
  6. Start the applications:
  7. KeywordStatsApp.java
  8. BaseLogApp.java
  9. create table keyword_stats_210325 (
  10. stt DateTime,
  11. edt DateTime,
  12. keyword String,
  13. source String,
  14. ct UInt64,
  15. ts UInt64
  16. ) engine = ReplacingMergeTree(ts)
  17. partition by toYYYYMMDD(stt)
  18. order by (stt, edt, keyword, source);
  19. Start the behavior-log generators:
  20. /opt/modules/gmall-flink/rt_applog
  21. sudo java -jar gmall-logger.jar
  22. sudo java -jar gmall2020-mock-log-2020-12-18.jar
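Before counting keywords, KeywordStatsApp has to split each raw search string into individual terms; the project does this with a Chinese word-segmentation UDTF. A much-simplified, whitespace-based stand-in (assumption: real segmentation would use an analyzer such as IK, which is not shown here):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

public class KeywordSplitSketch {

    // Split a raw keyword string into distinct, non-empty terms,
    // preserving first-seen order.
    public static List<String> analyze(String keyword) {
        LinkedHashSet<String> terms = new LinkedHashSet<>();
        for (String t : keyword.trim().split("\\s+")) {
            if (!t.isEmpty()) {
                terms.add(t);
            }
        }
        return new ArrayList<>(terms);
    }
}
```

Each term returned here would become one row of the `keyword_stats_210325` table, grouped per window with a count `ct`.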

gmall2020-mock-log-2020-12-18.jar generates log data continuously.

Check the data output every 10 seconds (the window size is 10 seconds):
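The `stt`/`edt` fields in the output are the bounds of the 10-second tumbling window an event's `ts` falls into; Flink aligns tumbling windows to the epoch, so the start can be computed directly from the timestamp. A small sketch of that alignment (mirroring Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps and zero offset):

```java
public class WindowAlignSketch {

    // Start of the tumbling window containing ts, aligned to the epoch.
    public static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    // Exclusive end of that window.
    public static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }
}
```

For example, the `ts=1693388941000` seen in the ProductStats output maps to the window [1693388940000, 1693388950000).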

Note: the ClickHouse write batch size is set to a multiple of 5 each time.

 

  1. Start>>>>>>>>>>>> {"common":{"ar":"230000","uid":"18","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_15","vc":"v2.1.134","ba":"iPhone"},"start":{"entry":"icon","open_ad_skip_ms":0,"open_ad_ms":7254,"loading_time":13906,"open_ad_id":16},"ts":1608304326000}
  2. Page>>>>>>>>>>>> {"common":{"ar":"230000","uid":"18","os":"iOS 13.3.1","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_15","vc":"v2.1.134","ba":"iPhone"},"page":{"page_id":"home","during_time":5869},"displays":[{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":1,"order":1},{"display_type":"activity","item":"1","item_type":"activity_id","pos_id":1,"order":2},{"display_type":"query","item":"5","item_type":"sku_id","pos_id":3,"order":3},{"display_type":"query","item":"7","item_type":"sku_id","pos_id":1,"order":4},{"display_type":"promotion","item":"4","item_type":"sku_id","pos_id":4,"order":5},{"display_type":"promotion","item":"1","item_type":"sku_id","pos_id":4,"order":6},{"display_type":"query","item":"4","item_type":"sku_id","pos_id":5,"order":7}],"ts":1608304326000}
  3. Display>>>>>>>>>>>>> {"display_type":"activity","page_id":"home","item":"1","item_type":"activity_id","pos_id":1,"order":1}
