赞
踩
为后续搭建实时数据平台做准备,本文测试使用 Flink SQL 实时读取 Kafka 数据,经过实时计算后写入 MySQL。
原始数据为模拟的保单表和险种表数据。先在 Kafka 中创建两张贴源层表(保单表 cont 和险种表 riskapp),再创建一张关联表(kafka_cont_risk)和一张汇总表(risk_prem),最后将汇总结果写入 MySQL。
从官网下载 Flink 和 Kafka 的安装包,上传至服务器并解压(具体步骤此处省略)。需要注意的是,由于要连接 Kafka 和 MySQL,还需要在官网下载相应的 jar 包并上传至 Flink 的 lib 目录:
- flink-connector-jdbc-3.1.0-1.17.jar
-
- flink-connector-kafka-1.17.1.jar
-
- flink-shaded-jackson-2.14.2-17.0.jar
-
- flink-sql-connector-kafka-1.17.1.jar
-
- flink-sql-csv-1.17.1.jar
-
- flink-sql-json-1.17.1.jar
-
- kafka-clients-3.5.1.jar
-
- mysql-connector-java-5.1.48.jar
- ./kafka-topics.sh --bootstrap-server 10.9.135.16:9092 --create --topic cont
- ./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic cont
- {"contno": "1001", "prem":"20", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
- {"contno": "1002", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
- {"contno": "1003", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
- {"contno": "1004", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
- {"contno": "1005", "prem":"10", "amnt": "100","riskcode": "001", "ts":"2023-11-01 20:38:05"}
- {"contno": "1006", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
- {"contno": "1007", "prem":"10", "amnt": "100","riskcode": "002", "ts":"2023-11-01 20:38:05"}
- {"contno": "1008", "prem":"10", "amnt": "100","riskcode": "003", "ts":"2023-11-01 20:38:05"}
- {"contno": "1009", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
- {"contno": "1010", "prem":"10", "amnt": "100","riskcode": "004", "ts":"2023-11-01 20:38:05"}
- {"contno": "1011", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
- {"contno": "1012", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
- {"contno": "1013", "prem":"10", "amnt": "100","riskcode": "005", "ts":"2023-11-01 20:38:05"}
- {"contno": "1014", "prem":"50", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}
- {"contno": "1015", "prem":"60", "amnt": "100","riskcode": "006", "ts":"2023-11-01 20:38:05"}
-
- ./kafka-topics.sh --bootstrap-server 10.9.135.16:9092 --create --topic riskapp
- ./kafka-console-producer.sh --bootstrap-server 10.9.135.16:9092 --topic riskapp
- {"riskcode": "001", "riskname":"险种1", "ts":"2023-11-01 20:38:05"}
- {"riskcode": "002", "riskname":"险种2", "ts":"2023-11-01 20:38:05"}
- {"riskcode": "003", "riskname":"险种3", "ts":"2023-11-01 20:38:05"}
- {"riskcode": "004", "riskname":"险种4", "ts":"2023-11-01 20:38:05"}
- {"riskcode": "005", "riskname":"险种5", "ts":"2023-11-01 20:38:05"}
- {"riskcode": "006", "riskname":"险种6", "ts":"2023-11-01 20:38:05"}
- -- Source-layer (ODS) policy table.
- -- Reads JSON records from the Kafka topic `cont`, starting from the earliest offset.
- -- record_time is populated from the Kafka record's `timestamp` metadata,
- -- not from a field of the JSON payload.
- CREATE TABLE cont (
-     contno      STRING,
-     prem        INT,
-     amnt        INT,
-     riskcode    STRING,
-     record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
- ) WITH (
-     'connector' = 'kafka',
-     'format' = 'json',
-     'topic' = 'cont',
-     'properties.bootstrap.servers' = '10.9.135.16:9092',
-     'scan.startup.mode' = 'earliest-offset'
- );
-
- -- Source-layer (ODS) risk (insurance product) table.
- -- Reads JSON records from the Kafka topic `riskapp`, starting from the earliest offset.
- -- record_time is populated from the Kafka record's `timestamp` metadata,
- -- not from a field of the JSON payload.
- CREATE TABLE riskapp (
-     riskcode    STRING,
-     riskname    STRING,
-     record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
- ) WITH (
-     'connector' = 'kafka',
-     'format' = 'json',
-     'topic' = 'riskapp',
-     'properties.bootstrap.servers' = '10.9.135.16:9092',
-     'scan.startup.mode' = 'earliest-offset'
- );
-
- -- Policy/risk join table.
- -- Backed by the Kafka topic `kafka_cont_risk` through the upsert-kafka connector,
- -- keyed by contno; both the key and the value are serialized as JSON.
- CREATE TABLE kafka_cont_risk (
-     contno   STRING,
-     prem     INT,
-     amnt     INT,
-     riskcode STRING,
-     riskname STRING,
-     PRIMARY KEY (contno) NOT ENFORCED
- ) WITH (
-     'connector' = 'upsert-kafka',
-     'key.format' = 'json',
-     'value.format' = 'json',
-     'topic' = 'kafka_cont_risk',
-     'properties.bootstrap.servers' = '10.9.135.16:9092'
- );
-
- -- Summary table (premium per risk name).
- -- Flink-side definition of the MySQL table libra_report.risk_prem, accessed over JDBC
- -- and keyed by riskname.
- -- NOTE(review): the database user and password are hard-coded in plain text here;
- -- that is tolerable for a test setup only.
- CREATE TABLE risk_prem (
-     riskname STRING,
-     prem     INT,
-     PRIMARY KEY (riskname) NOT ENFORCED
- ) WITH (
-     'connector' = 'jdbc',
-     'driver' = 'com.mysql.jdbc.Driver',
-     'url' = 'jdbc:mysql://10.9.134.14:3306/libra_report?useSSL=false',
-     'table-name' = 'risk_prem',
-     'username' = 'root',
-     'password' = '123456'
- );
-
- -- Create the physical result table in MySQL (run this in MySQL, not in Flink).
- -- Its columns mirror the Flink JDBC table risk_prem: riskname is the key, prem the value.
- CREATE TABLE libra_report.risk_prem (
-     riskname VARCHAR(100),
-     prem     INT,
-     PRIMARY KEY (riskname)
- );
-
- -- Populate the policy/risk join table.
- -- Each side is first reduced to its latest record per key (rownum = 1, ordered by
- -- record_time descending), then policies are left-joined to risks on riskcode.
- -- The rownum filters are applied inside the derived tables on purpose: filtering the
- -- right side in the outer WHERE would discard the NULL-extended rows and silently
- -- turn the LEFT JOIN into an INNER JOIN, dropping every policy whose riskcode has
- -- no matching row in riskapp.
- INSERT INTO kafka_cont_risk
- SELECT
-     c.contno,
-     c.prem,
-     c.amnt,
-     c.riskcode,
-     r.riskname
- FROM (
-     -- latest record per policy number
-     SELECT contno, prem, amnt, riskcode
-     FROM (
-         SELECT
-             contno,
-             prem,
-             amnt,
-             riskcode,
-             ROW_NUMBER() OVER (PARTITION BY contno ORDER BY record_time DESC) AS rownum
-         FROM cont
-     )
-     WHERE rownum = 1
- ) AS c
- LEFT JOIN (
-     -- latest record per risk code
-     SELECT riskcode, riskname
-     FROM (
-         SELECT
-             riskcode,
-             riskname,
-             ROW_NUMBER() OVER (PARTITION BY riskcode ORDER BY record_time DESC) AS rownum
-         FROM riskapp
-     )
-     WHERE rownum = 1
- ) AS r
-     ON c.riskcode = r.riskcode;
-
- -- Load the summary table: total premium per risk name.
- -- The sink's primary key column (riskname) is implicitly NOT NULL; with the enforcer
- -- set to DROP, rows carrying a NULL in such a column are dropped instead of failing the job.
- SET 'table.exec.sink.not-null-enforcer'='DROP';
- INSERT INTO risk_prem
- SELECT
-     riskname,
-     SUM(prem) AS prem
- FROM kafka_cont_risk
- GROUP BY riskname;
- -- Query the policy/risk join table (Flink).
- SELECT contno, prem, amnt, riskcode, riskname
- FROM kafka_cont_risk;
- -- Query the summary table through its Flink JDBC definition.
- SELECT riskname, prem
- FROM risk_prem;
- -- Query the result table directly in MySQL.
- SELECT riskname, prem
- FROM libra_report.risk_prem;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。