
Using Flink's sql-client.sh to test a mysql --> kafka --> kafka --> mysql real-time stream

Table of contents

1. Environment
2. Create the MySQL tables
3. Create the Flink SQL tables
3.1 Start the Flink SQL client
3.2 Configure the output format
3.3 Create the Flink tables
3.4 Configure the job pipeline
4. Test
4.1 Insert test data
4.2 Check the result table
4.3 Insert more test data
4.4 Check the result table again


1. Environment

Service      Version
zookeeper    3.8.0
kafka        3.3.1
flink        1.13.5
mysql        5.7.34
jdk          1.8
scala        2.12

Connector                                           Purpose
flink-sql-connector-upsert-kafka_2.11-1.13.6.jar    connect to Kafka with primary-key upsert support
flink-connector-mysql-cdc-2.0.2.jar                 read from MySQL (CDC)
flink-connector-jdbc_2.11-1.13.6.jar                write to MySQL
mysql-connector-java-5.1.37.jar                     MySQL JDBC driver
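Before starting the cluster, these four jars need to be on Flink's classpath; dropping them into $FLINK_HOME/lib and restarting is the usual approach (the exact path assumes a standalone deployment).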

2. Create the MySQL tables

CREATE TABLE src_mysql_order (
    order_id BIGINT,
    store_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (`order_id`)
);

CREATE TABLE src_mysql_order_detail (
    order_id BIGINT,
    store_id BIGINT,
    goods_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (order_id, store_id, goods_id)
);

CREATE TABLE dim_store (
    store_id BIGINT,
    store_name VARCHAR(100),
    PRIMARY KEY (`store_id`)
);

CREATE TABLE dim_goods (
    goods_id BIGINT,
    goods_name VARCHAR(100),
    PRIMARY KEY (`goods_id`)
);

CREATE TABLE dwa_mysql_order_analysis (
    store_id BIGINT,
    store_name VARCHAR(100),
    sales_goods_distinct_nums BIGINT,
    sales_amt DOUBLE,
    order_nums BIGINT,
    PRIMARY KEY (store_id, store_name)
);

3. Create the Flink SQL tables

3.1 Start the Flink SQL client

sql-client.sh embedded
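The embedded argument starts the client with an embedded gateway. Flink 1.13 also accepts an initialization script (sql-client.sh embedded -i init.sql, where init.sql is a file of your choosing) so the DDL below can be pre-loaded instead of pasted interactively; this walkthrough types the statements directly.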

3.2 Configure the output format

SET sql-client.execution.result-mode=tableau;
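tableau prints every result as an ASCII table straight in the terminal, which is convenient for this test. For reference, the client's other two display modes look like this:

SET sql-client.execution.result-mode=table;      -- materialized, browsable result view
SET sql-client.execution.result-mode=changelog;  -- raw +I/-U/+U/-D change rows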


3.3 Create the Flink tables

-- MySQL source: order header table
CREATE TABLE src_mysql_order (
    order_id BIGINT,
    store_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop002',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'src_mysql_order',
    -- fall back to the legacy (non-incremental) snapshot reader
    'scan.incremental.snapshot.enabled' = 'false'
);

-- MySQL source: order detail table
CREATE TABLE src_mysql_order_detail (
    order_id BIGINT,
    store_id BIGINT,
    goods_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (order_id, store_id, goods_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop002',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'src_mysql_order_detail',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- MySQL source: store dimension table
CREATE TABLE dim_store (
    store_id BIGINT,
    store_name VARCHAR(100),
    PRIMARY KEY (`store_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop002',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'dim_store',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- MySQL source: goods dimension table
CREATE TABLE dim_goods (
    goods_id BIGINT,
    goods_name VARCHAR(100),
    PRIMARY KEY (`goods_id`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop002',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'test',
    'table-name' = 'dim_goods',
    'scan.incremental.snapshot.enabled' = 'false'
);

-- Kafka ODS layer: order table
CREATE TABLE ods_kafka_order (
    order_id BIGINT,
    store_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_kafka_order',
    'properties.bootstrap.servers' = 'hadoop001:9092',
    'properties.group.id' = 'ods_group1',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Kafka ODS layer: order detail table
CREATE TABLE ods_kafka_order_detail (
    order_id BIGINT,
    store_id BIGINT,
    goods_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (order_id, store_id, goods_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods_kafka_order_detail',
    'properties.bootstrap.servers' = 'hadoop001:9092',
    'properties.group.id' = 'ods_group1',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Kafka DWD layer: order table
CREATE TABLE dwd_kafka_order (
    order_id BIGINT,
    store_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd_kafka_order',
    'properties.bootstrap.servers' = 'hadoop001:9092',
    'properties.group.id' = 'dwd_group1',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- Kafka DWD layer: order detail table
CREATE TABLE dwd_kafka_order_detail (
    order_id BIGINT,
    store_id BIGINT,
    goods_id BIGINT,
    sales_amt DOUBLE,
    PRIMARY KEY (order_id, store_id, goods_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd_kafka_order_detail',
    'properties.bootstrap.servers' = 'hadoop001:9092',
    'properties.group.id' = 'dwd_group1',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- MySQL DWA sink: order metrics
CREATE TABLE dwa_mysql_order_analysis (
    store_id BIGINT,
    store_name VARCHAR(100),
    sales_goods_distinct_nums BIGINT,
    sales_amt DOUBLE,
    order_nums BIGINT,
    PRIMARY KEY (store_id, store_name) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop002:3306/test',
    'table-name' = 'dwa_mysql_order_analysis',
    -- com.mysql.jdbc.Driver matches the mysql-connector-java 5.x jar listed
    -- above; com.mysql.cj.jdbc.Driver is the class name for the 8.x driver
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = 'root',
    'sink.buffer-flush.max-rows' = '10'
);
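Optionally, sanity-check that the tables registered and that the CDC source can reach MySQL before wiring the pipeline:

show tables;
-- should begin streaming the MySQL rows once the snapshot finishes; cancel with Ctrl-C
select * from src_mysql_order;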

3.4 Configure the job pipeline

-- wire up the pipeline
insert into ods_kafka_order select * from src_mysql_order;
insert into ods_kafka_order_detail select * from src_mysql_order_detail;
insert into dwd_kafka_order select * from ods_kafka_order;
insert into dwd_kafka_order_detail select * from ods_kafka_order_detail;

insert into dwa_mysql_order_analysis
select
     orde.store_id                          as store_id
    ,store.store_name                       as store_name
    ,count(distinct order_detail.goods_id)  as sales_goods_distinct_nums
    ,sum(order_detail.sales_amt)            as sales_amt
    ,count(distinct orde.order_id)          as order_nums
from dwd_kafka_order as orde
join dwd_kafka_order_detail as order_detail
    on orde.order_id = order_detail.order_id
join dim_store as store
    on orde.store_id = store.store_id
group by
     orde.store_id
    ,store.store_name
;
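Each of these five INSERT INTO statements is submitted as its own Flink job, which is what produces the 5 running jobs mentioned below. If you would rather bundle several inserts into a single job, the Flink 1.13 client also supports statement sets; a minimal sketch for the two ODS inserts:

BEGIN STATEMENT SET;
insert into ods_kafka_order select * from src_mysql_order;
insert into ods_kafka_order_detail select * from src_mysql_order_detail;
END;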

Open the Flink web UI: there should be 5 running jobs, and the real-time pipeline is now in place.

4. Test

4.1 Insert test data

insert into src_mysql_order values
(20221210001, 10000, 50),
(20221210002, 10000, 20),
(20221210003, 10001, 10);

insert into src_mysql_order_detail values
(20221210001, 10000, 100000, 30),
(20221210001, 10000, 100001, 20),
(20221210002, 10000, 100001, 20),
(20221210003, 10001, 100000, 10);

insert into dim_store values
(10000, '宇唐总店'),
(10001, '宇唐一店'),
(10002, '宇唐二店'),
(10003, '宇唐三店');

insert into dim_goods values
(100000, '天狮达特浓缩枣浆'),
(100001, '蜜炼柚子茶');

4.2 Check the result table
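Query the sink table in MySQL. Given the test data above, and assuming every row has propagated through both Kafka hops, the expected result is (stores 10002 and 10003 have no orders yet, so the inner join leaves them out):

select * from dwa_mysql_order_analysis;
-- store_id | store_name | sales_goods_distinct_nums | sales_amt | order_nums
-- 10000    | 宇唐总店    | 2                         | 70        | 2
-- 10001    | 宇唐一店    | 1                         | 10        | 1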

4.3 Insert more test data

insert into src_mysql_order values
(20221210004, 10002, 50),
(20221210005, 10003, 30);

insert into src_mysql_order_detail values
(20221210004, 10002, 100000, 30),
(20221210004, 10002, 100001, 20),
(20221210005, 10003, 100000, 10),
(20221210005, 10003, 100001, 20);

4.4 Check the result table again
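Running the same query again should now include the two new stores; with the extra rows above, the expected result is:

select * from dwa_mysql_order_analysis;
-- store_id | store_name | sales_goods_distinct_nums | sales_amt | order_nums
-- 10000    | 宇唐总店    | 2                         | 70        | 2
-- 10001    | 宇唐一店    | 1                         | 10        | 1
-- 10002    | 宇唐二店    | 2                         | 50        | 1
-- 10003    | 宇唐三店    | 2                         | 30        | 1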
