当前位置:   article > 正文

大数据之flink入门安装_flink安装教程

flink安装教程

Apache Flink 是一个分布式大数据处理引擎,不负责存储,可对有限数据流和无限数据流进行有状态计算。可部署在各种集群环境,对各种大小的数据规模进行快速计算。

在这里插入图片描述

一、flink介绍

1、特点

	批流统一
	支持高吞吐、低延迟、高性能的流处
	支持带有事件时间的窗口(Window)操作
	支持有状态计算的Exactly-once语义
	支持高度灵活的窗口(Window)操作,支持基于time、count、session窗口操作
	支持具有Backpressure功能的持续流模型
	支持基于轻量级分布式快照(Snapshot)实现的容错
	支持迭代计算
	Flink在JVM内部实现了自己的内存管理
	支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2、和别的框架对比
在这里插入图片描述

3、flink角色

 JobManager:

也称之为Master,用于协调分布式执行,它用来调度task,协调检查点,协调失败时恢复等。Flink运行时至少存在一个master,如果配置高可用模式则会存在多个master,它们其中有一个是leader,而其他的都是standby。

 TaskManager:

也称之为Worker,用于执行一个dataflow的task、数据缓冲和Data Streams的数据交换,Flink运行时至少会存在一个TaskManager。JobManager和TaskManager可以直接运行在物理机上,或者运行YARN这样的资源调度框架,TaskManager通过网络连接到JobManager,通过RPC通信告知自身的可用性进而获得任务分配。

4、模式

local模式、standalone模式、onyarn模式

二、flink安装

1、上传解压安装包

下载地址:https://flink.apache.org/downloads.html

tar -xvf flink-1.9.1-bin-scala_2.11.tgz -C /bigdata/
  • 1

2、修改配置文件

vim flink-conf.yaml

   #指定jobmanager的地址
   jobmanager.rpc.address: hadoop01
   #指定taskmanager的可用槽位的数量
   taskmanager.numberOfTaskSlots: 2
  • 1
  • 2
  • 3
  • 4

vim slaves

   hadoop02
   hadoop03
  • 1
  • 2

3、拷贝文件

   scp -r /bigdata/flink-1.9.1/ hadoop02:/bigdata/flink-1.9.1/
   scp -r /bigdata/flink-1.9.1/ hadoop03:/bigdata/flink-1.9.1/
  • 1
  • 2

4、启动

   bin/start-cluster.sh
  • 1

5、访问

   http://192.168.1.103:8081/#/overview
  • 1

4、模板初始化

mvn archetype:generate \
 -DarchetypeGroupId=org.apache.flink \
 -DarchetypeArtifactId=flink-quickstart-java \
 -DarchetypeVersion=1.9.1 \
 -DgroupId=dashuju.flink \
 -DartifactId=flink-java \
 -Dversion=1.0 \
 -Dpackage=dashuju.flink \
-DinteractiveMode=false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

② 或者在命令行中执行下面的命令,需要有网络

curl https://flink.apache.org/q/quickstart.sh | bash -s 1.9.1
  • 1

三、java版本wordcount(匿名内部类)

package cn._51doit.flink.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {

        //创建一个flink stream 程序的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //使用StreamExecutionEnvironment创建DataStream
        //Source
        DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
        
        //Transformation开始  
        //调用DataStream上的方法Transformation(s)
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
                //切分
                String[] words = line.split(" ");
                for (String word : words) {
                    //输出
                    out.collect(word);
                }
            }
        });
        
        //将单词和一组合
       SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
           public Tuple2<String, Integer> map(String word) throws Exception {
               return Tuple2.of(word, 1);
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);
        //Transformation结束
        // 调用Sink (Sink必须调用)
        summed.print();
        //启动
        env.execute("StreamWordCount");

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

四、java版本wordcount(Lambda)

package cn._51doit.flink.day01;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class LambdaStreamWordCount {

    public static void main(String[] args) throws Exception {
        //创建一个flink stream 程序的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //使用StreamExecutionEnvironment创建DataStream
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        
        //调用DataStream上的方法Transformation(s)
        SingleOutputStreamOperator<String> word = lines.flatMap((String line, Collector<String> out) ->
                Arrays.stream(line.split(" ")).forEach(out::collect)).returns(Types.STRING);
                
        //将单词和一组合
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = word.map(w -> Tuple2.of(w, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = wordAndOne.keyBy(0).sum(1);
        // 调用Sink (Sink必须调用)
        summed.print();
       //执行
        env.execute("LambdaStreamWordCount");

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

五、scala版本wordcount

package cn._51doit.flink.day01

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

object StreamWordCount {

  def main(args: Array[String]): Unit = {

    //创建flink执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
	//flink通过env创建抽象的数据集(Datastream)
    //Source
    val lines: DataStream[String] = env.socketTextStream("localhost", 8888)
    //Transformation开始
	//切分压平,需要导入隐式转换
    val words: DataStream[String] = lines.flatMap(_.split(" "))
	//将单词和1组合
    val wordAndOne: DataStream[(String, Int)] = words.map((_, 1))
	//分组聚合
    val keyed: KeyedStream[(String, Int), Tuple] = wordAndOne.keyBy(0)
	//聚合
    val summed: DataStream[(String, Int)] = keyed.sum(1)
    //Transformation结束
    //调用Sink
    summed.print()
    //执行
    env.execute("StreamWordCount")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

六、提交任务

方式一:web提交

在这里插入图片描述

方式二:命令提交

/bigdata/flink-1.9.1/bin/flink run -m hadoop01:8081 -p 4 -c cn._51doit.flink.StreamingWordCount /root/hello-flink-java-1.0.jar hadoop01 8888
  • 1

参数说明:
-m指定主机名后面的端口为JobManager的REST的端口,而不是RPC的端口,RPC通信端口是6123
-p 指定是并行度
-c 指定main方法的全类名

7、任务终止

在这里插入图片描述
通过saveCheckpoint,重新提交

在这里插入图片描述
或者使用代码

//任务流取消和故障时会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  • 1
  • 2

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/570053
推荐阅读
相关标签
  

闽ICP备14008679号