当前位置:   article > 正文

FlinkSQL详细系统的全面讲解及在企业生产的实践使用

flinksql

介绍:

        FLinkSQL是Flink目前流批一体式构建数仓的重要方向,是flink开发中最上层的API,将根据企业中实时统计订单量,浏览访问量的实现来介绍FlinkSQL在各个方面的开发实践,对于文章中提到的FlinkSQL相关的知识点可以在下文中的参考资料中找到对应的扩展,帮助进一步的全面了解FlinkSQL的使用。

背景:

        越来越多的公司不在仅仅满足于离线数仓T+1的模式来看到数据的处理统计,随着FlinkSQL的不断成熟,实时数据处理开发难度不断降低,实时数据展示也越来越成为将来的趋势,对于FLinkSQL的全面学习了解是十分重要且必要的,所以希望通过本篇文章可以帮助大家对FLinkSQL有一个较为清晰全面的了解。

技术点:

1.FlinkCDC

2.FlinkSQL

3.Flink对接clickhouse\kafka 

4.FlinkSql函数,join

5.SQL-client使用

......


目录

技术点:

一.FlinkSQL两种使用方式

1.代码jar包方式

(a).pom依赖

(b).创建数据源表

mysql-cdc监听mysql业务库数据,FlinkCDC的方式主要是监听业务库中订单相关的事实表数据,同时还通过flinkSQL中jdbc连接的方式在实时流中关联业务库中维度数据,

***通过jdbc链接mysql中数据:

***flinkSQL读取kafka中的数据源

 ***解析kafka中的嵌套数据

(c)订单访问 数据统计

***FlinkSQL创建中间临时表

***访客量的统计select distinct

(d)数据写入clickhouse两种方式

        如果对于Jdbc的方式,用idea开发的方式则稍微有点不同,需要在另外来写的方式实现

  (E).FlinkSQL join 

2.通过SQLclient方式执行flinkSQL

3.开源可视化SQL工具

二.FlinkSQL中使用watermark和时间属性定义

参考资料:

1.FlinkCDC

2.Flink内置SQL函数

3.Flink SQL CDC online! We have summed up 13 practical experiences in production

4.实时关联维表实现

5.Flink SQL客户端的使用

6.FLink SQL解析嵌套JSON数据

7.FLinkSQL中类似hive创建中间处理过程的临时表(create view)

8.FLinkSQL 不同类型join使用 中文参考

9.Flink SQL中参数配置设置相关的key-value

10.代码中使用JDBC方式写入clickhouse

11.Flink表和流之间的转换

三.补充强调:


一.FlinkSQL两种使用方式

1.代码jar包方式

(a).pom依赖

  1. <properties>
  2. <flink.version>1.13.5</flink.version>
  3. <java.version>1.8</java.version>
  4. <scala.binary.version>2.12</scala.binary.version>
  5. <slf4j.version>1.7.30</slf4j.version>
  6. </properties>
  7. <dependencies>
  8. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
  9. <dependency>
  10. <groupId>org.apache.flink</groupId>
  11. <artifactId>flink-streaming-scala_2.12</artifactId>
  12. <version>${flink.version}</version>
  13. </dependency>
  14. <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
  15. <dependency>
  16. <groupId>com.ververica</groupId>
  17. <artifactId>flink-connector-mysql-cdc</artifactId>
  18. <version>2.1.1</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>ru.yandex.clickhouse</groupId>
  22. <artifactId>clickhouse-jdbc</artifactId>
  23. <version>0.2.4</version>
  24. <exclusions>
  25. <exclusion>
  26. <groupId>com.fasterxml.jackson.core</groupId>
  27. <artifactId>jackson-databind</artifactId>
  28. </exclusion>
  29. <exclusion>
  30. <groupId>com.fasterxml.jackson.core</groupId>
  31. <artifactId>jackson-core</artifactId>
  32. </exclusion>
  33. </exclusions>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.apache.flink</groupId>
  37. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  38. <version>${flink.version}</version>
  39. </dependency>
  40. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
  41. <dependency>
  42. <groupId>org.apache.flink</groupId>
  43. <artifactId>flink-scala_2.12</artifactId>
  44. <version>${flink.version}</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.apache.flink</groupId>
  48. <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
  49. <version>${flink.version}</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.apache.flink</groupId>
  53. <artifactId>flink-table-common</artifactId>
  54. <version>${flink.version}</version>
  55. <scope>provided</scope>
  56. </dependency>
  57. <dependency>
  58. <groupId>org.apache.flink</groupId>
  59. <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
  60. <version>${flink.version}</version>
  61. </dependency>
  62. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
  63. <dependency>
  64. <groupId>org.apache.flink</groupId>
  65. <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
  66. <version>1.13.5</version>
  67. </dependency>
  68. <dependency>
  69. <groupId>org.apache.kafka</groupId>
  70. <artifactId>kafka-clients</artifactId>
  71. <version>2.2.1</version>
  72. </dependency>
  73. <!-- flink连接kafka的依赖-->
  74. <dependency>
  75. <groupId>org.apache.flink</groupId>
  76. <artifactId>flink-connector-kafka_2.12</artifactId>
  77. <version>${flink.version}</version>
  78. </dependency>
  79. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
  80. <dependency>
  81. <groupId>org.apache.flink</groupId>
  82. <artifactId>flink-json</artifactId>
  83. <version>1.13.5</version>
  84. </dependency>
  85. <dependency>
  86. <groupId>com.alibaba</groupId>
  87. <artifactId>fastjson</artifactId>
  88. <version>1.2.62</version>
  89. </dependency>
  90. </dependencies>
  91. <build>
  92. <plugins>
  93. <plugin>
  94. <groupId>org.apache.maven.plugins</groupId>
  95. <artifactId>maven-assembly-plugin</artifactId>
  96. <version>3.0.0</version>
  97. <configuration>
  98. <descriptorRefs>
  99. <descriptorRef>jar-with-dependencies</descriptorRef>
  100. </descriptorRefs>
  101. </configuration>
  102. <executions>
  103. <execution>
  104. <id>make-assembly</id>
  105. <phase>package</phase>
  106. <goals>
  107. <goal>single</goal>
  108. </goals>
  109. </execution>
  110. </executions>
  111. </plugin>
  112. </plugins>
  113. </build>
  114. </project>

