赞
踩
前言:今天是学习 flink 的第七天啦!学习了 flink 中 sink(数据槽) 部分知识点,这一部分只要是解决数据处理之后,数据到哪里去的问题,我觉得 flink 知识点虽然比较难理解,但是代码跑通后,逻辑还是比较有趣的!
Tips:毛爷爷说过:“宜将剩勇追穷寇,不可沽名学霸王!”明天周日除了复习前面知识点之外,也要继续努力学习接下来的知识点,继续加油!
打印是最简单的一个Sink,通常是用来做实验和测试时使用。
实例:socket 数据源,查看进程编号最终输出 sink 之 print 打印
package cn.itcast.day06.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
/**
* @author lql
* @time 2024-02-17 22:27:48
* @description TODO:print
*/
public class PrintSinkDemo {
public static void main(String[] args) throws Exception {
//local模式默认的并行度是当前机器的逻辑核的数量
Configuration configuration = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
int parallelism0 = env.getParallelism();
System.out.println("执行环境默认的并行度:" + parallelism0);
// socket 数据源
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
// 获取 lines 数据源并行度
int parallelism = lines.getParallelism();
System.out.println("SocketSource的并行度:" + parallelism);
lines.print();
lines.addSink(new MyPrintSink()).name("my-print-sink");
env.execute();
}
private static class MyPrintSink extends RichSinkFunction<String> {
// 这一处定义很重要,不然 indexOfThisSubtask 只能在一个方法中使用!
private int indexOfThisSubtask;
@Override
public void open(Configuration parameters) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public void invoke(String value, Context context) throws Exception {
System.out.println(indexOfThisSubtask + 1 + "> " + value);
}
}
}
结果:
执行环境默认的并行度:8
SocketSource的并行度:1
6> hadoop
1> hadoop
1> hadoop
7> hadoop
总结:
该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是 TextOutputFormat 格式写入的。
实例:socket 数据源,将数据输出到文本 Text 中
package cn.itcast.day06.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-17 22:40:48
* @description TODO:writeAsText
*/
public class WriteSinkDemo {
public static void main(String[] args) throws Exception {
//local模式默认的并行度是当前机器的逻辑核的数量
Configuration configuration = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
int parallelism0 = env.getParallelism();
System.out.println("执行环境默认的并行度:" + parallelism0);
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
//获取DataStream的并行度
int parallelism = lines.getParallelism();
System.out.println("SocketSource的并行度:" + parallelism);
lines.writeAsText("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output",FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}
结果:
output 文件夹下出现以数字命名的文件
内容为 socket 数据源输出,加上了 \n 换行符
目录中的文件名称是该 Sink 所在 subtask 的 Index + 1
总结:
该方法是将数据以 csv 格式写入到指定的目录中,本质上使用的是 CsvOutputFormat 格式写入的。
实例:socket 数据源,将数据输出到文本 csv 中
package cn.itcast.day06.sink;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.core.fs.FileSystem;
/**
* @author lql
* @time 2024-02-17 22:52:12
* @description TODO:将DataSet数据写入到csv文件中
*/
public class CsvSink {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//需先建立文件
String filePath = "D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\user.csv";
//添加数据
Tuple7<Integer, String, Integer, Integer, String, String, Long> row = new Tuple7<>(15, "zhangsan", 40, 1, "CN", "2020-09-08 00:00:00", 1599494400000L);
//转换为dataSet,利用 数据源中 fromElements 可以接受 [列表或元组] 的属性
DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>> dataSet = (DataSource<Tuple7<Integer, String, Integer, Integer, String, String, Long>>) env.fromElements(row);
//将内容写入到File中,如果文件已存在,将会被复盖
dataSet.writeAsCsv(filePath,FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute();
}
}
结果:
在指定文件中,生成了 csv 数据
总结:
该方法是将数据已指定的格式写入到指定目录中
package cn.itcast.day06.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-17 23:03:24
* @description TODO:将数据已指定的格式写入到指定目录中
*/
public class writeUsingOutputFormatSink {
public static void main(String[] args) throws Exception {
//1:获取流处理运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//调用env的fromElements创建一个非并行的DataStreamSource
DataStreamSource<String> words = env.fromElements(
"hadoop","spark","flink","hbase","flink","spark"
);
// 对拆分后的单词,每个单词记一次数
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1);
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
result.writeUsingOutputFormat(new TextOutputFormat<>(new Path("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\output\\wordcount")));
env.execute();
}
}
结果:
在指定目录下,生成 n(电脑并行度数量) 个文本文件
总结:
该方法是将数据输出到指定的Socket网络地址端口。
实例:socket 数据源,node1:9999 写数据到 node1:8888
package cn.itcast.day06.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-17 23:12:03
* @description TODO:writeToSocket
*/
public class WriteToSocketDemo {
public static void main(String[] args) throws Exception {
//local模式默认的并行度是当前机器的逻辑核的数量
Configuration configuration = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
int parallelism0 = env.getParallelism();
System.out.println("执行环境默认的并行度:" + parallelism0);
DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
//获取DataStream的并行度
int parallelism = lines.getParallelism();
System.out.println("SocketSource的并行度:" + parallelism);
// 第三个参数是数据输出的序列化格式 SerializationSchema
lines.writeToSocket("node1",8888,new SimpleStringSchema());
env.execute();
}
}
结果:
node1:8888 实时接收到 node1:9999 写入的数据
总结:
数据分类集合输出
实例:数据打印输出,error 输出,可以输出到:Stdout,Stderr,采集为本地集合
package cn.itcast.day06.sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author lql
* @time 2024-02-17 23:18:12
* @description TODO:数据可以输出到:Stdout,Stderr,采集为本地集合
*/
public class CollectionDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Integer, String>> dataSource = env.fromElements(
Tuple2.of(1, "zhangsan"),
Tuple2.of(2, "lisi"),
Tuple2.of(3, "wangwu"),
Tuple2.of(4, "zhaoliu")
);
//2.sink
dataSource.print();
dataSource.printToErr();
env.execute();
}
}
结果:
黑色字体输出:
6> (3,wangwu)
7> (4,zhaoliu)
4> (1,zhangsan)
5> (2,lisi)
红色字体输出:
8> (3,wangwu)
7> (2,lisi)
1> (4,zhaoliu)
6> (1,zhangsan)
总结:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。