当前位置:   article > 正文

Flink流处理-DataStream常用Source及Sink_datastream source source=env.fromelements(

datastream source source=env.fromelements("hello flink","hello hadoo

环境

flink-1.9.0

一、需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

二、初始化执行环境读取数据文件

数据文件word

how	are	you
world	and	that
hello	world
jack	and
app	storm	storm	what
spark	spark
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

初始化流处理执行环境

 /**
  * 初始化流处理执行环境
  */
 private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1
  • 2
  • 3
  • 4

三、常用Source

1.socketTextStream
从socket套接字中获取数据

/**
 * 从socket套接字中获取数据
 */
public static void socketTextStream() throws Exception {
    // 从本地socket套接字中读取数据
    DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
    // 打印输入的内容
    dataStream.print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

打开CMD,运行netcat,输入nc -l -p 9999后运行该程序,等待连接完成后输入一些内容,可以看到在控制台中输出了socket中输入的内容
在这里插入图片描述
2.readTextFile(path)
从文本文件中读取数据作为数据源

/**
 * 从文本文件中读取数据作为数据源
 * 注意:文件可以是本地文件,也可以是hdfs中的文件,只需要指定对应的路径即可
 */
public static void readTextFile() throws Exception {
    // 从本地文本文件中读取数据
    DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
    // 从hdfs文件系统中读取数据
    //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");

    // 将文本中每行单词切分成单个单词并收集
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String[] words = s.split("\t");
            for(String word: words){
                collector.collect(word);
            }
        }
    }).print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

运行结果
在这里插入图片描述
3.generateSequence(from, to)
将序列作为数据源

/**
 * 从生成序列中读取数据作为数据源
 */
public static void generateSequence() throws Exception {
    // 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
    env.setParallelism(1);
    // 生成1-10的序列并输出
    env.generateSequence(1, 10).print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

运行结果
在这里插入图片描述
4.fromCollection(Seq)
将集合中的数据当作数据源

/**
 * 从Java.util.Collection集合中读取数据作为数据源
 */
public static void fromCollection() throws Exception {
    ArrayList<String> list = new ArrayList<>(5);
    list.add("flink");
    list.add("scala");
    list.add("spark");
    list.add("hadoop");
    list.add("hive");
    env.fromCollection(list).print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

运行结果
在这里插入图片描述
5.fromElements(elements: _*)
将一堆元素作为数据源

/**
 * 从Java.util.Collection集合中读取数据作为数据源
 */
public static void fromElements() throws Exception {
    env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

运行结果
在这里插入图片描述
6.addSource
自定义数据源
MySQL表中的数据

1	hello	1
2	hi	3
3	flink	1
4	scala	1
5	spark	1
6	hadoop	1
7	hive	1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

Mysql自定义数据源类

package cn.myclass.stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * 自定义mysql数据源
 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
 * 的open以及close方法更加合理的利用资源读取数据。
 * @author Yang
 */
public class MysqlDataSource extends RichSourceFunction<String> {

    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        //创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "select word from word";
        // 获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 读取数据时执行此方法,从查询结果中依次获得单词
     * @param sourceContext 数据源上下文对象
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // 执行查询获得结果
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()){
            // 将结果添加到收集器中
            sourceContext.collect(resultSet.getString("word"));
        }
    }

    /**
     * 取消任务时执行
     */
    @Override
    public void cancel() {
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

调用方法

/**
 * 从自定义数据源中读取数据
 */
public static void addSource() throws Exception {
    // 添加自定义数据源并打印读取的数据
    env.addSource(new MysqlDataSource()).print();
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

运行结果
在这里插入图片描述

四、常用Sink

1.print
测试时常用的方法,将结果直接输出到标准输出设备(控制台)。略

2.writeAsText
保存为文本文件

 /**
  * 将结果写入到文本文件中
  */
 public static void writeAsText() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 设置并行度为1,将结果写入到一个文件中
     env.setParallelism(1);
     // 将结果写入到hdfs中
     //dataStream.writeAsText("hdfs://master:9000/words.txt");
     // 将结果写到本地文本文件中
     dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
     // 执行任务
     env.execute();
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

运行结果
在这里插入图片描述

3.writeAsCsv
保存为csv文件

/**
 * 将结果写入到csv文件中
 * 注意:将结果写入到csv只支持元组类型的数据,所以在这里将结果转化成了元组并无实际意义
 */
public static void writeAsCsv() throws Exception {
    // 从本地文本文件中读取数据
    DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
    // 设置并行度
    env.setParallelism(1);
    // 将单词转化成元组
    dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = s.split("\t");
            for(String word: words){
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
    // 执行任务
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

运行结果
在这里插入图片描述
4.writeToSocket
输出到套接字

 /**
  * 将结果写入到socket套接字中
  */
 public static void writeToSocket() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 将结果写入到socket套接字中,以简单字符串类型发送
     dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
     // 执行任务
     env.execute();
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

打开CMD,运行netcat,输入nc -lp 9999等待接收运行结果后执行该方法,可以看到nc中输出的结果
在这里插入图片描述
5.addSink
自定义Mysql数据沉槽类

package cn.myclass.stream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * 自定义Mysql数据沉槽
 * @author Yang
 */
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // 创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "insert into word(word,count) values(?,?)";
        // 预编译语句并获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 每条结果执行的方法
     * @param tuple2 元组数据
     * @param context 上下文
     */
    @Override
    public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
        // 设置sql语句中的第一个和第二个值
        preparedStatement.setString(1, tuple2.f0);
        preparedStatement.setInt(2, tuple2.f1);
        // 执行插入
        preparedStatement.executeUpdate();
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

调用方法

 /**
  * 将结果写入自定义数据沉槽
  */
 public static void addSink() throws Exception {
     // 从本地文本文件中读取数据
     DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
     // 将单词形成元组并设置次数为1
     dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                 @Override
                 public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                     String[] words = s.split("\t");
                     for(String word: words){
                         collector.collect(new Tuple2<>(word, 1));
                     }
                 }
                 // 写入MySql中
             }).addSink(new MysqlDataSink());
     // 执行任务
     env.execute();
 }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

运行结果
在这里插入图片描述

五、完整代码

自定义Mysql数据源

package cn.myclass.stream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * 自定义mysql数据源
 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类
 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者
 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供
 * 的open以及close方法更加合理的利用资源读取数据。
 * @author Yang
 */
public class MysqlDataSource extends RichSourceFunction<String> {

    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        //创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "select word from word";
        // 获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 读取数据时执行此方法,从查询结果中依次获得单词
     * @param sourceContext 数据源上下文对象
     */
    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // 执行查询获得结果
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()){
            // 将结果添加到收集器中
            sourceContext.collect(resultSet.getString("word"));
        }
    }

    /**
     * 取消任务时执行
     */
    @Override
    public void cancel() {
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83

自定义Mysql数据沉槽

package cn.myclass.stream;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * 自定义Mysql数据沉槽
 * @author Yang
 */
public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> {
    /**
     * 预处理对象
     */
    private PreparedStatement preparedStatement = null;

    /**
     * 连接对象
     */
    private Connection connection = null;

    /**
     * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。
     * @param parameters 参数信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        // 创建连接
        connection = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flink",
                "root",
                "root");
        // 从word表中读取所有单词
        String sql = "insert into word(word,count) values(?,?)";
        // 预编译语句并获得预处理对象
        preparedStatement = connection.prepareStatement(sql);
    }

    /**
     * 每条结果执行的方法
     * @param tuple2 元组数据
     * @param context 上下文
     */
    @Override
    public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception {
        // 设置sql语句中的第一个和第二个值
        preparedStatement.setString(1, tuple2.f0);
        preparedStatement.setInt(2, tuple2.f1);
        // 执行插入
        preparedStatement.executeUpdate();
    }

    /**
     * 关闭时的方法,关闭MySQL连接,避免资源占用
     */
    @Override
    public void close() throws Exception {
        if (preparedStatement != null){
            preparedStatement.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

主类

package cn.myclass.stream;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

/**
 * Flink流处理常用数据源及数据沉槽
 * @author Yang
 */
public class DataStreamSourceAndSink {

    /**
     * 初始化流处理执行环境
     */
    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * 从socket套接字中获取数据作为数据源
     */
    public static void socketTextStream() throws Exception {
        // 从本地socket套接字中读取数据
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
        // 打印输入的内容
        dataStream.print();
        // 执行任务
        env.execute();
    }

    /**
     * 从文本文件中读取数据作为数据源
     * 文本文件可以是本地文件,也可以是hdfs中的文件,只需要指定路径即可
     */
    public static void readTextFile() throws Exception {
        // 从本地文本文件中读取数据
        DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 从hdfs文件系统中读取数据
        //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word");

        // 将文本中每行单词切分成单个单词并收集
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split("\t");
                for(String word: words){
                    collector.collect(word);
                }
            }
        }).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从生成序列中读取数据作为数据源
     */
    public static void generateSequence() throws Exception {
        // 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
        env.setParallelism(1);
        // 生成1-10的序列并输出
        env.generateSequence(1, 10).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从Java.util.Collection集合中读取数据作为数据源
     */
    public static void fromCollection() throws Exception {
        ArrayList<String> list = new ArrayList<>(5);
        list.add("flink");
        list.add("scala");
        list.add("spark");
        list.add("hadoop");
        list.add("hive");
        env.fromCollection(list).print();
        // 执行任务
        env.execute();
    }

    /**
     * 从Java.util.Collection集合中读取数据作为数据源
     */
    public static void fromElements() throws Exception {
        env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
        // 执行任务
        env.execute();
    }

    /**
     * 从自定义数据源中读取数据
     */
    public static void addSource() throws Exception {
        // 添加自定义数据源并打印读取的数据
        env.addSource(new MysqlDataSource()).print();
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到文本文件中
     */
    public static void writeAsText() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 设置并行度为1,将结果写入到一个文件中
        env.setParallelism(1);
        // 将结果写入到hdfs中
        //dataStream.writeAsText("hdfs://master:9000/words.txt");
        // 将结果写到本地文本文件中
        dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到csv文件中
     * 注意:将结果写入到csv只支持元组类型的数据
     */
    public static void writeAsCsv() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 设置并行度
        env.setParallelism(1);
        // 将单词转化成元组
        dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split("\t");
                for(String word: words){
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv");
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入到socket套接字中
     */
    public static void writeToSocket() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 将结果写入到socket套接字中,以简单字符串类型发送
        dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
        // 执行任务
        env.execute();
    }

    /**
     * 将结果写入自定义数据沉槽
     */
    public static void addSink() throws Exception {
        // 从本地文本文件中读取数据
        DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
        // 将单词形成元组并初始化次数为1
        dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split("\t");
                        for(String word: words){
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                    // 写入MySql中
                }).addSink(new MysqlDataSink());
        // 执行任务
        env.execute();
    }


    public static void main(String[] args) throws Exception {
        socketTextStream();
        readTextFile();
        generateSequence();
        fromCollection();
        fromElements();
        addSource();
        writeAsText();
        writeAsCsv();
        writeToSocket();
        addSink();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191

如有错误,望指正!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/779545
推荐阅读
相关标签
  

闽ICP备14008679号