(b).创建数据源表

mysql-cdc监听mysql业务库数据,FlinkCDC的方式主要是监听业务库中订单相关的事实表数据,同时还通过flinkSQL中jdbc连接的方式在实时流中关联业务库中维度数据,

  1. //mysqlCDC
  2. create table order_source_ms(
  3. id BIGINT,
  4. deal_amt DOUBLE,
  5. shop_id STRING,
  6. customer_id String,
  7. city_id bigint,
  8. product_count double,
  9. order_at timestamp(3),
  10. last_updated_at timestamp,
  11. pay_at timestamp,
  12. refund_at timestamp,
  13. tenant_id STRING,order_category STRING,
  14. h as hour(last_updated_at),
  15. m as MINUTE(last_updated_at),
  16. dt as to_DATE(cast(last_updated_at as string)),
  17. PRIMARY KEY(id) NOT ENFORCED)
  18. with(
  19. 'connector' ='mysql-cdc',
  20. 'hostname' ='ip',
  21. 'port'='3306',
  22. 'username' = 'xxxx',
  23. 'password' = 'xxxxx',
  24. 'database-name'='xxx',
  25. 'scan.startup.mode'='latest-offset',
  26. 'table-name'='xxxx')

***通过jdbc链接mysql中数据:

        也就是可以通过jdbc的方式实现在实时流中关联维度数据(需要注意mysql监听的表具有主键,否则会报错)

-- Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table test.delect_test error
  1. //mysqlJDBC
  2. create table shop_dim(
  3. id BIGINT
  4. ,shop_id String,
  5. PRIMARY KEY (id) NOT ENFORCED)
  6. with(
  7. 'connector' ='jdbc',
  8. 'url'='jdbc:mysql://ip:3306/product_db',
  9. 'table-name'='p_shop',
  10. 'username'='user',
  11. 'password'='password')

***flinkSQL读取kafka中的数据源

        读kafka中的埋点访问数据源,埋点数据在kafka读取是为了求取的流量中pv和uv

  1. create table kafka_log_source (distinct_id string,
  2. properties Row(tenantid String,shop_id String,shop_group_1_id String,shop_group_2_id String),
  3. event String,
  4. `time` bigint,
  5. dt as to_DATE(FROM_UNIXTIME(`time`/1000,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd'),
  6. m as SUBSTRING(FROM_UNIXTIME(`time`/1000,'yyyy-MM-dd HH:mm:ss'),0,16),
  7. h as HOUR(to_timeStamp(FROM_UNIXTIME(`time`/1000,'yyyy-MM-dd HH:mm:ss'))),
  8. rowtime as TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000,'yyyy-MM-dd HH:mm:ss')),
  9. watermark for rowtime as rowtime - interval '5' second
  10. )with ('connector' = 'kafka',
  11. 'topic' = 'bigDataSensorAnalyse',
  12. 'properties.bootstrap.servers' = '10.150.20.12:9092',
  13. 'properties.group.id' = 'realtime',
  14. 'format' = 'json',
  15. 'scan.startup.mode' = 'latest-offset')

 ***解析kafka中的嵌套数据

