Introduction:
Flink SQL is currently an important direction for building a unified stream-batch data warehouse with Flink, and it is the topmost API layer in Flink development. Using a real requirement, computing order counts and page-view/visit metrics in real time, this article walks through Flink SQL development practice from several angles. For the Flink SQL topics touched on in the article, the reference material listed below provides further reading to help build a more complete picture of how Flink SQL is used.
Background:
More and more companies are no longer satisfied with the offline, T+1 data warehouse model for processing and reporting data. As Flink SQL keeps maturing, the difficulty of developing real-time data processing keeps dropping, and real-time data reporting is increasingly the trend, so a thorough understanding of Flink SQL is both important and necessary. The hope is that this article gives you a reasonably clear and comprehensive picture of Flink SQL. Topics covered include:
1. Flink CDC
2. Flink SQL
3. Integrating Flink with ClickHouse / Kafka
4. Flink SQL functions and joins
5. Using the SQL client
......
The mysql-cdc connector listens to the MySQL business database. The Flink CDC approach is mainly used to capture the order-related fact tables of the business database, while the JDBC connector in Flink SQL is used to join dimension data from the business database into the real-time stream.
For the JDBC approach, programmatic development in IDEA is slightly different and has to be implemented separately, as shown in the Scala example later in this article.
III. Additional notes:
The Maven dependencies used by the examples in this article (pom.xml; the <project> header with the project coordinates is omitted):

<properties>
    <flink.version>1.13.5</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.1.1</version>
    </dependency>

    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
        <exclusions>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
        <version>1.13.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
    </dependency>

    <!-- dependency for Flink's Kafka connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>1.13.5</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

-- MySQL CDC source table: order fact data captured from the business database

create table order_source_ms (
    id BIGINT,
    deal_amt DOUBLE,
    shop_id STRING,
    customer_id STRING,
    city_id BIGINT,
    product_count DOUBLE,
    order_at TIMESTAMP(3),
    last_updated_at TIMESTAMP,
    pay_at TIMESTAMP,
    refund_at TIMESTAMP,
    tenant_id STRING,
    order_category STRING,
    h as HOUR(last_updated_at),
    m as MINUTE(last_updated_at),
    dt as TO_DATE(cast(last_updated_at as string)),
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'xxxx',
    'password' = 'xxxxx',
    'database-name' = 'xxx',
    'scan.startup.mode' = 'latest-offset',
    'table-name' = 'xxxx'
)

In other words, dimension data can be joined into the real-time stream via JDBC. (Note that the table monitored via mysql-cdc must have a primary key; otherwise the job fails with an error like the following.)
-- Caused by: org.apache.flink.util.FlinkRuntimeException: Generate Splits for table test.delect_test error
-- MySQL JDBC dimension table

create table shop_dim (
    id BIGINT,
    shop_id STRING,
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:3306/product_db',
    'table-name' = 'p_shop',
    'username' = 'user',
    'password' = 'password'
)
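As an illustration of how such a JDBC table can be used as a dimension table, here is a minimal lookup-join sketch. It is not the author's original query: the join key is assumed, and it requires a processing-time attribute (for example proc_time AS PROCTIME()) to be added as a computed column on order_source_ms.

-- hypothetical lookup join: enrich each order with its dimension row from shop_dim
select o.id, o.deal_amt, o.shop_id, d.id as shop_dim_id
from order_source_ms as o
join shop_dim for system_time as of o.proc_time as d
    on o.shop_id = d.shop_id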
Reading the tracking (page-view) events from Kafka: these events are consumed from Kafka in order to compute the traffic metrics PV and UV.
create table kafka_log_source (
    distinct_id STRING,
    properties ROW(tenantid STRING, shop_id STRING, shop_group_1_id STRING, shop_group_2_id STRING),
    event STRING,
    `time` BIGINT,
    dt as TO_DATE(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd'),
    m as SUBSTRING(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss'), 0, 16),
    h as HOUR(TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss'))),
    rowtime as TO_TIMESTAMP(FROM_UNIXTIME(`time`/1000, 'yyyy-MM-dd HH:mm:ss')),
    watermark for rowtime as rowtime - interval '5' second
) with (
    'connector' = 'kafka',
    'topic' = 'bigDataSensorAnalyse',
    'properties.bootstrap.servers' = '10.150.20.12:9092',
    'properties.group.id' = 'realtime',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)
The line
properties ROW(tenantid STRING, shop_id STRING, shop_group_1_id STRING, shop_group_2_id STRING),
declares a nested ROW type so that the nested JSON in the Kafka messages can be parsed, e.g. data like the following:
- "properties":{
- "$os":"Android",
- "$network_type":"WIFI",
- "$screen_height":792,
- "$referrer":"直接打开",
- "$referrer_title":"",
- "shop_group_2_id":"2",
- "$app_id":"wx8d678aafacbae019",
- "tenantid":"1000000007",
- "$url":"pages/dynamicTabbarPages/tab2/index",
- "f_page_id":"pages/category/index",
- "$os_version":"10",
- "$is_first_day":false,
- "$model":"NOH-AN01",
- "$screen_width":384,
- "$mp_client_basic_library_version":"2.21.3",
- "$brand":"HUAWEI",
- "f_page_name":"分类",
- "$lib":"MiniProgram",
- "shop_id":"500000"
- }

The values inside the nested structure can then be accessed like this:
properties.shop_id
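For example, a query against the Kafka source defined above can read the nested fields directly (a minimal illustrative query, not part of the original job):

select
    distinct_id,
    event,
    properties.shop_id  as shop_id,
    properties.tenantid as tenant_id
from kafka_log_source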
-- create view creates a temporary view. It plays the same role as creating an intermediate/temporary
-- table with SQL in a Hive warehouse when the business statistics get complex; in Flink SQL streaming
-- jobs this is done with the syntax below.

create view deal_static_tmp as
select
    shop_id,
    tenant_id,
    0 as uv,
    0 as pv,
    count(distinct customer_id) as pay_pcnt,
    sum(if(order_category = 'sale', deal_amt, 0)) as pay_amt,
    sum(if(order_category = 'sale', product_count, 0)) as pay_qty,
    sum(if(order_category = 'sale', 1, 0)) as pay_cnt,
    sum(if(order_category = 'reverse', deal_amt, 0)) as refund_amt,
    sum(if(order_category = 'reverse', 1, 0)) as refund_cnt,
    0 as recruit_qty,
    dt
from order_source_ms
where pay_at is not null or refund_at is not null
group by shop_id, tenant_id, city_id, dt

-- count(distinct ...) computes UV, i.e. the deduplicated number of visitors to the pages

create view view_static_tmp as
select
    properties.shop_id as shop_id,
    properties.tenantid as tenant_id,
    count(distinct distinct_id) as uv,
    sum(1) as pv,
    0 as pay_pcnt,
    0 as pay_amt,
    0 as pay_qty,
    0 as pay_cnt,
    0 as refund_amt,
    0 as refund_cnt,
    0 as recruit_qty,
    dt
from kafka_log_source
where (event = 'pageShow' or event = 'detailPageView') and properties.tenantid is not null
group by properties.shop_id, properties.tenantid, dt

The final data is served from ClickHouse, which offers much faster query performance and is therefore a good fit for a real-time data warehouse.
*** Creating the sink table with the ClickHouse connector
-- To use the ClickHouse connector you need an additional jar that cannot be pulled from the Maven repository,
-- so when developing in an IDE you have to add the jar to the project manually,
-- and when using the SQL client described below you have to put the jar into Flink's lib directory.

create table test (
    tenant_id STRING,
    amt DOUBLE,
    write_time TIMESTAMP,
    primary key (tenant_id) not enforced
) with (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://ip:8123',
    'database-name' = 'test',
    'username' = 'username',
    'sink.batch-size' = '1',
    'sink.flush-interval' = '10',
    'sink.max-retries' = '3',
    'password' = 'password',
    'table-name' = 'test'
)

*** Creating the sink table with the JDBC connector against ClickHouse
create table test (
    tenant_id STRING,
    shop_id STRING,
    pv BIGINT,
    uv BIGINT,
    pay_pcnt BIGINT,
    pay_amt DOUBLE,
    pay_qty DOUBLE,
    pay_cnt BIGINT,
    refund_amt DOUBLE,
    refund_cnt BIGINT,
    recruit_qty BIGINT,
    write_time TIMESTAMP,
    h INT,
    dt DATE,
    primary key (tenant_id) not enforced
) with (
    'connector' = 'jdbc',
    'url' = 'jdbc:clickhouse://ip:8123/test',
    'username' = 'username',
    'driver' = 'ru.yandex.clickhouse.ClickHouseDriver',
    'password' = 'password',
    'table-name' = 'test'
)

Note: unlike the JDBC connector, the ClickHouse connector can only be used as a sink to write data into the table; it cannot be used to query data.
With either connector, writing data into the corresponding ClickHouse table from the SQL client is done the same way, with a statement of the form
insert into <sink_table> select * from <source_view>
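A concrete sketch based on the views above might look like the following. The write_time and h expressions are assumptions added here to fill the sink columns (they are not part of the author's original statement), and depending on the exact column types some explicit CASTs may be needed:

insert into test
select
    tenant_id, shop_id, pv, uv, pay_pcnt, pay_amt, pay_qty, pay_cnt,
    refund_amt, refund_cnt, recruit_qty,
    LOCALTIMESTAMP as write_time,             -- assumed: fill the sink's write_time column
    cast(HOUR(LOCALTIMESTAMP) as int) as h,   -- assumed: fill the sink's hour column
    dt
from (
    select * from deal_static_tmp
    union all
    select * from view_static_tmp
) as t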
When developing programmatically in the IDE (the JDBC approach mentioned earlier), the same write into ClickHouse is done with a JdbcSink, for example:

// imports needed by this snippet
import java.sql.PreparedStatement
import org.apache.flink.api.scala._
import org.apache.flink.connector.jdbc.JdbcConnectionOptions.JdbcConnectionOptionsBuilder
import org.apache.flink.connector.jdbc.{JdbcExecutionOptions, JdbcSink, JdbcStatementBuilder}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.bridge.scala._

// DataStatic is the author's case class mirroring the sink columns;
// viewDealTable is presumably the Table produced from the statistics views above
val viewDeal: DataStream[(Boolean, DataStatic)] = tableEnv.toRetractStream[DataStatic](viewDealTable).filter(_._1)
// viewDeal.print()
val viewDealStream: DataStream[DataStatic] = viewDeal.map(_._2)

viewDealStream.filter(_.pv != 0).addSink(
  JdbcSink.sink(
    // the INSERT statement executed for each record
    "insert into deal_view_sh(tenant_id,shop_id,pv,uv,pay_pcnt,pay_amt,pay_qty,pay_cnt,refund_amt,refund_cnt,recruit_qty,write_time,h,dt)values(?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
    new JdbcStatementBuilder[DataStatic] {
      override def accept(ps: PreparedStatement, t: DataStatic): Unit = {
        ps.setString(1, t.tenant_id)
        ps.setString(2, t.shop_id)
        ps.setLong(3, t.pv)
        ps.setLong(4, t.uv)
        ps.setLong(5, t.pay_pcnt)
        ps.setDouble(6, t.pay_amt)
        ps.setDouble(7, t.pay_qty)
        ps.setLong(8, t.pay_cnt)
        ps.setDouble(9, t.refund_amt)
        ps.setLong(10, t.refund_cnt)
        ps.setLong(11, t.recruit_qty)
        ps.setString(12, t.write_time)
        ps.setLong(13, t.h)
        ps.setDate(14, t.dt)
      }
    },
    // batching options for the write
    JdbcExecutionOptions.builder()
      .withBatchSize(5)
      .withBatchIntervalMs(100)
      .build(),
    // connection options
    new JdbcConnectionOptionsBuilder()
      .withUrl("jdbc:clickhouse://ip:8123/real_sh")
      .withUsername("ck")
      .withPassword("password")
      .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
      .build()
  )
)

Flink joins come in three flavors: regular join, interval join, and temporal table join.
As a rule of thumb for choosing between the three Flink SQL join types: for joining two streams (dual-stream join), prefer the interval join; for joining a stream against a dimension table, prefer the temporal table join (used in the lookup-join sketch earlier). A short interval-join sketch follows below.
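For illustration, a minimal interval-join sketch over two hypothetical streaming tables orders and shipments (neither belongs to the pipeline above; both are assumed to declare watermarks on their time attributes):

-- join each order with the shipment that occurs within one hour after payment
select o.order_id, o.pay_at, s.ship_at
from orders o
join shipments s
    on o.order_id = s.order_id
    and s.ship_at between o.pay_at and o.pay_at + interval '1' hour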
A SQL client window can be started directly with the script below, and the SQL statements written above can then be executed in it. The prerequisite is a running Flink session cluster; jobs can also be submitted to a Flink session cluster started on Kubernetes. Usage is fairly straightforward; see the official documentation linked in the references for details.
./bin/sql-client.sh embedded
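Inside the client, job parameters can be set as key-value pairs with SET statements before running the DDL/DML above. The keys below are standard Flink configuration options, but the values are examples to adapt, not the author's settings:

-- print query results in tableau mode and give the job a readable name
SET sql-client.execution.result-mode=tableau;
SET pipeline.name=realtime-order-statistics;
-- let idle aggregation state expire after one day (value in milliseconds)
SET table.exec.state.ttl=86400000;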
This tool is not yet fully mature, but it does provide SQL validation and job-management features. It can be used alongside the approaches above, though it is not well suited to development and debugging, so I will not go into more detail here; interested readers can install and try it themselves.
One point to stress: when creating source or sink tables in the stream with Flink CDC or JDBC, pay attention to matching data types, especially for monetary amounts. In the examples above the tables were created with DOUBLE amount columns, and this later caused a precision problem: the aggregated amounts written to ClickHouse were short by 0.01. For monetary columns, use the DECIMAL data type when declaring the corresponding stream tables, as sketched below.
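A minimal sketch of the same CDC source with the amount columns declared as DECIMAL (the precision and scale here are assumptions; match them to the MySQL column definitions):

create table order_source_decimal (
    id BIGINT,
    deal_amt DECIMAL(16, 2),        -- monetary amount: DECIMAL instead of DOUBLE avoids the 0.01 loss
    product_count DECIMAL(16, 3),
    PRIMARY KEY (id) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'ip',
    'port' = '3306',
    'username' = 'xxxx',
    'password' = 'xxxxx',
    'database-name' = 'xxx',
    'table-name' = 'xxxx'
)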