当前位置:   article > 正文

Apache Flink 1.12.1入门教程

Apache Flink 1.12.1入门教程

一、一个简单的单词统计程序

首先,创建一个 Maven 项目,在pom.xml中增加所需的 Flink 依赖:

<dependencies>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-java</artifactId>

        <version>1.12.1</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-streaming-java_2.11</artifactId>

        <version>1.12.1</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-clients_2.11</artifactId>

        <version>1.12.1</version>

    </dependency>

</dependencies>

创建一个WordCount.java文件:

package com.flink;

public class WordCount {
    public static void main(String[] args) throws Exception {

}

}

接着第一步是创建一个执行环境 ExecutionEnvironment类,用来设置参数和创建数据源以及提交任务的操作:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

下一步创建一个数据集DataSet,存放的类型是String类型,并初始化了三个英文句子:

DataSet<String> text = env.fromElements("this a book", "i love china", "i am chinese");

我们把数据再转化一下为Tuple2类型的数据,TupleN: 代表有N个元素,通过查看源码可以看到在flink-core-1.12.1.jar中,N的最大值为25。这里Tuple2第一个元素是String类型,用来存放单词,第二个元素是Integer类型,表示出现次数。goupBy(0)表示按第一个元素分组,sum(1)表示将第二个元素加起来。最后实现一个 flatMap 类来做解析字符串的工作,如下所示:

DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);

定义一个实现FlatMap函数相关的类来实现解析字符串,把字符串按照空格分割开,然后把每个单词次数计数一次,装配进Colloector,以提供给程序最后分组及计算。

static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {

@Override

public void flatMap(String string, Collector<Tuple2<String, Integer>> collector) throws Exception {

       for (String word: string.split(" ")) {

            collector.collect(new Tuple2<String, Integer>(word,1));

         }

       }

    }

完整的程序如下,可以直接执行main函数,在控制台可以看到打印出来的结果。

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.DataSet;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

public class WordCount {

public static void main(String[] args) throws Exception {

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements("a chinese", "china", "i am chinese");

DataSet<Tuple2<String, Integer>> ds=text.flatMap(new LineSplitter()).groupBy(0).sum(1);

  // 输出数据到目的端

  ds.print();

}

static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {

 @Override

 public void flatMap(String string, Collector<Tuple2<String, Integer>> collector)            throws Exception {

        for (String word:string.split(" ")) {

           collector.collect(new Tuple2<String, Integer>(word,1));

        }

  }

    }

}

程序运行结果:

二、通过流窗口实现单词统计

此程序连接到服务器Socket读取字符串作为数据源,在这里我们使用netcat工具作为服务器Socket进行测试。

首先定义一个数据流,读取字符串类型的数据,以换行符为一次输入结果:

DataStream<String> text = env.socketTextStream(hostname, port, "\n");

我们定义一个解析类,用来存放解析过程中的结果,变量word存放解析出来的单词,count存放单词的统计个数:

public static class WordWithCount {

  public String word;

  public long count;

}

然后通过flatMap来解析源数据,使用keyBy函数按照WordWithCount 中word的值进行分组,并定义以每隔5秒为一个处理时间窗口,统计5秒内输入的单词的个数,最后使用reduce方法把筛选出相同的单词,把他们的count值相加,然后返回传递给下次调用,完整的代码如下:

package com.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/886425
推荐阅读
相关标签
  

闽ICP备14008679号