properties Row(tenantid String,shop_id String,shop_group_1_id String,shop_group_2_id String),

这一行的数据是为了解析kafka中嵌套的JSON数据,类似下面的数据

  1. "properties":{
  2. "$os":"Android",
  3. "$network_type":"WIFI",
  4. "$screen_height":792,
  5. "$referrer":"直接打开",
  6. "$referrer_title":"",
  7. "shop_group_2_id":"2",
  8. "$app_id":"wx8d678aafacbae019",
  9. "tenantid":"1000000007",
  10. "$url":"pages/dynamicTabbarPages/tab2/index",
  11. "f_page_id":"pages/category/index",
  12. "$os_version":"10",
  13. "$is_first_day":false,
  14. "$model":"NOH-AN01",
  15. "$screen_width":384,
  16. "$mp_client_basic_library_version":"2.21.3",
  17. "$brand":"HUAWEI",
  18. "f_page_name":"分类",
  19. "$lib":"MiniProgram",
  20. "shop_id":"500000"
  21. }

可以通过下面的方法获取嵌套数据中的值:

properties.shop_id

(c)订单访问 数据统计

***FlinkSQL创建中间临时表

  1. //通过create view的方式创建临时视图表,相当于hive数仓业务统计比较复杂的情况下用SQL创建临时表的方式来处理,而在flinksql流式处理中就是通过下面的语法
  2. create view deal_static_tmp as
  3. select
  4. shop_id,tenant_id,
  5. 0 as uv,
  6. 0 as pv,
  7. count (distinct customer_id) as pay_pcnt,
  8. sum(if (order_category ='sale', deal_amt,0)) as pay_amt,
  9. sum(if (order_category ='sale', product_count,0)) as pay_qty,
  10. sum(if (order_category ='sale', 1,0)) as pay_cnt,
  11. sum(if (order_category ='reverse', deal_amt,0)) as refund_amt,
  12. sum(if (order_category ='reverse', 1,0)) as refund_cnt,
  13. 0 as recruit_qty,
  14. dt
  15. from order_source_ms
  16. where pay_at is not null or refund_at is not null
  17. group by shop_id,tenant_id,city_id,dt

***访客量的统计select distinct

  1. //通过select distinct的方式来统计uv也就是访问网页的去重人数
  2. create view view_static_tmp as
  3. select
  4. properties.shop_id as shop_id,properties.tenantid as tenant_id,
  5. count (distinct distinct_id) as uv,
  6. sum(1) as pv,
  7. 0 as pay_pcnt,
  8. 0 as pay_amt,
  9. 0 as pay_qty,
  10. 0 as pay_cnt,
  11. 0 as refund_amt,
  12. 0 as refund_cnt,
  13. 0 as recruit_qty,
  14. dt
  15. from kafka_log_source
  16. where (event = 'pageShow' or event = 'detailPageView') and properties.tenantid is not null
  17. group by properties.shop_id ,properties.tenantid,dt

(d)数据写入clickhouse两种方式

        最终数据的展示是在clickhouse中进行的,clickhouse具有更快的计算效率,对于实时数仓来说是一个比较合适的选择

***clikchouse方式建表

  1. //clickhouse这种方式要使用需要添加额外的jar包依赖,因为这个maven仓库中无法下载,
  2. //所以如果用开发工具需要在项目下手动添加jar包依赖,
  3. //如果使用下面的SQLclient的方式需要在lib目录下添加相关的jar
  4. create table test(
  5. tenant_id String,
  6. amt double,
  7. write_time timestamp,
  8. primary key (tenant_id) not enforced
  9. )with(
  10. 'connector'='clickhouse',
  11. 'url'='clickhouse://ip:8123',
  12. 'database-name'='test',
  13. 'username'='username',
  14. 'sink.batch-size' = '1',
  15. 'sink.flush-interval' = '10',
  16. 'sink.max-retries' = '3',
  17. 'password'='password',
  18. 'table-name'='test'
  19. )

***jdbc方式建表连接clickhouse表

  1. create table test(
  2. tenant_id String,
  3. shop_id String,
  4. pv bigint,
  5. uv bigint,
  6. pay_pcnt bigint,
  7. pay_amt double,
  8. pay_qty double,
  9. pay_cnt bigint,
  10. refund_amt double,
  11. refund_cnt bigint,
  12. recruit_qty bigint,
  13. write_time timestamp,
  14. h int,
  15. dt Date,
  16. primary key (tenant_id) not enforced
  17. )with(
  18. 'connector'='jdbc',
  19. 'url'='jdbc:clickhouse://ip:8123/test',
  20. 'username'='username',
  21. 'driver'='ru.yandex.clickhouse.ClickHouseDriver',
  22. 'password'='password',
  23. 'table-name'='test'
  24. )

        注:clickhouse相较于jdbc的方式只能用于sink方式先表里面写入数据,而不能进行数据查询。

        对于两种方式将数据写入对应的clickhouse表来说,通过SQLclient的方式都是一样的通过下面的方式

