
Flink Learning 10 --- An Introduction to DataStream Sinks and RichSinkFunction


A sink is responsible for writing the data that Flink has processed out to external systems.

一、Flink ships with a number of ready-made sinks for DataStream, including:

1. writeAsText(): writes the elements line by line as strings, obtained by calling each element's toString() method.

2. print() / printToErr(): prints each element's toString() value to standard output or standard error.

3. Custom sinks: addSink() lets you write data to any third-party storage system.

Flink provides the corresponding sink support through its built-in connectors and the Apache Bahir project.

For details, see: https://blog.csdn.net/zhuzuwei/article/details/107137295

 

二、Fault-tolerance guarantees of sink components

| Sink           | Guarantee                    | Notes                                                                      |
|----------------|------------------------------|----------------------------------------------------------------------------|
| HDFS           | Exactly-once                 |                                                                            |
| Elasticsearch  | At-least-once                |                                                                            |
| Kafka producer | At-least-once / At-most-once | Kafka 0.9 and 0.10 provide at-least-once; Kafka 0.11 provides exactly-once |
| File           | At-least-once                |                                                                            |
| Redis          | At-least-once                |                                                                            |
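What at-least-once means in practice: after a failure and restart, the sink may receive some records a second time. Whether the replay corrupts the target system depends on whether the write itself is idempotent; the Redis HSET used later in this article is, since replaying it simply overwrites the field with the same value. A plain-Java sketch of that idea (no Flink involved, all names illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AtLeastOnceSketch {

    // Apply a batch of (field, value) writes to a map that stands in for a
    // Redis hash: HSET is modeled as Map.put, i.e. an idempotent overwrite.
    static void deliver(Map<String, String> hash, List<String[]> records) {
        for (String[] r : records) {
            hash.put(r[0], r[1]);
        }
    }

    public static void main(String[] args) {
        Map<String, String> redisHash = new HashMap<>();
        List<String[]> records = List.of(
                new String[]{"hello", "3"},
                new String[]{"flink", "2"});

        deliver(redisHash, records);  // first delivery
        deliver(redisHash, records);  // replay after a simulated failure

        // The replay leaves the hash unchanged: same 2 fields, same values, so
        // the at-least-once sink behaves effectively exactly-once here.
        System.out.println("size=" + redisHash.size() + " hello=" + redisHash.get("hello")); // size=2 hello=3
    }
}
```

This is also why an at-least-once sink into Redis is often acceptable, while the same guarantee into an append-only file can produce duplicate lines.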

 

三、A worked example

So far we have only used the print() sink; this section demonstrates sinking to a text file and to a Redis database.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class AddSinkReview {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read comma-separated words from a socket source.
        DataStreamSource<String> lines = env.socketTextStream("192.168.***.***", 8888);

        // Split each line into (word, 1) tuples.
        SingleOutputStreamOperator<Tuple2<String, Integer>> words =
                lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                        for (String word : s.split(",")) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = words.keyBy(0).sum(1);
        summed.print();

        // Sink 1: write the running counts to a text "file" (in fact a directory, see below).
        summed.writeAsText("C:\\Users\\Dell\\Desktop\\flinkTest\\sinkout1.txt", FileSystem.WriteMode.OVERWRITE);

        // Build (redisKey, word, 1) tuples for the Redis sink.
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> words2 =
                lines.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple3<String, String, Integer>> collector) {
                        for (String word : s.split(",")) {
                            collector.collect(Tuple3.of("wordscount", word, 1));
                        }
                    }
                });

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed2 = words2.keyBy(1).sum(2);

        // Load the Redis connection settings and register them as global job
        // parameters so that the sink can read them in open().
        String configPath = "C:\\Users\\Dell\\Desktop\\flinkTest\\config.txt";
        ParameterTool parameters = ParameterTool.fromPropertiesFile(configPath);
        env.getConfig().setGlobalJobParameters(parameters);

        // Sink 2: a custom RichSinkFunction that writes to Redis.
        summed2.addSink(new MyRedisSinkFunction());

        env.execute("AddSinkReview");
    }
}
```
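The keyBy(0).sum(1) step maintains one running total per key and emits an updated tuple for every incoming element. The plain-Java sketch below (no Flink involved, names are illustrative) shows what that rolling aggregation computes for the socket line "hello,flink,hello":

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyBySumSketch {

    // For each (word, 1) tuple, update the per-key total and emit the new
    // value, mirroring what keyBy(0).sum(1) does on the stream.
    static List<String> runningCounts(List<String> words) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int updated = totals.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The socket line "hello,flink,hello" becomes three (word, 1) tuples.
        List<String> words = List.of("hello,flink,hello".split(","));
        System.out.println(runningCounts(words)); // [(hello,1), (flink,1), (hello,2)]
    }
}
```

Note that every input element produces an output element with the count so far, which is why the file and Redis sinks receive intermediate totals, not just final ones.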
```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import redis.clients.jedis.Jedis;

public class MyRedisSinkFunction extends RichSinkFunction<Tuple3<String, String, Integer>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration config) {
        // Read the connection settings registered via setGlobalJobParameters().
        ParameterTool parameters =
                (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        String host = parameters.getRequired("redis.host");
        String password = parameters.get("redis.password", "");
        int port = parameters.getInt("redis.port", 6379);
        int timeout = parameters.getInt("redis.timeout", 5000);
        int db = parameters.getInt("redis.db", 0);

        jedis = new Jedis(host, port, timeout);
        // Jedis rejects auth() with an empty password, so only call it when one is set.
        if (!password.isEmpty()) {
            jedis.auth(password);
        }
        jedis.select(db);
    }

    @Override
    public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
        if (!jedis.isConnected()) {
            jedis.connect();
        }
        // Write the count into a Redis hash: HSET <redisKey> <word> <count>.
        jedis.hset(value.f0, value.f1, String.valueOf(value.f2));
    }

    @Override
    public void close() throws Exception {
        if (jedis != null) {
            jedis.close();
        }
    }
}
```
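The sink expects config.txt to be a Java properties file containing the keys read in open(). A minimal example of its contents (the host value is a placeholder, not taken from the article; only redis.host is mandatory, the other keys fall back to the defaults shown in open()):

```properties
# Redis connection settings consumed via ParameterTool.fromPropertiesFile()
redis.host=192.168.0.10
redis.port=6379
redis.password=
redis.timeout=5000
redis.db=0
```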

The "sinkout1.txt" path given to writeAsText() does not become a single file: Flink creates a directory of that name containing one file per parallel subtask (four here, matching the job's parallelism), each holding that subtask's share of the sink output.

Each of those files contains part of the running word counts, and the same results also appear in Redis under the "wordscount" hash.
