赞
踩
<!-- flink核心包 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- flink流处理包 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
// DataStream(流处理)使用 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// DataSet(批处理)使用 ExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Flink针对DataStream提供了大量的已经实现的数据目的地(Sink),具体如下所示
writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
print()/printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
自定义输出:addSink可以实现把数据输出到第三方存储介质中
Flink提供了一批内置的Connector,其中有的Connector会提供对应的Sink支持
案例:将流数据下沉到redis中
1、依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
2、代码:
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.connectors.redis.RedisSink;
- import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
- import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
-
- public class StreamToRedis {
-
- public static void main(String[] args) throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- String hostname = "192.168.25.128";
- int port = 7777;
- DataStreamSource<String> data = env.socketTextStream(hostname, port);
- SingleOutputStreamOperator<Tuple2<String, String>> m_word = data.map(new MapFunction<String, Tuple2<String, String>>() {
- public Tuple2<String, String> map(String value) throws Exception {
- return new Tuple2<String, String>("m_word", value);
- }
- });
-
- FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.25.129").setPort(6379).build();
- // 参数一:配置信息
- // 参数二:下沉mapper映射器用于指明谁是key,谁是value
- RedisSink<Tuple2<String, String>> redisSink = new RedisSink<Tuple2<String, String>>(conf, new MyMapper());
- // 将结果保存到redis中
- m_word.addSink(redisSink);
- env.execute("to redis");
- }
-
- /**
- * RedisMapper<Tuple2<String,String>> 泛型为Tuple2<String,String>为将来传递过来的数据类型
- * 将数据其映射为key和value
- */
- public static class MyMapper implements RedisMapper<Tuple2<String,String>>{
-
- public RedisCommandDescription getCommandDescription() {
-
- // redis命令就是怎样插入key和value
- return new RedisCommandDescription(RedisCommand.LPUSH);
- }
-
- public String getKeyFromData(Tuple2<String, String> data) {
- // 指定key
- return data.f0;
- }
-
- public String getValueFromData(Tuple2<String, String> data) {
- // 指定value
- return data.f1;
- }
- }
- }
案例:统计一个文件中各个单词出现的次数,把统计结果输出到文件
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
-
- import org.apache.flink.util.Collector;
-
-
- public class WordCountBatch {
-
- public static void main(String[] args) throws Exception {
- // 1、获取flink的运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- // 2、用flink的运行环境,去获取待分析数据
- String input = "C:\\data\\input\\hello.txt";
- String output = "C:\\data\\output";
- DataSource<String> lines = env.readTextFile(input);
-
- // 3、处理数据
- //将lines中的每一行元素都施加一个算法
- FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lines.flatMap(new SplitFun());
-
- // (单词,1)
- // b。把相同的单词聚合到一起
- UnsortedGrouping<Tuple2<String, Integer>> groupByWord = wordAndOne.groupBy(0);
- // c。把聚合到一起的数据累加处理
- DataSet<Tuple2<String, Integer>> result = groupByWord.sum(1);
- // 4、保存处理结果
- result.writeAsText(output);
- result.print();
- // 5、触发程序执行
- env.execute("wordcount batch process");
-
-
- }
-
- /**
- * 作用:将输入进来的每一行元素根据空格切分,切分成一个一个的单词
- * 再把切分好的单词变成《单词,1)这样的组合,元组中
- */
- static class SplitFun implements FlatMapFunction<String, Tuple2<String ,Integer>>{
-
- @Override
- public void flatMap(String in, Collector<Tuple2<String, Integer>> out) throws Exception {
- // a.将文本内容打散成一个一个单词
- String[] words = in.split(" "); //hello you ==> (hello,1) (you,1)
- for (String word : words) {
- Tuple2<String, Integer> wordAndOne = new Tuple2<>();
- out.collect(wordAndOne);
- }
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。