赞
踩
在Flink中,实现从指定主机名和端口接收字符串消息,对接收到的字符串中出现的各个单词,每隔1秒钟就输出最近5秒内出现的各个单词的统计次数。
代码实现如下:
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
/** 需要连接的主机名和端口 */
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.get("hostname");
port = params.getInt("port");
} catch (Exception e) {
e.printStackTrace();
System.err.println("Please run 'SocketWindowWordCount --host <host> --port <port>'");
return;
}
/** 获取{@link StreamExecutionEnvironment}的具体实现的实例 */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 通过连接给定地址和端口, 获取数据流的数据源 */
DataStream<String> text = env.socketTextStream(hostname, port);
/** 对数据流中的数据进行解析、分组、窗口、以及聚合 */
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
/** 打印出分析结果 */
windowCounts.print();
/** 执行处理程序 */
env.execute("Socket Window WordCount");
}
/** 单词和统计次数的数据结构 */
public static class WordWithCount {
public String word;
public long count;
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
对于上述实现,接下来要分析的内容有:
1)如何创建从指定host和port接收数据的数据源;
2)如何对创建好的数据源进行一系列操作来实现所需功能;
3)如何将分析结果打印出来。
在Flink中,数据源的构建是通过StreamExecutionEnviroment的具体实现的实例来构建的,如上述代码中的这句代码。
DataStream<String> text = env.socketTextStream(hostname, port);
这句代码就在指定的host和port上构建了一个接受网络数据的数据源,接下来看其内部如何实现的。
public DataStreamSource<String> socketTextStream(String hostname, int port) {
return socketTextStream(hostname, port, "\n");
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
return socketTextStream(hostname, port, delimiter, 0);
}
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
通过对socketTextStream的重载方法的依次调用,可以看到会根据传入的hostname、port,以及默认的行分隔符”\n”,和最大尝试次数0,构造一个SocketTextStreamFunction实例,并采用默认的数据源节点名称为”Socket Stream”。
SocketTextStreamFunction的类继承图如下所示,可以看出其是SourceFunction的一个子类,而SourceFunction是Flink中数据源的基础接口。
SourceFunction的定义如下:
@Public
public interface SourceFunction<T> extends Function, Serializable {
void run(SourceContext<T> ctx) throws Exception;
void cancel();
@Public
interface SourceContext<T> {
void collect(T element);
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);
@PublicEvolving
void emitWatermark(Watermark mark);
@PublicEvolving
void markAsTemporarilyIdle();
Object getCheckpointLock();
void close();
}
}
从定义中可以看出,其主要有两个方法run和cancel。
run(SourceContex)方法,就是实现数据获取逻辑的地方,并可以通过传入的参数ctx进行向下游节点的数据转发。
cancel()方法,则是用来取消数据源的数据产生,一般在run方法中,会存在一个循环来持续产生数据,而cancel方法则可以使得该循环终止。
其内部接口SourceContex则是用来进行数据发送的接口。了解了SourceFunction这个接口的功能后,来看下SocketTextStreamFunction的具体实现,也就是主要看其run方法的具体实现。
public void run(SourceContext<String> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
/** 这里是第一层循环,只要当前处于运行状态,该循环就不会退出,会一直循环 */
while (isRunning) {
try (Socket socket = new Socket()) {
/** 对指定的hostname和port,建立Socket连接,并构建一个BufferedReader,用来从Socket中读取数据 */
currentSocket = socket;
LOG.info("Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
/** 这里是第二层循环,对运行状态进行了双重校验,同时对从Socket中读取的字节数进行判断 */
while (isRunning && (bytesRead = reader.read(cbuf)) !&
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。