赞
踩
flink-1.9.0
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.0</version>
</dependency>
数据文件word
how are you
world and that
hello world
jack and
app storm storm what
spark spark
初始化流处理执行环境
/**
* 初始化流处理执行环境
*/
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1.socketTextStream
从socket套接字中获取数据
/**
* 从socket套接字中获取数据
*/
public static void socketTextStream() throws Exception {
// 从本地socket套接字中读取数据
DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999);
// 打印输入的内容
dataStream.print();
// 执行任务
env.execute();
}
打开CMD,运行netcat,输入nc -l -p 9999后运行该程序,等待连接完成后输入一些内容,可以看到在控制台中输出了socket中输入的内容
2.readTextFile(path)
从文本文件中读取数据作为数据源
/** * 从文本文件中读取数据作为数据源 * 注意:文件可以是本地文件,也可以是hdfs中的文件,只需要指定对应的路径即可 */ public static void readTextFile() throws Exception { // 从本地文本文件中读取数据 DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 从hdfs文件系统中读取数据 //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word"); // 将文本中每行单词切分成单个单词并收集 dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(word); } } }).print(); // 执行任务 env.execute(); }
运行结果
3.generateSequence(from, to)
将序列作为数据源
/**
* 从生成序列中读取数据作为数据源
*/
public static void generateSequence() throws Exception {
// 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序
env.setParallelism(1);
// 生成1-10的序列并输出
env.generateSequence(1, 10).print();
// 执行任务
env.execute();
}
运行结果
4.fromCollection(Seq)
将集合中的数据当作数据源
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromCollection() throws Exception {
ArrayList<String> list = new ArrayList<>(5);
list.add("flink");
list.add("scala");
list.add("spark");
list.add("hadoop");
list.add("hive");
env.fromCollection(list).print();
// 执行任务
env.execute();
}
运行结果
5.fromElements(elements: _*)
将一堆元素作为数据源
/**
* 从Java.util.Collection集合中读取数据作为数据源
*/
public static void fromElements() throws Exception {
env.fromElements("flink", "scala", "spark", "hadoop", "hive").print();
// 执行任务
env.execute();
}
运行结果
6.addSource
自定义数据源
MySQL表中的数据
1 hello 1
2 hi 3
3 flink 1
4 scala 1
5 spark 1
6 hadoop 1
7 hive 1
Mysql自定义数据源类
package cn.myclass.stream; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; /** * 自定义mysql数据源 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供 * 的open以及close方法更加合理的利用资源读取数据。 * @author Yang */ public class MysqlDataSource extends RichSourceFunction<String> { /** * 预处理对象 */ private PreparedStatement preparedStatement = null; /** * 连接对象 */ private Connection connection = null; /** * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。 * @param parameters 参数信息 */ @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); //创建连接 connection = DriverManager.getConnection( "jdbc:mysql://localhost:3306/flink", "root", "root"); // 从word表中读取所有单词 String sql = "select word from word"; // 获得预处理对象 preparedStatement = connection.prepareStatement(sql); } /** * 读取数据时执行此方法,从查询结果中依次获得单词 * @param sourceContext 数据源上下文对象 */ @Override public void run(SourceContext<String> sourceContext) throws Exception { // 执行查询获得结果 ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()){ // 将结果添加到收集器中 sourceContext.collect(resultSet.getString("word")); } } /** * 取消任务时执行 */ @Override public void cancel() { } /** * 关闭时的方法,关闭MySQL连接,避免资源占用 */ @Override public void close() throws Exception { if (preparedStatement != null){ preparedStatement.close(); } if (connection != null){ connection.close(); } } }
调用方法
/**
* 从自定义数据源中读取数据
*/
public static void addSource() throws Exception {
// 添加自定义数据源并打印读取的数据
env.addSource(new MysqlDataSource()).print();
// 执行任务
env.execute();
}
运行结果
1.print
测试时常用的方法,将结果直接输出到标准输出设备(控制台)。略
2.writeAsText
保存为文本文件
/**
* 将结果写入到文本文件中
*/
public static void writeAsText() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 设置并行度为1,将结果写入到一个文件中
env.setParallelism(1);
// 将结果写入到hdfs中
//dataStream.writeAsText("hdfs://master:9000/words.txt");
// 将结果写到本地文本文件中
dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt");
// 执行任务
env.execute();
}
运行结果
3.writeAsCsv
保存为csv文件
/** * 将结果写入到csv文件中 * 注意:将结果写入到csv只支持元组类型的数据,所以在这里将结果转化成了元组并无实际意义 */ public static void writeAsCsv() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 设置并行度 env.setParallelism(1); // 将单词转化成元组 dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(new Tuple2<>(word, 1)); } } }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv"); // 执行任务 env.execute(); }
运行结果
4.writeToSocket
输出到套接字
/**
* 将结果写入到socket套接字中
*/
public static void writeToSocket() throws Exception {
// 从本地文本文件中读取数据
DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word");
// 将结果写入到socket套接字中,以简单字符串类型发送
dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema());
// 执行任务
env.execute();
}
打开CMD,运行netcat,输入nc -lp 9999等待接收运行结果后执行该方法,可以看到nc中输出的结果
5.addSink
自定义Mysql数据沉槽类
package cn.myclass.stream; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * 自定义Mysql数据沉槽 * @author Yang */ public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> { /** * 预处理对象 */ private PreparedStatement preparedStatement = null; /** * 连接对象 */ private Connection connection = null; /** * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。 * @param parameters 参数信息 */ @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); // 创建连接 connection = DriverManager.getConnection( "jdbc:mysql://localhost:3306/flink", "root", "root"); // 从word表中读取所有单词 String sql = "insert into word(word,count) values(?,?)"; // 预编译语句并获得预处理对象 preparedStatement = connection.prepareStatement(sql); } /** * 每条结果执行的方法 * @param tuple2 元组数据 * @param context 上下文 */ @Override public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception { // 设置sql语句中的第一个和第二个值 preparedStatement.setString(1, tuple2.f0); preparedStatement.setInt(2, tuple2.f1); // 执行插入 preparedStatement.executeUpdate(); } /** * 关闭时的方法,关闭MySQL连接,避免资源占用 */ @Override public void close() throws Exception { if (preparedStatement != null){ preparedStatement.close(); } if (connection != null){ connection.close(); } } }
调用方法
/** * 将结果写入自定义数据沉槽 */ public static void addSink() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 将单词形成元组并设置次数为1 dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(new Tuple2<>(word, 1)); } } // 写入MySql中 }).addSink(new MysqlDataSink()); // 执行任务 env.execute(); }
运行结果
自定义Mysql数据源
package cn.myclass.stream; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; /** * 自定义mysql数据源 * 可以通过实现SourceFunction接口实现,也可以通过继承RichSourceFunction类 * 实现,两者的区别为后者提供的方法更多更丰富。如果只有简单的逻辑则使用前者 * 即可。这里由于需要连接数据库,所以使用了RichSourceFunction,借助类中提供 * 的open以及close方法更加合理的利用资源读取数据。 * @author Yang */ public class MysqlDataSource extends RichSourceFunction<String> { /** * 预处理对象 */ private PreparedStatement preparedStatement = null; /** * 连接对象 */ private Connection connection = null; /** * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。 * @param parameters 参数信息 */ @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); //创建连接 connection = DriverManager.getConnection( "jdbc:mysql://localhost:3306/flink", "root", "root"); // 从word表中读取所有单词 String sql = "select word from word"; // 获得预处理对象 preparedStatement = connection.prepareStatement(sql); } /** * 读取数据时执行此方法,从查询结果中依次获得单词 * @param sourceContext 数据源上下文对象 */ @Override public void run(SourceContext<String> sourceContext) throws Exception { // 执行查询获得结果 ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()){ // 将结果添加到收集器中 sourceContext.collect(resultSet.getString("word")); } } /** * 取消任务时执行 */ @Override public void cancel() { } /** * 关闭时的方法,关闭MySQL连接,避免资源占用 */ @Override public void close() throws Exception { if (preparedStatement != null){ preparedStatement.close(); } if (connection != null){ connection.close(); } } }
自定义Mysql数据沉槽
package cn.myclass.stream; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * 自定义Mysql数据沉槽 * @author Yang */ public class MysqlDataSink extends RichSinkFunction<Tuple2<String, Integer>> { /** * 预处理对象 */ private PreparedStatement preparedStatement = null; /** * 连接对象 */ private Connection connection = null; /** * 初始化方法,读取数据前先初始化MySQL连接,避免多次初始化,有效利用资源。 * @param parameters 参数信息 */ @Override public void open(Configuration parameters) throws Exception { Class.forName("com.mysql.jdbc.Driver"); // 创建连接 connection = DriverManager.getConnection( "jdbc:mysql://localhost:3306/flink", "root", "root"); // 从word表中读取所有单词 String sql = "insert into word(word,count) values(?,?)"; // 预编译语句并获得预处理对象 preparedStatement = connection.prepareStatement(sql); } /** * 每条结果执行的方法 * @param tuple2 元组数据 * @param context 上下文 */ @Override public void invoke(Tuple2<String, Integer> tuple2, Context context) throws Exception { // 设置sql语句中的第一个和第二个值 preparedStatement.setString(1, tuple2.f0); preparedStatement.setInt(2, tuple2.f1); // 执行插入 preparedStatement.executeUpdate(); } /** * 关闭时的方法,关闭MySQL连接,避免资源占用 */ @Override public void close() throws Exception { if (preparedStatement != null){ preparedStatement.close(); } if (connection != null){ connection.close(); } } }
主类
package cn.myclass.stream; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.ArrayList; /** * Flink流处理常用数据源及数据沉槽 * @author Yang */ public class DataStreamSourceAndSink { /** * 初始化流处理执行环境 */ private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** * 从socket套接字中获取数据作为数据源 */ public static void socketTextStream() throws Exception { // 从本地socket套接字中读取数据 DataStreamSource<String> dataStream = env.socketTextStream("localhost", 9999); // 打印输入的内容 dataStream.print(); // 执行任务 env.execute(); } /** * 从文本文件中读取数据作为数据源 * 文本文件可以是本地文件,也可以是hdfs中的文件,只需要指定路径即可 */ public static void readTextFile() throws Exception { // 从本地文本文件中读取数据 DataStreamSource<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 从hdfs文件系统中读取数据 //DataStreamSource<String> dataStream = env.readTextFile("hdfs://master:9000/word"); // 将文本中每行单词切分成单个单词并收集 dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(word); } } }).print(); // 执行任务 env.execute(); } /** * 从生成序列中读取数据作为数据源 */ public static void generateSequence() throws Exception { // 设置并行度,默认为CPU核心数,这里设置为1可以防止输出乱序 env.setParallelism(1); // 生成1-10的序列并输出 env.generateSequence(1, 10).print(); // 执行任务 env.execute(); } /** * 从Java.util.Collection集合中读取数据作为数据源 */ public static void fromCollection() throws Exception { ArrayList<String> list = new ArrayList<>(5); list.add("flink"); list.add("scala"); list.add("spark"); list.add("hadoop"); list.add("hive"); env.fromCollection(list).print(); // 执行任务 env.execute(); } /** * 从Java.util.Collection集合中读取数据作为数据源 */ public static void fromElements() throws Exception { env.fromElements("flink", "scala", "spark", "hadoop", "hive").print(); // 执行任务 env.execute(); } /** * 从自定义数据源中读取数据 */ public static void addSource() throws Exception { // 添加自定义数据源并打印读取的数据 env.addSource(new MysqlDataSource()).print(); // 执行任务 env.execute(); } /** * 将结果写入到文本文件中 */ public static void writeAsText() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 设置并行度为1,将结果写入到一个文件中 env.setParallelism(1); // 将结果写入到hdfs中 //dataStream.writeAsText("hdfs://master:9000/words.txt"); // 将结果写到本地文本文件中 dataStream.writeAsText("FlinkModule/src/main/resources/stream/words.txt"); // 执行任务 env.execute(); } /** * 将结果写入到csv文件中 * 注意:将结果写入到csv只支持元组类型的数据 */ public static void writeAsCsv() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 设置并行度 env.setParallelism(1); // 将单词转化成元组 dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(new Tuple2<>(word, 1)); } } }).writeAsCsv("FlinkModule/src/main/resources/stream/words.csv"); // 执行任务 env.execute(); } /** * 将结果写入到socket套接字中 */ public static void writeToSocket() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 将结果写入到socket套接字中,以简单字符串类型发送 dataStream.writeToSocket("localhost", 9999, new SimpleStringSchema()); // 执行任务 env.execute(); } /** * 将结果写入自定义数据沉槽 */ public static void addSink() throws Exception { // 从本地文本文件中读取数据 DataStream<String> dataStream = env.readTextFile("FlinkModule/src/main/resources/common/word"); // 将单词形成元组并初始化次数为1 dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split("\t"); for(String word: words){ collector.collect(new Tuple2<>(word, 1)); } } // 写入MySql中 }).addSink(new MysqlDataSink()); // 执行任务 env.execute(); } public static void main(String[] args) throws Exception { socketTextStream(); readTextFile(); generateSequence(); fromCollection(); fromElements(); addSource(); writeAsText(); writeAsCsv(); writeToSocket(); addSink(); } }
如有错误,望指正!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。