当前位置:   article > 正文

flink1.10.1--java 版--尚硅谷1-3简介/入门/安装/提交任务_flink1.10的scala依赖

flink1.10的scala依赖

一.flink简介

1.传统数据处理架构

事务处理:java 后端->数据库

在这里插入图片描述
分析处理:离线数仓

在这里插入图片描述
有状态的流式处理:实时处理
在这里插入图片描述

流处理的演变:第二代流失处理架构(lambda)
在这里插入图片描述

2.Flink 的主要特点

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.Flink vs Spark Streaming

延迟:flink 毫秒级延迟,sparkStreaming 秒级延迟
架构:flink 真正的流,sparkStreaming 微批

在这里插入图片描述
flink没有Stage的概念,每个个节点的计算不需要等待。
在这里插入图片描述

4.WordCount

dataSet

dataSet是数据集不是数据流
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

package com.atguigu.wc;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.wc
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/6 11:22
 */

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

/**
 * @ClassName: WordCount
 * @Description:
 * @Author: wushengran on 2020/11/6 11:22
 * @Version: 1.0
 */

// 批处理word count
public class WordCount {
    public static void main(String[] args) throws Exception{
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //data/sensor.txt
        // 从文件中读取数据
       // String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt";
        String inputPath = "data/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
        DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)    // 按照第一个位置的word分组
                .sum(1);    // 将第二个位置上的数据求和

        resultSet.print();
    }

    // 自定义类,实现FlatMapFunction接口
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // 按空格分词
            String[] words = value.split(" ");
            // 遍历所有word,包成二元组输出
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

在这里插入图片描述

DataStream

流式处理API

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

package com.atguigu.wc;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.wc
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/6 11:48
 */

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.net.URL;

/**
 * @ClassName: StreamWordCount
 * @Description:
 * @Author: wushengran on 2020/11/6 11:48
 * @Version: 1.0
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception{
        // 创建流处理执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
//        env.disableOperatorChaining();

//        // 从文件中读取数据
//        String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt";
//        DataStream<String> inputDataStream = env.readTextFile(inputPath);

        // 用parameter tool工具从程序启动参数中提取配置项
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        // 从socket文本流读取数据
        DataStream<String> inputDataStream = env.socketTextStream(host, port);


        // 基于数据流进行转换计算
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green")
                .keyBy(0)
                .sum(1).setParallelism(2).slotSharingGroup("red");

        resultStream.print().setParallelism(1);

        // 执行任务
        env.execute();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在这里插入图片描述

在这里插入图片描述

nc -lk 7777
  • 1

在这里插入图片描述

第三章 Flink 部署

flink下载
https://flink.apache.org/zh/
在这里插入图片描述

在这里插入图片描述
flink 1.10之前的版本都包含了对hadoop的依赖
在这里插入图片描述
flink1.10之后的版本flink的安装包和hadoop的依赖分离开来了
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
flink/conf/flink-conf.yaml的主要内容
在这里插入图片描述
在这里插入图片描述

3.1 Standalone 模式

3.1.1 安装

解压缩 flink-1.10.1-bin-scala_2.12.tgz,进入 conf 目录中。
1)修改 flink/conf/flink-conf.yaml 文件:
在这里插入图片描述
在这里插入图片描述

3)分发给另外两台机子:
在这里插入图片描述
4)启动 :
在这里插入图片描述
在这里插入图片描述
flink/conf/flink-conf.yaml
在这里插入图片描述

访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。
在这里插入图片描述
在这里插入图片描述

3.1.2 提交任务

web页面提交任务

在这里插入图片描述
task任务需要的taskSloat 一般等于执行任务需要最大一个并行度
在这里插入图片描述
在这里插入图片描述

命令提交任务

  1. 准备数据文件(如果需要)
    在这里插入图片描述

  2. 把含数据文件的文件夹,分发到 taskmanage 机器
    在这里插入图片描述
    如 果 从 文 件 中 读 取 数 据 , 由 于 是 从 本 地 磁 盘 读 取 , 实 际 任 务 会 被 分 发 到
    taskmanage 的机器中,所以要把目标文件分发

  3. 执行程序
    在这里插入图片描述
    在这里插入图片描述

./flink run -p 2  -c com.atguigu.wc.StreamWordCount  /root/ysw/flink/jar/FlinkTutorial-1.0-SNAPSHOT.jar --host kafka1 --port 6666
  • 1

在这里插入图片描述

在这里插入图片描述
4) 查看计算结果
注意:如果输出到控制台,应该在 taskmanager 下查看;如果计算结果输出到文
件,同样会保存到 taskmanage 的机器下,不会在 jobmanage 下。

  1. 在 webui 控制台查看计算过程
    在这里插入图片描述
    在这里插入图片描述

3.2 Yarn 模式

现在流行的资源管理平台主要是yarn和k8s
在这里插入图片描述

3.2.1 Flink on Yarn

在这里插入图片描述

3.2.2 Session Cluster

  1. 启动 hadoop 集群(略)
  2. 启动 yarn-session
/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
  • 1

在这里插入图片描述
3) 执行任务

./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
  • 1
  1. 去 yarn 控制台查看任务状态
    在这里插入图片描述
  2. 取消 yarn-session
yarn application --kill application_1577588252906_0001
  • 1

3.2.2 Per Job Cluster

  1. 启动 hadoop 集群(略)
  2. 不启动 yarn-session,直接执行 job
./flink run –m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
  • 1

3.3 Kubernetes 部署

在这里插入图片描述

// 启动 jobmanager-service 服务
kubectl create -f jobmanager-service.yaml
// 启动 jobmanager-deployment 服务
kubectl create -f jobmanager-deployment.yaml
// 启动 taskmanager-deployment 服务
kubectl create -f taskmanager-deployment.yaml
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4)访问 Flink UI 页面
集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器
输入以下 url 来访问 Flink UI 页面了:
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanage
r:ui/proxy

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/684342
推荐阅读
相关标签
  

闽ICP备14008679号