当前位置:   article > 正文

flink介绍、安装以及基本使用_flink的安装与使用

flink的安装与使用

flink介绍、安装以及基本使用

简介

flink主要解决大数据流式计算问题,举个例子,我们在一个系统中难免会遇到统计功能,例如淘宝里面每间商品每个月销量,微信里面每月支出收入等,当数据达到一定程度,光靠数据库里的sum group by是不现实的,多则千万条数据,更甚多个表关联后会达到数亿计的临时数据,实时的统计会使得数据库占用大量资源,这个时候就需要流式计算,针对实时的数据变更通过流式计算出统计的数据,相当于每时每刻都在增量的计算统计结果,查询统计时直接读取当前的结果即可。

而flink则提供了这么一个分布式平台,我们只需要使用flink提供的java包里的方法,定义好输入,怎么处理数据,输出结果,然后打包成jar,上传到flink集群,flink自会帮我们解析任务,拆分任务,分配给不同机器处理。

请添加图片描述

优点显而易见:低代码,我们不必自己每次都写一个庞大的程序去处理数据,只需要用它提供的方法定义流程即可;分布式,我们不必纠结一台机器能不能稳定提供计算的能力,它会自动帮我们分布到不同机器运行。

docker 安装flink

docker pull flink:scala_2.12-java8(注意,这里尽量不要用latest,因为默认最新一般都是java11版本,使用java8开发的jar包会启动失败)

启动jobmanager(master节点,管理程序,负责调度job运算)

docker run -d --name jm --network mynet -p 8081:8081 -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 jobmanager

启动两个taskmanager(真正运算task的节点)

docker run -d --name tm1 --network mynet -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 taskmanager

docker run -d --name tm2 --network mynet -e JOB_MANAGER_RPC_ADDRESS=jm flink:scala_2.12-java8 taskmanager

然后可以通过8081打开管理页面,查看集群状态。

请添加图片描述

下面为了更好模拟正常的集群,我们把每个taskmanager的task slot数改为2(默认是1):

进入容器,修改opt/flink/conf/flink-conf.yaml里的taskmanager.numberOfTaskSlots: 2 (注意,这里如果用-v 映射文件出来会导致容器启动失败,具体原因不明,建议启动容器后通过 cp 命令把修改好的配置覆盖进去)

然后重启两个taskmanager容器即可。

基本概念

我们先举一个简单的场景,有个生产者持续向kafka生产文本,我们想要持续统计出现单词的个数,然后输出到mysql,并且希望有两个节点同时进行统计提高效率。

jobmanager:集群的master节点,如简介所说,它负责解析用户开发的jar包,生成job流程,拆分不同的任务,并且分配去不同slot运行。上面的例子可以拆分成4个子任务,监听kafka数据源,统计A,统计B,输出mysql。

taskmanager:集群真正干活的节点。上面拆分的4个子任务会被安排到taskmanager节点执行。

taskslot:一个taskmanager可以有多个slot,每个slot都有自己独立的内存空间,上面拆分的子任务会分配到slot里执行,因此每个子任务都有自己的内存空间存储临时数据,互不干扰。一般情况下,建议与cpu核心数一样,充分利用cpu运算资源。

operation:算子,运算单元,一个子任务包含一个或多个算子,例如上面的统计单词个数,明显就是一个sum操作,就是一个算子。

请添加图片描述

一个job的过程一般存在4个基本的元素:

environment:获取环境,一般就是指的集群环境。

source:读取数据源,就是上面的从kafka获取数据。

transform:就是算子操作,真正运算逻辑。

sink:输出,上面的输出的mysql。

简单开发:

新建一个maven项目

引入flink相关maven包,引入log4j相关maven包,flink默认使用log4j,这里引入log4j的包只是本地启动调试的需要,不然logger初始化会失败

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-reload4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>1.7.36</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-reload4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-reload4j</artifactId>
            <version>1.7.36</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>

    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

添加log4j.properties。 flink线上会使用自己的log4j配置,这里只是配置本地启动日志。

log4j.rootLogger=warn, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss SSS} [%t] [%c] [%p] - %m%n
  • 1
  • 2
  • 3
  • 4

新建一个文本模拟数据

请添加图片描述

然后新建一个类BatchWordCount 对这个文件进行统计。