insert into select * from *

        如果对于Jdbc的方式,用idea开发的方式则稍微有点不同,需要在另外来写的方式实现

  1. val viewDeal: DataStream[(Boolean, DataStatic)] = tableEnv.toRetractStream[DataStatic](viewDealTable).filter(_._1)
  2. // viewDeal.print()
  3. val viewDealStream: DataStream[DataStatic] = viewDeal.map(_._2)
  4. viewDealStream.filter(_.pv!=0).addSink(
  5. JdbcSink.sink(
  6. //插入数据的SQL语句
  7. "insert into deal_view_sh(tenant_id,shop_id,pv,uv,pay_pcnt,pay_amt,pay_qty,pay_cnt,refund_amt,refund_cnt,recruit_qty,write_time,h,dt)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
  8. new JdbcStatementBuilder[DataStatic] {
  9. override def accept(ps: PreparedStatement, t: DataStatic): Unit = {
  10. ps.setString(1,t.tenant_id)
  11. ps.setString(2,t.shop_id)
  12. ps.setLong(3,t.pv)
  13. ps.setLong(4,t.uv)
  14. ps.setLong(5,t.pay_pcnt)
  15. ps.setDouble(6,t.pay_amt)
  16. ps.setDouble(7,t.pay_qty)
  17. ps.setLong(8,t.pay_cnt)
  18. ps.setDouble(9,t.refund_amt)
  19. ps.setLong(10,t.refund_cnt)
  20. ps.setLong(11,t.recruit_qty)
  21. ps.setString(12,t.write_time)
  22. ps.setLong(13,t.h)
  23. ps.setDate(14,t.dt)
  24. }
  25. },
  26. //写入的参数配置
  27. JdbcExecutionOptions.builder()
  28. .withBatchSize(5)
  29. .withBatchIntervalMs(100)
  30. .build(),
  31. //连接参数配置
  32. new JdbcConnectionOptionsBuilder()
  33. .withUrl("jdbc:clickhouse://ip:8123/real_sh")
  34. .withUsername("ck")
  35. .withPassword("password")
  36. .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
  37. .build()
  38. )
  39. )

  (E).FlinkSQL join 

            对于FLinkjoin中的使用场景来说有三种,regular join ,interval join ,temproal table join。

            对于Flink SQL 三种 join 方式的使用,一般对于流式 join 来说,对于双流join 的场景,推荐使用 interval join,对于流和维度表 join 的场景推荐使用 temproal table join。

2.通过SQLclient方式执行flinkSQL

通过脚本命令可以直接启动一个SQL窗口客户端,然后在执行上面写好的SQL语句就可以了,前提是需要启动一个flink的session的集群,也可以提交到k8s启动的flinkSession集群中,具体的使用是比较简单的,可以通过下面的参考资料的官网连接去查看具体的使用方式。

./bin/sql-client.sh embedded

3.开源可视化SQL工具

这个工具相对来说还不是特别的完善,但是具有SQL检验和任务管理的功能,可以配合使用但是不利于开发调试,不做过多的介绍,该兴趣的小伙伴可以自行安装使用。

开源工具GitHub地址

二.FlinkSQL中使用watermark和时间属性定义

参考资料:

1.FlinkCDC

2.Flink内置SQL函数

3.Flink SQL CDC online! We have summed up 13 practical experiences in production

4.实时关联维表实现

5.Flink SQL客户端的使用

6.FLink SQL解析嵌套JSON数据

7.FLinkSQL中类似hive创建中间处理过程的临时表(create view)

8.FLinkSQL 不同类型join使用 中文参考

9.Flink SQL中参数配置设置相关的key-value

10.代码中使用JDBC方式写入clickhouse

11.Flink表和流之间的转换

三.补充强调:

        这里要补充的是,注意在使用Flinkcdc,或者JDBC,创建流中的source表或者sink表的时候要注意数据类型的对应,尤其是金额类的数据类型,我在展示中的是使用double类型来创建的表,但是这样做在后来出现的问题就是金额统计时候出现精度损失,导致结果少写入clickhouse库后少了0.01,这里强调一下对于金额类的数据要使用Decimal这个数据类型来创建对应的流表;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/396710?site
推荐阅读
相关标签
  

闽ICP备14008679号