
Writing to Redis from a Flink pipeline

In this job the source operator reads from Kafka, and the sink needs to write the data to Redis as HINCRBY operations: each Tuple3<String, String, Long> record (redisKey, hashField, increment) becomes HINCRBY redisKey hashField increment.

Three implementations are given below; the second one (Lettuce) is recommended.

Note: these approaches may not recover gracefully when errors occur; improvements are in progress.

Option 1: use the officially provided Redis connector (RedisSink)

Implement the RedisMapper interface:

// 'stream' here is the DataStreamSource<T> carrying the data from the source operator
stream.addSink(new RedisSink<>(buildSinkRedisPool(), new RedisMapper<Tuple3<String, String, Long>>() {
                    @Override
                    public RedisCommandDescription getCommandDescription() {
                        return new RedisCommandDescription(RedisCommand.HINCRBY, "temp");
                    }

                    @Override
                    public String getKeyFromData(Tuple3<String, String, Long> data) {
                        return data.f1;
                    }

                    @Override
                    public String getValueFromData(Tuple3<String, String, Long> data) {
                        return String.valueOf(data.f2);
                    }

                    @Override
                    public Optional<String> getAdditionalKey(Tuple3<String, String, Long> data) {
                        return Optional.of(data.f0);
                    }

                    @Override
                    public Optional<Integer> getAdditionalTTL(Tuple3<String, String, Long> data) {
                        return Optional.of(120 * 60 * 1000);
                    }
}));
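
The buildSinkRedisPool() helper is not shown above; the following is only a minimal sketch of what it could return, assuming the Apache Bahir flink-connector-redis dependency and a Redis cluster (host names, ports, and the timeout are placeholders that should come from your configuration):

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;

public class RedisSinkConfig {

    // Builds the cluster connection config consumed by the RedisSink constructor
    public static FlinkJedisConfigBase buildSinkRedisPool() {
        // Cluster node addresses; in the real job these come from the config file
        Set<InetSocketAddress> nodes = new HashSet<>();
        nodes.add(new InetSocketAddress("redis-node-1", 6379)); // placeholder host
        nodes.add(new InetSocketAddress("redis-node-2", 6379)); // placeholder host

        return new FlinkJedisClusterConfig.Builder()
                .setNodes(nodes)
                .setTimeout(2000) // connection timeout in milliseconds
                .build();
    }
}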

Option 2: use Lettuce (recommended)

Maven dependencies (pom.xml):

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>${lettuce.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>netty-buffer</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.84.Final</version>
</dependency>

Code:

public class RedisPipLineSink extends ProcessFunction<Tuple3<String, String, Long>, String> {
    private static final long expire = 120 * 60;

    private Configuration configuration;
    private Long commFlag = 0L;
    private Long thresholdValue = 10000L;

    protected RedisClusterClient client = null;
    protected StatefulRedisClusterConnection<String, String> connect = null;
    protected RedisAdvancedClusterAsyncCommands<String, String> async = null;


    @Override
    public void open(Configuration parameters) throws Exception {

        Set<RedisURI> redisURIS = new HashSet<>();
        // redisServer ("host1:port1,host2:port2,...") should be read from the
        // configuration file specified when launching main()
        String redisServer = "";
        Arrays.asList(redisServer.split(",")).forEach(item -> {
            String[] info = item.split(":");
            redisURIS.add(RedisURI.builder().withHost(info[0]).withPort(Integer.valueOf(info[1])).build());
        });

        client = RedisClusterClient.create(redisURIS);
        connect = client.connect();
        async = connect.async();
        async.setAutoFlushCommands(false);
    }


    @Override
    public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
        // buffer commands locally; the buffer holds up to 10,000 commands (thresholdValue)
        commFlag++;
        async.hincrby(value.f0, value.f1, value.f2);
        async.expire(value.f0, expire);
        // once 10,000 commands have accumulated, flush them to Redis and reset the counter
        if (thresholdValue.equals(commFlag)) {
            commFlag = 0L;
            async.flushCommands();
        }
    }

    @Override
    public void close() throws Exception {
        if (connect != null && connect.isOpen()) {
            // flush any commands still sitting in the buffer before closing
            async.flushCommands();
            connect.close();
        }
        if (client != null) {
            client.shutdown();
        }
    }
}
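
A sketch of wiring the function into the job, assuming stream is the DataStreamSource<Tuple3<String, String, Long>> produced by the Kafka source as in option 1:

// attach the Lettuce pipeline writer to the Kafka-sourced stream
stream.process(new RedisPipLineSink());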

Launch command

When launching the JVM, the parameter -Dcom.datastax.driver.FORCE_NIO=true must be added.

Without this parameter the job fails with:

Caused by: java.lang.ClassCastException: io.netty.channel.epoll.EpollEventLoopGroup cannot be cast to io.netty.channel.EventLoopGroup
    at com.datastax.driver.core.NettyUtil.newEventLoopGroupInstance(NettyUtil.java:134)
    at com.datastax.driver.core.NettyOptions.eventLoopGroup(NettyOptions.java:95)
    at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:926)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1499)
    at com.datastax.driver.core.Cluster.init(Cluster.java:208)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:376)

For the underlying cause, see:

  • https://docs.datastax.com/en/developer/java-driver-dse/1.4/faq/#what-is-netty-s-native-epoll-transport-and-how-do-i-enable-or-disable-it
  • https://stackoverflow.com/questions/53947481/how-to-write-data-from-flink-pipeline-to-redis-efficiently
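
One way to pass this flag (an assumption; the exact mechanism depends on how the job is deployed) is the env.java.opts entry in flink-conf.yaml, which forwards JVM options to the Flink processes:

# flink-conf.yaml
env.java.opts: -Dcom.datastax.driver.FORCE_NIO=true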

Option 3: use Redisson

Maven dependency (pom.xml):

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.17.6</version>
</dependency>

Code:

public class RedisRedissonPipLineSink extends ProcessFunction<Tuple3<String, String, Long>, String> {

    private static final String PREFIX = "redis://";
    private static final long expire = 120 * 60;

    private Configuration configuration;
    private Long commFlag = 0L;
    private Long thresholdValue = 10000L;

    private RedissonClient redisson = null;
    private RBatch batch = null;

    @Override
    public void open(Configuration parameters) throws Exception {
	
        // redisServer ("host1:port1,host2:port2,...") should be read from the
        // configuration file specified when launching main()
        String redisServer = "";

        if (redisServer == null || "".equals(redisServer)) {
            return;
        }

        Config config = new Config();
        ClusterServersConfig clusterServersConfig = config.useClusterServers();

        List<String> nodes = Arrays.asList(redisServer.split(","));
        for (String node : nodes) {
            clusterServersConfig.addNodeAddress(PREFIX + node);
        }

        redisson = Redisson.create(config);
        batch = redisson.createBatch();
    }


    @Override
    public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
        commFlag++;
        RMapAsync<Object, Object> map = batch.getMap(value.f0, StringCodec.INSTANCE);
        // queue the increment and the TTL in the batch (equivalent to HINCRBY + EXPIRE)
        map.addAndGetAsync(value.f1, value.f2);
        map.expireAsync(2, TimeUnit.HOURS);
        // once 10,000 commands have accumulated, execute the batch and start a new one
        if (thresholdValue.equals(commFlag)) {
            batch.executeAsync();
            batch = redisson.createBatch();
            commFlag = 0L;
        }
    }

    @Override
    public void close() throws Exception {
        if (redisson != null && !redisson.isShutdown()) {
            // execute any commands still queued in the batch before shutting down
            batch.execute();
            redisson.shutdown();
        }
    }

}
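
Wiring into the job is the same as in option 2, e.g. stream.process(new RedisRedissonPipLineSink()).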