当前位置:   article > 正文

Flink源码系列——Flink中一个简单的数据处理功能的实现过程_sockettextstreamfunction

sockettextstreamfunction

在Flink中,实现从指定主机名和端口接收字符串消息,对接收到的字符串中出现的各个单词,每隔1秒钟就输出最近5秒内出现的各个单词的统计次数。


代码实现如下:

public class SocketWindowWordCount {
   
    public static void main(String[] args) throws Exception {
        /** 需要连接的主机名和端口 */
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.get("hostname");
            port = params.getInt("port");
        } catch (Exception e) {
            e.printStackTrace();
            System.err.println("Please run 'SocketWindowWordCount --host <host> --port <port>'");
            return;
        }

        /** 获取{@link StreamExecutionEnvironment}的具体实现的实例 */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /** 通过连接给定地址和端口, 获取数据流的数据源 */
        DataStream<String> text = env.socketTextStream(hostname, port);

        /** 对数据流中的数据进行解析、分组、窗口、以及聚合 */
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        /** 打印出分析结果 */
        windowCounts.print();

        /** 执行处理程序 */
        env.execute("Socket Window WordCount");
    }

    /** 单词和统计次数的数据结构 */
    public static class WordWithCount {
   
        public String word;
        public long count;

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
对于上述实现,接下来要分析的内容有:
1)如何创建从指定host和port接收数据的数据源;
2)如何对创建好的数据源进行一系列操作来实现所需功能;
3)如何将分析结果打印出来。
  • 1
  • 2
  • 3
  • 4

1、构建数据源

在Flink中,数据源的构建是通过StreamExecutionEnviroment的具体实现的实例来构建的,如上述代码中的这句代码。

DataStream<String> text = env.socketTextStream(hostname, port);
  • 1

这句代码就在指定的host和port上构建了一个接受网络数据的数据源,接下来看其内部如何实现的。

public DataStreamSource<String> socketTextStream(String hostname, int port) {
   return socketTextStream(hostname, port, "\n");
}

public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter) {
   return socketTextStream(hostname, port, delimiter, 0);
}

public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
   return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
         "Socket Stream");
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

通过对socketTextStream的重载方法的依次调用,可以看到会根据传入的hostname、port,以及默认的行分隔符”\n”,和最大尝试次数0,构造一个SocketTextStreamFunction实例,并采用默认的数据源节点名称为”Socket Stream”。

SocketTextStreamFunction的类继承图如下所示,可以看出其是SourceFunction的一个子类,而SourceFunction是Flink中数据源的基础接口。
这里写图片描述

SourceFunction的定义如下:

@Public
public interface SourceFunction<T> extends Function, Serializable {
   
   void run(SourceContext<T> ctx) throws Exception;
   void cancel();
   @Public
   interface SourceContext<T> {
   
      void collect(T element);
      @PublicEvolving
      void collectWithTimestamp(T element, long timestamp);
      @PublicEvolving
      void emitWatermark(Watermark mark);
      @PublicEvolving
      void markAsTemporarilyIdle();
      Object getCheckpointLock();
      void close();
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

从定义中可以看出,其主要有两个方法run和cancel。

run(SourceContex)方法,就是实现数据获取逻辑的地方,并可以通过传入的参数ctx进行向下游节点的数据转发。
cancel()方法,则是用来取消数据源的数据产生,一般在run方法中,会存在一个循环来持续产生数据,而cancel方法则可以使得该循环终止。

其内部接口SourceContex则是用来进行数据发送的接口。了解了SourceFunction这个接口的功能后,来看下SocketTextStreamFunction的具体实现,也就是主要看其run方法的具体实现。

public void run(SourceContext<String> ctx) throws Exception {
   final StringBuilder buffer = new StringBuilder();
   long attempt = 0;
   /** 这里是第一层循环,只要当前处于运行状态,该循环就不会退出,会一直循环 */
   while (isRunning) {
      try (Socket socket = new Socket()) {
         /** 对指定的hostname和port,建立Socket连接,并构建一个BufferedReader,用来从Socket中读取数据 */
         currentSocket = socket;
         LOG.info("Connecting to server socket " + hostname + ':' + port);
         socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
         BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         char[] cbuf = new char[8192];
         int bytesRead;
         /** 这里是第二层循环,对运行状态进行了双重校验,同时对从Socket中读取的字节数进行判断 */
         while (isRunning && (bytesRead = reader.read(cbuf)) !&
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/794403
推荐阅读
相关标签
  

闽ICP备14008679号