当前位置:   article > 正文

【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源_flink 维表

flink 维表

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了Flink 通过异步IO的方式访问第三方数据源,介绍了2个示例,即通过缓存和redis的方式。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,本文还依赖redis的环境。

本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

一、maven依赖及数据结构

1、maven依赖

本文的所有示例均依赖本部分的pom.xml内容,可能针对下文中的某些示例存在过多的引入,根据自己的情况进行删减。

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>

<dependencies>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-uber</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-runtime</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc</artifactId>
		<version>3.1.0-1.17</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
	</dependency>
	<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>32.0.1-jre</version>
	</dependency>
	<!-- flink连接器 -->
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
	<dependency>
		<groupId>org.apache.commons</groupId>
		<artifactId>commons-compress</artifactId>
		<version>1.24.0</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
	</dependency>
	<dependency>
		<groupId>org.apache.bahir</groupId>
		<artifactId>flink-connector-redis_2.12</artifactId>
		<version>1.1.0</version>
		<exclusions>
			<exclusion>
				<artifactId>flink-streaming-java_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-runtime_2.12</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-core</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<artifactId>flink-java</artifactId>
				<groupId>org.apache.flink</groupId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-api-java</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-api-java-bridge_2.12</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-common</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.apache.flink</groupId>
				<artifactId>flink-table-planner_2.12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>2.0.43</version>
	</dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160

2、数据结构

本示例仅仅为实现需求:将订单中uId与用户id进行关联,然后输出Tuple2<Order, String>。

  • 事实流 order
    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 维度流 user
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3、数据源

事实流数据有几种,具体见示例部分,比如socket、redis、kafka等
维度表流有几种,具体见示例部分,比如静态数据、mysql、socket、kafka等。
如此,实现本文中的示例就需要准备好相应的环境,即mysql、redis、kafka、netcat等。

4、验证结果

本文提供的所有示例均为验证通过的示例,测试的数据均在每个示例中,分为事实流、维度流和运行结果进行注释,在具体的示例中关于验证不再赘述。

二、维表来源于第三方数据源

1、说明

这种方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,事实流在关联维表数据的时候实时去外部存储中查询。

由于维度数据量不受内存限制,可以存储很大的数据量。同时维表数据来源于第三方数据源,读取速度受制于外部存储的读取速度。一般常见的做法该种方式较多。

2、示例:将事实流与维表进行关联-通过缓存降低性能开销

如果频繁的访问第三方数据源进行join,会带来很大的开销,为降低该种情况的开销,一般使用cache来减轻访问压力,但该种方式存在数据同步的不一致或延迟情况。如果使用缓存,则会存在将数据存在内存中,也会增加系统开销。该种情况的实际应用以具体的业务场景而定。本示例使用的是guava Cache,缓存的实现有很多种方式,具体以自己的实际情况进行选择。

本示例的数据源仅仅以静态的数据进行展示,实际上可能数据来源于Hbase、mysql等。

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestJoinDimFromCacheDataDemo {
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 实时流
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user 维表
        DataStream<Tuple2<Order, String>> result = orderDs.map(new RichMapFunction<Order, Tuple2<Order, String>>() {
            // 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
            LoadingCache<Integer, User> userDim;

            @Override
            public void open(Configuration parameters) throws Exception {
                // 使用google LoadingCache来进行缓存
                // CacheBuilder的构造函数是私有的,只能通过其静态方法newBuilder()来获得CacheBuilder的实例
                userDim = CacheBuilder.newBuilder()
                        // 设置并发级别为8,并发级别是指可以同时写缓存的线程数
                        .concurrencyLevel(8)
                        // 最多缓存个数,超过了就根据最近最少使用算法来移除缓存
                        .maximumSize(1000)
                        // 设置写缓存后10分钟过期
                        .expireAfterWrite(10, TimeUnit.MINUTES)
                        // 设置缓存容器的初始容量为10
                        .initialCapacity(10)
                        // 设置要统计缓存的命中率
                        .recordStats()
                        // 指定移除通知
                        .removalListener(new RemovalListener<Integer, User>() {
                            @Override
                            public void onRemoval(RemovalNotification<Integer, User> removalNotification) {
                                System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());
                            }
                        })
                        .build(
                                // 指定加载缓存的逻辑
                                new CacheLoader<Integer, User>() {
                                    @Override
                                    public User load(Integer uId) throws Exception {
                                        return dataSource(uId);
                                    }
                                });
                System.out.println("userDim:" + userDim.get(1002));
            }

            private User dataSource(Integer uId) {
                // 可以是任何数据源,本处仅仅示例
                Map<Integer, User> users = new HashMap<>();
                users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
                users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
                users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
                users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
                users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
                User user = null;
                if (users.containsKey(uId)) {
                    user = users.get(uId);
                }

                return user;
            }

            @Override
            public Tuple2<Order, String> map(Order value) throws Exception {
                return new Tuple2(value, userDim.get(value.getUId()).getName());
            }

        });

        result.print();
        // 输入数据
        // 7,1003,111
        // 8,1005,234
        // 9,1002,875

        // 控制台输出数据
        // 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
        // 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005,  total=234.0),alan_chan_chn)
        // 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)

        env.execute("TestJoinDimFromCacheDataDemo");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133

