当前位置:   article > 正文

flink学习(6)之sink_sink 工具

sink 工具

flink的官方文档中看到,无论是source还是sink都称之为flink的Connectors
在这里插入图片描述
点击overview然后就可以看到它所有的cnnectors
在这里插入图片描述
从上边的图片中我们发现,这些组件不是都作为source和sink,有的可以作为source,有的可以作为sink,有的同时当做source和sink。我们点击Redis(sink)
在这里插入图片描述
可以看到flink提供了一个接口把数据发送的redis中。这个sink可以使用三种不同的方法与redis不同的环境进行通信。
1、Single Redis Server 单节点模式
2、Redis Cluster 集群模式
3、Redis Sentinel 哨兵模式
继续往下看有他针对不同Redis环境的示例,接下来我们就参考人家给出的示例写一个自己的示例:
在写之前我们需要把redis的依赖加入进去,文章的开头其实也写了:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

RedisSink组件

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }

    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }

    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1+"";
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

job任务

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;

public class FlinkRedisSinkDemo {
    public static void main(String[] args) throws Exception {
        //获取流执行环境
        StreamExecutionEnvironment senv=StreamExecutionEnvironment.getExecutionEnvironment();
        //获取数据源
        DataStream<String> source=senv.socketTextStream("192.168.112.111",1234);
        DataStream<Tuple2<String,Integer>> data = source.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            public void flatMap(String line, Collector<Tuple2<String,Integer>> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word:words
                     ) {
                    collector.collect(new Tuple2<String, Integer>(word,1));
                }
            }
        }).keyBy(0).sum(1);

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.112.111").setPort(6379).build();
        data.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new MyRedisMapper()));
        //执行流失计算
        senv.execute("FlinkRedisSinkDemo");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

然后我们把redis服务启动了,并在linux操作系统打开一个socket流端口供flink任务链接:
nc -l -p 1234
随后我们启动任务,任务启动之后我们就可以通过socket给flink传数据了
加粗样式
然后我进入redis客户端查看一下这个hash里边的数据
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/615152
推荐阅读
相关标签
  

闽ICP备14008679号