当前位置:   article > 正文

flink重温笔记(六):Flink 流批一体 API 开发—— 数据输出 sink_flink打印输出

flink打印输出

Flink学习笔记

前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的!

Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王!”明天周日除了复习前面知识点之外,也要继续努力学习接下来的知识点,继续加油!

二、Flink 流批一体 API 开发

4. 数据输出 Sink

4.1 print 打印

打印是最简单的一个Sink,通常是用来做实验和测试时使用。

实例:socket 数据源,查看进程编号最终输出 sink 之 print 打印

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * @author lql
 * @time 2024-02-17 22:27:48
 * @description TODO:print
 */
public class PrintSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        // socket 数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        // 获取 lines 数据源并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.print();
        lines.addSink(new MyPrintSink()).name("my-print-sink");
        env.execute();
    }

    private static class MyPrintSink extends RichSinkFunction<String> {
        // 这一处定义很重要,不然 indexOfThisSubtask 只能在一个方法中使用!
        private int indexOfThisSubtask;
        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            System.out.println(indexOfThisSubtask + 1 + "> " + value);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

结果:

执行环境默认的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

总结:

  • 打印输出,也是一种 sink
4.2 writeAsText 以文本格式输出

该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是 TextOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 Text 中

package cn.itcast.day06.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 22:40:48
 * @description TODO:writeAsText
 */
public class WriteSinkDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //获取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);

        lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
        env.execute();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

结果:

output 文件夹下出现以数字命名的文件
内容为 socket 数据源输出,加上了 \n 换行符
目录中的文件名称是该 Sink 所在 subtask 的 Index + 1
  • 1
  • 2
  • 3

总结:

  • 1- writeAsText 输出数据以小文件方式,文件命名为 subtask 的 Index + 1
  • 2- FileSystem.WriteMode 有两种,一种是 OVERWRITE,可以覆盖同名文件,一种是 NO_OVERWRITE,同名文件就报错
4.3 writeAsCsv 以 csv 格式输出

该方法是将数据以 csv 格式写入到指定的目录中,本质上使用的是 CsvOutputFormat 格式写入的。

实例:socket 数据源,将数据输出到文本 csv 中

package cn.itcast.day06.sink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;
/**
 * @author lql
 * @time 2024-02-17 22:52:12
 * @description TODO:将DataSet数据写入到csv文件中
 */
public class CsvSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //需先建立文件
        String filePath = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";
        //添加数据
        Tuple7<Integer, String, Integer, Integer, String, String, Long> row = new Tuple7<>(15, "zhangsan", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L);

        //转换为dataSet,利用 数据源中 fromElements 可以接受 [列表或元组] 的属性
        DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>> dataSet = (DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>>) env.fromElements(row);

        //将内容写入到File中,如果文件已存在,将会被复盖
        dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

结果:

在指定文件中,生成了 csv 数据
  • 1

总结:

  • 1- 首先需要定义数据存放的位置,精确到 .scv
  • 2- 最终需要将并行度设置为 1,才能生成一个完整的文件
4.4 writeUsingOutputFormat 指定格式输出

该方法是将数据已指定的格式写入到指定目录中

package cn.itcast.day06.sink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.core.fs.Path;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:03:24
 * @description TODO:将数据已指定的格式写入到指定目录中
 */
public class writeUsingOutputFormatSink {
    public static void main(String[] args) throws Exception {
        //1:获取流处理运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用env的fromElements创建一个非并行的DataStreamSource
        DataStreamSource<String> words = env.fromElements(
                "hadoop","spark","flink","hbase","flink","spark"
        );

        // 对拆分后的单词,每个单词记一次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);

        result.writeUsingOutputFormat(new TextOutputFormat<>(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
        env.execute();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

结果:

在指定目录下,生成 n(电脑并行度数量) 个文本文件
  • 1

总结:

  • 1- writeAsText 和 writeAsCsv 方法底层都是调用了 writeUsingOutputFormat 方法
  • 2- 这种方法更加灵活
4.5 writeToSocket 输出到网络端口

该方法是将数据输出到指定的Socket网络地址端口。

实例:socket 数据源,node1:9999 写数据到 node1:8888

package cn.itcast.day06.sink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-17 23:12:03
 * @description TODO:writeToSocket
 */
public class WriteToSocketDemo {
    public static void main(String[] args) throws Exception {
        //local模式默认的并行度是当前机器的逻辑核的数量
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism0 = env.getParallelism();
        System.out.println("执行环境默认的并行度:" + parallelism0);
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        
        //获取DataStream的并行度
        int parallelism = lines.getParallelism();
        System.out.println("SocketSource的并行度:" + parallelism);
        
        // 第三个参数是数据输出的序列化格式 SerializationSchema
        lines.writeToSocket("node1",8888,new SimpleStringSchema());
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

结果:

node1:8888 实时接收到 node1:9999 写入的数据
  • 1

总结:

  • 端口号需要提前开启
4.6 基于本地集合的 Sink

数据分类集合输出

实例:数据打印输出,error 输出,可以输出到:Stdout,Stderr,采集为本地集合

package cn.itcast.day06.sink;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * @author lql
 * @time 2024-02-17 23:18:12
 * @description TODO:数据可以输出到:Stdout,Stderr,采集为本地集合
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Tuple2<Integer, String>> dataSource = env.fromElements(
                Tuple2.of(1, "zhangsan"),
                Tuple2.of(2, "lisi"),
                Tuple2.of(3, "wangwu"),
                Tuple2.of(4, "zhaoliu")
        );

        //2.sink
        dataSource.print();
        dataSource.printToErr();

        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

结果:

黑色字体输出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)

红色字体输出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

总结:

  • 1- printToErr 可以进行分类输出
  • 2- 并行度是1 能输出文件
  • 3- 并行度是n 能输出文件夹
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/615082
推荐阅读
相关标签
  

闽ICP备14008679号