
Flink SQL 1.17.1: Reading Kafka Data, Computing in Real Time, and Writing to MySQL

As groundwork for a future real-time data platform, this post tests using Flink SQL to read data from Kafka, run real-time computations on it, and write the results to MySQL.

The source data is mock policy and risk-type records. Two ODS-layer tables are defined over Kafka topics (a policy table and a risk table), a join table and an aggregate table are built on top of them, and the aggregated result is written to MySQL, as sketched below.
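
A rough sketch of the data flow:

  cont (Kafka)    ─┐
                   ├─ dedup + join ─> kafka_cont_risk (upsert-kafka) ─ sum(prem) by riskname ─> risk_prem (MySQL)
  riskapp (Kafka) ─┘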

1. Environment Setup

Download the Flink and Kafka distributions from their official sites, upload them to the server, and extract them (steps omitted here). Note that because we connect to both Kafka and MySQL, the corresponding jars must also be downloaded and placed in Flink's lib directory:

  1. flink-connector-jdbc-3.1.0-1.17.jar
  2. flink-connector-kafka-1.17.1.jar
  3. flink-shaded-jackson-2.14.2-17.0.jar
  4. flink-sql-connector-kafka-1.17.1.jar
  5. flink-sql-csv-1.17.1.jar
  6. flink-sql-json-1.17.1.jar
  7. kafka-clients-3.5.1.jar
  8. mysql-connector-java-5.1.48.jar
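
A minimal sketch of wiring this up, assuming Flink 1.17.1 was extracted to /opt/flink-1.17.1 (the path is illustrative) and the jars above sit in the current directory:

  # put the connector/format jars on Flink's classpath
  cp ./*.jar /opt/flink-1.17.1/lib/

  # start a local standalone cluster, then open the interactive SQL client
  /opt/flink-1.17.1/bin/start-cluster.sh
  /opt/flink-1.17.1/bin/sql-client.sh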

2. Data Preparation

Create the cont topic and feed the sample policy records through the console producer (each JSON line is one message):

  ./kafka-topics.sh --bootstrap-server 10.9.135.16:9092 --create --topic cont
  ./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic cont
  {"contno": "1001", "prem":"20", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
  {"contno": "1002", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
  {"contno": "1003", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
  {"contno": "1004", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
  {"contno": "1005", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
  {"contno": "1006", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
  {"contno": "1007", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
  {"contno": "1008", "prem":"10", "amnt": "100","riskcode": "003", "ts":"2023-11-01 20:38:05"}
  {"contno": "1009", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
  {"contno": "1010", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
  {"contno": "1011", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
  {"contno": "1012", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
  {"contno": "1013", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
  {"contno": "1014", "prem":"50", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}
  {"contno": "1015", "prem":"60", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}

Then create the riskapp topic and produce the risk records the same way:

  ./kafka-topics.sh --bootstrap-server 10.9.135.16:9092 --create --topic riskapp
  ./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic riskapp
  {"riskcode": "001", "riskname":"险种1", "ts":"2023-11-01 20:38:05"}
  {"riskcode": "002", "riskname":"险种2", "ts":"2023-11-01 20:38:05"}
  {"riskcode": "003", "riskname":"险种3", "ts":"2023-11-01 20:38:05"}
  {"riskcode": "004", "riskname":"险种4", "ts":"2023-11-01 20:38:05"}
  {"riskcode": "005", "riskname":"险种5", "ts":"2023-11-01 20:38:05"}
  {"riskcode": "006", "riskname":"险种6", "ts":"2023-11-01 20:38:05"}

3. Flink SQL

  -- ODS-layer policy table, reading the cont topic
  CREATE TABLE cont (
    contno string,
    prem int,
    amnt int,
    riskcode string,
    record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'cont',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.9.135.16:9092',
    'format' = 'json'
  );
  -- ODS-layer risk table, reading the riskapp topic
  CREATE TABLE riskapp (
    riskcode string,
    riskname string,
    record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'riskapp',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = '10.9.135.16:9092',
    'format' = 'json'
  );
  -- Policy/risk join table; upsert-kafka keeps the latest row per primary key
  CREATE TABLE kafka_cont_risk (
    contno string,
    prem int,
    amnt int,
    riskcode string,
    riskname string,
    primary key (contno) not enforced
  ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'kafka_cont_risk',
    'properties.bootstrap.servers' = '10.9.135.16:9092',
    'key.format' = 'json',
    'value.format' = 'json'
  );
  -- Aggregate table, backed by MySQL through the JDBC connector
  CREATE TABLE risk_prem (
    riskname string,
    prem int,
    primary key (riskname) not enforced
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.9.134.14:3306/libra_report?useSSL=false',
    'table-name' = 'risk_prem',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123456'
  );
  -- Create the result table on the MySQL side (run this in MySQL, not in the Flink SQL client)
  CREATE TABLE libra_report.risk_prem (
    riskname varchar(100),
    prem int,
    primary key (riskname)
  );
  -- Populate the join table: keep the latest row per contno and per riskcode, then join
  insert into kafka_cont_risk
  SELECT a.contno, a.prem, a.amnt, a.riskcode, b.riskname
  FROM (
    SELECT contno, prem, amnt, riskcode,
           ROW_NUMBER() OVER (PARTITION BY contno ORDER BY record_time desc) AS rownum
    FROM cont
  ) a
  left join (
    SELECT riskcode, riskname,
           ROW_NUMBER() OVER (PARTITION BY riskcode ORDER BY record_time desc) AS rownum
    FROM riskapp
  ) b on a.riskcode = b.riskcode
  WHERE a.rownum = 1 and b.rownum = 1;

Note that the condition b.rownum = 1 in the WHERE clause also rejects policies whose riskcode has no match in riskapp, so the left join effectively behaves as an inner join here.
  -- Populate the aggregate table; DROP discards rows with NULL in a NOT NULL column instead of failing the job
  set 'table.exec.sink.not-null-enforcer'='DROP';
  insert into risk_prem select riskname, sum(prem) from kafka_cont_risk group by riskname;
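
When debugging, the deduplication half of the pipeline can be checked on its own in the SQL client before the INSERT jobs are submitted; a quick interactive probe using the same ROW_NUMBER pattern as above:

  -- latest record per contno (should show exactly one row per policy)
  SELECT contno, prem, amnt, riskcode
  FROM (
    SELECT contno, prem, amnt, riskcode,
           ROW_NUMBER() OVER (PARTITION BY contno ORDER BY record_time desc) AS rownum
    FROM cont
  ) t
  WHERE rownum = 1;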

4. Query Results

  -- Query the policy/risk join table
  select * from kafka_cont_risk;

  -- Query the aggregate table
  select * from risk_prem;

  -- Query the result table in MySQL
  select * from libra_report.risk_prem;
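
Given the sample records above, and assuming every message was produced exactly once, risk_prem should converge to the following totals (summed by hand from the fifteen policy rows):

  riskname | prem
  ---------+-----
  险种1    |   60
  险种2    |   20
  险种3    |   10
  险种4    |   20
  险种5    |   30
  险种6    |  110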
