I. Flink column
The Flink column introduces one topic at a time, systematically, each illustrated with concrete examples.
1. Flink deployment series
Covers the basics of deploying and configuring Flink.
2. Flink fundamentals series
Covers Flink fundamentals such as terminology, architecture, the programming model, the programming guide, basic DataStream API usage, and the four cornerstones.
3. Flink Table API and SQL basics series
Covers basic usage of the Flink Table API and SQL: creating databases and tables, queries, window functions, catalogs, and so on.
4. Flink Table API and SQL advanced series
Covers applied Table API and SQL topics that are closer to real production work and involve some development difficulty.
5. Flink monitoring series
Covers day-to-day operations and monitoring work.
II. Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-run examples. It has no sub-directories; each link title indicates what the article covers.
For the entry point to all articles in both columns, see: Flink 系列文章汇总索引
This article shows how Flink joins dimension data held in a third-party data source, with two examples: a synchronous lookup backed by a local cache, and an asynchronous I/O lookup against Redis.
For more, the author's Flink column covers the material more systematically.
Besides the Maven dependencies, this article also requires a running Redis environment.
This topic consists of the following articles:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)
All examples in this article depend on the pom.xml below. It may pull in more than a given example needs; trim it to fit your own situation.
```xml
<properties>
    <encoding>UTF-8</encoding>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.version>2.12</scala.version>
    <flink.version>1.17.0</flink.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-uber</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>3.1.0-1.17</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>32.0.1-jre</version>
    </dependency>
    <!-- Flink connectors -->
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-compress</artifactId>
        <version>1.24.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.12</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-runtime_2.12</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-core</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <artifactId>flink-java</artifactId>
                <groupId>org.apache.flink</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.43</version>
    </dependency>
</dependencies>
```
The examples implement a single requirement: join each order's uId to the user's id and emit a Tuple2<Order, String>.
```java
// fact table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
    private Integer id;
    private Integer uId;
    private Double total;
}

// dimension table
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
    private Integer id;
    private String name;
    private Double balance;
    private Integer age;
    private String email;
}
```
The fact stream can come from several sources, e.g. socket, Redis, Kafka; see the examples for details.
The dimension stream likewise has several sources, e.g. static data, MySQL, socket, Kafka; see the examples for details.
Running the examples in this article therefore requires the corresponding environments: MySQL, Redis, Kafka, netcat, and so on.
All examples in this article have been verified. The test data is included in each example as comments, separated into fact stream, dimension stream, and run results; the individual examples do not repeat the verification steps.
In this approach the dimension data is stored in external systems such as Redis, HBase, or MySQL, and the fact stream queries the external store in real time whenever it joins dimension data.
Because the dimension data is not bounded by memory, very large data sets can be handled; on the other hand, read speed is limited by the external store. In practice this is one of the most common approaches.
Hitting the third-party store for every join is expensive, so a cache is usually added to relieve the lookup pressure; the trade-offs are possible staleness or inconsistency with the source, plus the memory the cached data occupies. Whether this is acceptable depends on the business scenario. This example uses Guava Cache; there are many cache implementations, so choose whatever fits your situation.
In this example the data source is just static data for demonstration; in practice it might come from HBase, MySQL, etc.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromCacheDataDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user dimension table, looked up through a cache
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // LoadingCache loads an entry automatically when it is missing from the cache
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // build the cache with Guava's LoadingCache
                // CacheBuilder's constructor is private; obtain an instance via the static newBuilder()
                userDim = CacheBuilder.newBuilder()
                        // concurrency level: number of threads that may write the cache at the same time
                        .concurrencyLevel(8)
                        // maximum number of entries; beyond that, evict least recently used entries
                        .maximumSize(1000)
                        // entries expire 10 minutes after write
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // initial capacity of the cache container
                        .initialCapacity(10)
                        // record statistics such as the hit rate
                        .recordStats()
                        // removal notification
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + " was evicted, value: "
                                        + removalNotification.getValue());
                            }
                        })
                        .build(
                                // how to load an entry on a cache miss
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });

                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // could be any data source; static data here, for illustration only
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));

                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }
                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2<>(value, userDim.get(value.getUId()).getName());
            }
        });

        result.print();

        // input data
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875

        // console output
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)

        env.execute("TestJoinDimFromCacheDataDemo");
    }
}
```
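Because the demo calls recordStats() when building the cache, the hit rate can be inspected at runtime. A minimal, self-contained sketch of how that works with Guava (this snippet is illustrative and not part of the original demo):

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import com.google.common.cache.LoadingCache;

public class CacheStatsSketch {
    public static void main(String[] args) throws Exception {
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .maximumSize(100)
                .recordStats() // without this, stats() reports all zeros
                .build(new CacheLoader<Integer, String>() {
                    @Override
                    public String load(Integer key) {
                        return "user-" + key; // stand-in for a real dimension lookup
                    }
                });

        cache.get(1001); // miss -> triggers load()
        cache.get(1001); // hit
        CacheStats stats = cache.stats();
        System.out.println("hitRate=" + stats.hitRate() + ", loadCount=" + stats.loadCount());
        // prints: hitRate=0.5, loadCount=1
    }
}
```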
When Flink reads from or writes to an external system synchronously, it sends one request, waits for the response, and only then sends the next. Throughput is low. It can be raised by increasing parallelism, but higher parallelism means more subtasks and considerably more resources.
Alternatively, Flink can use asynchronous I/O against external systems, provided the external system offers a client that supports asynchronous requests, e.g. Redis or MongoDB.
For more details, see the article:
55、Flink之用于外部数据访问的异步 I/O介绍及示例
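Distilled from the full demo that follows, the core of the pattern is: in asyncInvoke, run the blocking client call on another thread (here via CompletableFuture) and complete the ResultFuture from the callback. A minimal sketch, where lookupName() is a hypothetical stand-in for the actual client call:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public abstract class AsyncLookupSketch extends RichAsyncFunction<Integer, Tuple2<Integer, String>> {

    // hypothetical blocking client call that fetches the dimension value
    abstract String lookupName(Integer uId);

    @Override
    public void asyncInvoke(Integer uId, ResultFuture<Tuple2<Integer, String>> resultFuture) {
        // run the blocking lookup off the task thread ...
        CompletableFuture.supplyAsync(() -> lookupName(uId))
                // ... and hand the result back to Flink when it arrives
                .thenAccept(name -> resultFuture.complete(
                        Collections.singleton(new Tuple2<>(uId, name))));
    }

    @Override
    public void timeout(Integer uId, ResultFuture<Tuple2<Integer, String>> resultFuture) {
        // on timeout, emit an empty dimension value instead of failing the job
        resultFuture.complete(Collections.singleton(new Tuple2<>(uId, "")));
    }
}
```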
```java
package org.tablesql.join;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {

    private JedisPoolConfig config = null;
    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;
    private JedisPool jedisPool = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // one record from the order fact stream
        System.out.println("input----:" + input);

        // fire an asynchronous request and return the result
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // Jedis instances are not thread-safe, so borrow one from the pool per request
                try (Jedis jedis = jedisPool.getResource()) {
                    // stored format: 1002,alanchan,19,25,alan.chan.chn@163.com
                    String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                    String[] userTemp = userLine.split(",");
                    // return the user name
                    return userTemp[1];
                }
            }
        }).thenAccept((String dbResult) -> {
            // callback invoked when the request completes: hand the result back
            List<Tuple2<Order, String>> list = new ArrayList<>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // called when the request times out
    @Override
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        List<Tuple2<Order, String>> list = new ArrayList<>();
        // the dimension value could not be fetched in time; emit an empty string
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedisPool != null) {
            jedisPool.close();
        }
    }
}
```
```java
package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description:
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // dimension table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // fact table
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order fact stream
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // ordered: results are emitted in input order; timeout 1 second, capacity 2,
        // backpressure kicks in once the number of in-flight requests exceeds the capacity
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(
                orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2);
        result.print("result:");

        // unordered: results may be emitted out of order; timeout 1 second, capacity 2,
        // backpressure kicks in once the number of in-flight requests exceeds the capacity
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");

        // redis commands and test data
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"

        // input data
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975

        // console output
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)

        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }
}
```
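The test data can also be seeded from Java rather than redis-cli. A small one-off helper along these lines should work (host, port, and the hash key mirror the demo above; this class is not part of the original code):

```java
import redis.clients.jedis.Jedis;

// One-off helper that loads the user dimension data into the Redis hash
// read by JoinAyncFunctionByRedis.
public class SeedUserDim {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("192.168.10.41", 6379)) {
            String key = "AsyncReadUserById_Redis";
            jedis.hset(key, "1001", "1001,alan,18,20,alan.chan.chn@163.com");
            jedis.hset(key, "1002", "1002,alanchan,19,25,alan.chan.chn@163.com");
            jedis.hset(key, "1003", "1003,alanchanchn,20,30,alan.chan.chn@163.com");
            jedis.hset(key, "1004", "1004,alan_chan,27,20,alan.chan.chn@163.com");
            jedis.hset(key, "1005", "1005,alan_chan_chn,36,10,alan.chan.chn@163.com");
            System.out.println("seeded " + jedis.hlen(key) + " users");
        }
    }
}
```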
To recap, this article showed how Flink joins dimension data held in a third-party data source, with two examples: a synchronous lookup backed by a local cache, and an asynchronous I/O lookup against Redis.
This topic consists of the following articles:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)