package com.example.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception{
        //获取Environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //读取文件到source
        DataSource<String> source = env.readTextFile("input/words.txt");

        //统计source里的数据,每个单词出现次数。
        AggregateOperator<Tuple2<String, Integer>> sum = source.flatMap((String in, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = in.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .groupBy(0)
                .sum(1);

        //sink输出到控制台
        sum.print();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

请添加图片描述

这里只是简单模拟了一个有界数据源(txt文件,数据有限),可以看到代码与上面简介的environment、source、transform、sink一一对应,即使再复杂的流程也离不开这四个步骤。

当然实际情况下,我们更多的是无界流数据源(例如kafka,socket,数据持续输入),因此下面我们新建一个StreamWordCount,通过socket持续等带数据输入。

package com.example.flink;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost",9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((String in, Collector<Tuple2<String, Integer>> out) -> {
                    String[] words = in.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(data -> data.f0)
                .sum(1);
        sum.print();
        env.execute();
    }
}
env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

可以看到 env以及source的类变成stream的,我们从9999端口获取数据

下面简单编写一个socketserver程序:

package com.example.flink;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;

public class SocketSource {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(9999);
        Socket socket = server.accept();
        System.out.println("请输入");//提示输入
        PrintWriter pw = new PrintWriter(socket.getOutputStream());
        while (true){
            Scanner scan=new Scanner(System.in);//控制台输入控件
            String str=scan.nextLine();
            if(str.equals("exit")){
                break;
            }
            pw.println(str);
            pw.flush();
        }
        pw.close();
        socket.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

启动server,并且启动StreamWordCount,然后再server输入一些数据

请添加图片描述

可以看到会有相应的输出,当本地启动的时候,flink-client 会模拟一个线上的flink集群,输出左边的数字代表虚拟slot的id,可以看到不同key会被分配到不同的slot运算输出,因为被分配到不同的slot,因此并不能保证运算时的顺序。

有时我们希望可以指定用多少个slot去运算,这时就需要修改并行度,我们可以通过env.setParallelism()全局设置,或者针对特定操作设置。

下面我们修改一下print这个操作的并行度为2,再运行一次看看

请添加图片描述

可以看到我们输出定义的是2并行,所以部分单词会被分配到同一个slot输出。

线上运行

下面需要把这个任务真正放到flink集群,去模拟线上的场景。

我们先启动一个容器当作socketserver。

docker pull busybox

docker run -it --network mynet --name bb busybox

然后在容器里用nc命令监听9999端口

nc -l -p 9999

我们把监听的localhost改成一个真正要监听的地址,这里就是上面busybox容器的名字bb。然后通过maven 把上面的项目 package成jar包。

通过界面上传jar包,并且submit com.example.flink.StreamWordCount 这个启动类,设置默认并行度为4

如下图

请添加图片描述

请添加图片描述

在running job 可以看到,刚刚的job被拆分成3个子任务,source、flat map和agg&sink,因为source是只有一个线程连接bb:9999的socket并且读取数据,并行度只能是1,而其余操作都被设为我设置的4并行,代表有4个slot运行这些操作。因为sink只是输出,不需要运算,而且并行度与agg一样,因此agg和sink被合并到同一个子任务以减少子任务间的数据传输。

这里题外话一下,算子分可合并和不可合并,例如map,flatmap,filter这些只是过滤,转换的操作,因为不需要太多计算,flink默认他们都是可以合并的。而sum,max那些则属于计算量较大,不可合并的aggregation。

下面在busybox容器输入一些文本,可以看到job里面指标一些变化,数据接收以及不同slot的指标等。

我们通过docker logs tm1 和tm2 ,可以看到统计的输出,但是我们发现webui里的stdout并不能看到对应的输出,这是因为docker启动默认是前台启动,只在命令行里输出,没有输出到文件,因此webui里也获取不到,解决方法建议参考Flink docker 容器运行环境下不能够从Web UI 查看 Logs 以及Stdout的解决办法_Allocator的博客-CSDN博客,比较复杂这里就不管了。

实战

下面举一个更贴切实际的例子

某个电商网站有以下订单表,包含订单id(order_id),商品id(product_id),商品名(product_name),店铺id(shop_id)

现在我很关注每个店铺卖《奥特曼》相关产品的总销量是多少,以调控市场。

常规做法是select count(*) from order where product_name like “%奥特曼%” group by shop_id

众所周知,like 前后都模糊匹配的话是不走索引的,当这个商品表到达千万级别,统计出来效率是非常低的。

这个时候我们就可以把binlog变化推送去kafka(例如用canal),然后使用flink流式计算,实时统计这个指标。

接下来我们来模拟一下,把之前的kafka容器启动起来,然后创建一个order topic

下面添加3个maven包

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

增加OrderCount类:

package com.example.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class OrderCount {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("8.130.136.133:9092")//Kafka服务
                .setTopics("order")//消息主题
                .setGroupId("default")//消费组
                //偏移量 当没有提交偏移量则从最开始开始消费
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema()).build();
        DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "order");
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.filter(in -> {
                    String[] data = in.split(" ");
                    String name = data[0];
                    String shopId = data[1];
                    return name.contains("奥特曼");
                }).flatMap((String in, Collector<Tuple2<String, Integer>> out) -> {
                    String[] data = in.split(" ");
                    String name = data[0];
                    String shopId = data[1];
                    out.collect(new Tuple2<>(shopId, 1));
                }).returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(data -> data.f0)
                .sum(1);
        sum.addSink(JdbcSink.sink(
                "REPLACE INTO stat (shop_id, count) VALUES (?,?)",
                ((statement,tuple)->{
                    statement.setString(1, tuple.f0);
                    statement.setInt(2, tuple.f1);
                }),
                JdbcExecutionOptions.builder().withBatchSize(1).build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://8.130.136.133:3306/test?characterEncoding=utf8&useSSL=false").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("123456").build()
        ));
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

可以看到代码里从kafka监听数据的变化,通过flatmap+keyby+sum实现对shopid的分组统计,最后通过jdbc的sink写入数据库。

然后向kafka写入4条消息

奥特曼之父 A

奥特曼之母 A

巨型奥特曼 B

钢铁侠 A

查看数据库结果

请添加图片描述

可以看到实时统计出来的数据已经被写入mysql,并且过滤了不相关的数据,统计出最终结果。

这个时候我们在业务上就可以直接用这张表查询统计结果,而不必时时刻刻都用sql去聚合统计。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/548732
推荐阅读
相关标签
  

闽ICP备14008679号