
Flink: Common Sink APIs


Dependencies

<!-- Flink core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Flink streaming -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>

The execution environments for the DataStream and DataSet APIs are obtained in the same way:

// DataStream (streaming)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// DataSet (batch)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Flink ships with a number of ready-made data sinks for DataStream, including:

  • writeAsText(): writes the elements line by line as strings, obtained by calling each element's toString() method

  • print() / printToErr(): prints each element's toString() value to standard output or standard error

  • Custom output: addSink() can write data to any third-party storage system (see the custom-sink sketch after this list)

    Flink also ships a set of built-in connectors, some of which provide a corresponding sink.
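
If none of the built-in connectors fit, a custom sink can be written by extending RichSinkFunction (which adds open()/close() lifecycle hooks to the plain SinkFunction interface). Below is a minimal sketch; the class name MySink is chosen here purely for illustration, and a real implementation would open a connection to the external system in open() and write each record in invoke():

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Minimal custom sink sketch: a real one would hold a client/connection
// to the external storage system instead of just printing.
public class MySink extends RichSinkFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // open the connection to the external system here
    }
    @Override
    public void invoke(String value, Context context) throws Exception {
        // write a single record to the external system
        System.out.println("sink received: " + value);
    }
    @Override
    public void close() throws Exception {
        // release the connection here
    }
}

It is attached to a stream the same way as any built-in sink: dataStream.addSink(new MySink()).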

Example: sinking a data stream to Redis

1. Dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
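
Note: these coordinates date from the Flink 1.1.x release line; the Redis connector was later maintained in the Apache Bahir project (groupId org.apache.bahir, same artifactId), which may be the better choice on newer Flink versions.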

2. Code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class StreamToRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String hostname = "192.168.25.128";
        int port = 7777;
        DataStreamSource<String> data = env.socketTextStream(hostname, port);

        // Wrap every incoming line in a (key, value) tuple; the key is fixed to "m_word"
        SingleOutputStreamOperator<Tuple2<String, String>> m_word =
                data.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String value) throws Exception {
                        return new Tuple2<String, String>("m_word", value);
                    }
                });

        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("192.168.25.129").setPort(6379).build();

        // First argument: connection configuration
        // Second argument: the mapper that decides which field is the key and which is the value
        RedisSink<Tuple2<String, String>> redisSink =
                new RedisSink<Tuple2<String, String>>(conf, new MyMapper());

        // Write the results to Redis
        m_word.addSink(redisSink);
        env.execute("to redis");
    }

    /**
     * The type parameter Tuple2<String, String> is the type of the incoming records;
     * the mapper turns each record into a Redis key and value.
     */
    public static class MyMapper implements RedisMapper<Tuple2<String, String>> {

        public RedisCommandDescription getCommandDescription() {
            // The Redis command that decides how key/value pairs are inserted
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }

        public String getKeyFromData(Tuple2<String, String> data) {
            // The key
            return data.f0;
        }

        public String getValueFromData(Tuple2<String, String> data) {
            // The value
            return data.f1;
        }
    }
}
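
To try this end to end with the hosts assumed above: start a listening socket on 192.168.25.128 with nc -lk 7777, type a few lines, then inspect Redis on 192.168.25.129 with redis-cli -h 192.168.25.129 and LRANGE m_word 0 -1. Because the mapper uses LPUSH, every record is pushed onto the head of the list stored under the key m_word.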

Example: count how many times each word occurs in a file and write the result to a file

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {

    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment (batch job, so ExecutionEnvironment)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Use the environment to load the data to analyze
        String input = "C:\\data\\input\\hello.txt";
        String output = "C:\\data\\output";
        DataSource<String> lines = env.readTextFile(input);

        // 3. Process the data
        // a. apply the split function to every line, producing (word, 1) tuples
        FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new SplitFun());
        // b. group identical words together
        UnsortedGrouping<Tuple2<String, Integer>> groupByWord = wordAndOne.groupBy(0);
        // c. sum up the counts within each group
        DataSet<Tuple2<String, Integer>> result = groupByWord.sum(1);

        // 4. Save the result
        // (calling result.print() here would trigger execution immediately in the
        // DataSet API, after which env.execute() would fail, so it is omitted)
        result.writeAsText(output);

        // 5. Trigger program execution
        env.execute("wordcount batch process");
    }

    /**
     * Splits every incoming line on spaces and emits each word
     * as a (word, 1) tuple.
     */
    static class SplitFun implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
            // a. break the line into individual words
            String[] words = in.split(" "); // "hello you" ==> (hello,1) (you,1)
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
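
One practical note: when the job runs with parallelism greater than 1, writeAsText() treats C:\data\output as a directory and writes one file per parallel task. To force a single result file, pin the sink's parallelism to 1:

result.writeAsText(output).setParallelism(1);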