3、示例:将事实流与维表进行关联-通过Flink 的异步 I/O提高系统效率

Flink与外部存储系统进行读写操作的时候可以使用同步方式,也就是发送一个请求后等待外部系统响应,然后再发送第二个读写请求,这样的方式吞吐量比较低,可以用提高并行度的方式来提高吞吐量,但是并行度多了也就导致了进程数量多了,占用了大量的资源。

Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,比如redis、MongoDB等。

更多内容见文章:
55、Flink之用于外部数据访问的异步 I/O介绍及示例

1)、redis 异步I/O实现

package org.tablesql.join;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2<Order, String>> {
    private JedisPoolConfig config = null;

    private static String ADDR = "192.168.10.41";
    private static int PORT = 6379;
    private static int TIMEOUT = 10000;
    private JedisPool jedisPool = null;
    private Jedis jedis = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);

        jedis = jedisPool.getResource();
    }

    @Override
    public void asyncInvoke(Order input, ResultFuture<Tuple2<Order, String>> resultFuture) throws Exception {
        // order 实时流中的单行数据
        System.out.println("输入参数input----:" + input);
        // 发起一个异步请求,返回结果
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                // 数据格式:1002,alanchan,19,25,alan.chan.chn@163.com
                String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
                String[] userTemp = userLine.split(",");
                // 返回 用户名
                return userTemp[1];
            }
        }).thenAccept((String dbResult) -> {
            // 设置请求完成时的回调,将结果返回
            List list = new ArrayList<Tuple2<Order, String>>();
            list.add(new Tuple2<>(input, dbResult));
            resultFuture.complete(list);
        });
    }

    // 连接超时的时候调用的方法
    public void timeout(Order input, ResultFuture<Tuple2<Order, String>> resultFuture)
            throws Exception {
        List list = new ArrayList<Tuple2<Order, String>>();
        // 数据源超时,不能获取到维表信息,置为"
        list.add(new Tuple2<>(input, ""));
        resultFuture.complete(list);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis.isConnected()) {
            jedis.close();
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

2)、实现事实流与维度流join


package org.tablesql.join;

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestJoinDimFromAsyncDataStreamDemo {
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        testJoinAyncFunctionByRedis();
    }

    static void testJoinAyncFunctionByRedis() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 实时流
        DataStream<Order> orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // 保证顺序:异步返回的结果保证顺序,超时时间1秒,最大容量2,超出容量触发反压
        DataStream<Tuple2<Order, String>> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(),
                1000L, TimeUnit.MILLISECONDS, 2);

        result.print("result:");

        // 允许乱序:异步返回的结果允许乱序,超时时间1秒,最大容量2,超出容量触发反压
        DataStream<Tuple2<Order, String>> unorderedResult = AsyncDataStream
                .unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
                .setParallelism(1);
        unorderedResult.print("unorderedResult");
        
        // redis的操作命令及数据
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
        // (integer) 1
        // 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
        // 1) "1001"
        // 2) "1001,alan,18,20,alan.chan.chn@163.com"
        // 3) "1002"
        // 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
        // 5) "1003"
        // 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
        // 7) "1004"
        // 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
        // 9) "1005"
        // 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"
        
        // 输入数据
        // 13,1002,811
        // 14,1004,834
        // 15,1005,975

        // 控制台输出数据
        // 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
        // unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
        // result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
        // unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
        // 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
        // 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
        // unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)

        env.execute("TestJoinDimFromAsyncDataStreamDemo");
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114

以上,本文介绍了Flink 通过异步IO的方式访问第三方数据源,介绍了2个示例,即通过缓存和redis的方式。

本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/174243
推荐阅读
相关标签
  

闽ICP备14008679号