当前位置:   article > 正文

Flink单机和集群环境部署教程

Flink单机和集群环境部署教程


下面是 Apache Flink 的单机和集群环境部署教程,包含部署过程中的注意事项和一个使用案例。Apache Flink 是一个开源的流处理框架,用于处理实时数据流。

一、Flink 单机环境部署

1. 环境准备

  • 操作系统:Linux(推荐 Ubuntu 20.04 或 CentOS 7)
  • Java:Flink 需要 Java 环境,推荐使用 OpenJDK 8 或 11。
  • Flink:下载并安装 Flink。

2. 安装 Java

Ubuntu 中:

sudo apt update
sudo apt install openjdk-11-jdk
  • 1
  • 2

CentOS 中:

sudo yum install java-11-openjdk
  • 1

验证 Java 安装:

java -version
  • 1

3. 下载并安装 Flink

访问 Flink 官网 下载最新版本的 Flink。

wget https://downloads.apache.org/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz
mv flink-1.16.2 /usr/local/flink
  • 1
  • 2
  • 3

4. 配置环境变量

编辑 ~/.bashrc 文件,添加以下内容:

export FLINK_HOME=/usr/local/flink
export PATH=$PATH:$FLINK_HOME/bin
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
  • 1
  • 2
  • 3

应用更改:

source ~/.bashrc
  • 1

5. 配置 Flink

编辑 Flink 的配置文件,位于 $FLINK_HOME/conf

配置 flink-conf.yaml
  • taskmanager.memory.process.size:任务管理器的内存配置。
  • jobmanager.rpc.address:JobManager 的主机地址。
jobmanager.rpc.address: localhost
taskmanager.memory.process.size: 1024m
  • 1
  • 2
配置 masters 文件

在单机模式下,默认配置即可,无需修改。

配置 workers 文件

在单机模式下,默认配置即可,无需修改。

6. 启动 Flink

start-cluster.sh
  • 1

7. 验证 Flink

访问 Flink Dashboard,确保 Flink 正常运行:

注意事项

  • 确保 Java 和 Flink 环境配置正确
  • 检查防火墙设置,确保 Flink 的端口可访问。
  • 内存配置:根据实际需求调整 flink-conf.yaml 中的内存配置。

二、Flink 集群环境部署

1. 环境准备

  • 多台服务器:至少 3 台(1 个 JobManager,2 个 TaskManager)。
  • 网络:确保各节点之间可以互相访问。
  • 操作系统:Linux(Ubuntu 或 CentOS)。
  • Java:在所有节点上安装 Java。
  • Flink:在所有节点上安装 Flink。

2. 配置 SSH 免密码登录

在 JobManager 节点上生成 SSH 密钥:

ssh-keygen -t rsa
  • 1

将公钥复制到所有 TaskManager 节点:

ssh-copy-id user@taskmanager1
ssh-copy-id user@taskmanager2
  • 1
  • 2

3. 安装 Flink

在所有节点上安装 Flink,步骤与单机安装相同。

4. 配置 Flink 集群

配置 flink-conf.yaml

在所有节点上编辑 $FLINK_HOME/conf/flink-conf.yaml,配置如下内容:

jobmanager.rpc.address: master
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
taskmanager.memory.process.size: 1024m
  • 1
  • 2
  • 3
  • 4
  • jobmanager.rpc.address:设置为 JobManager 的主机名。
  • taskmanager.numberOfTaskSlots:每个 TaskManager 的任务槽数量。
  • parallelism.default:默认的并行度。
配置 masters 文件

在 JobManager 节点上编辑 $FLINK_HOME/conf/masters 文件,添加 JobManager 的主机名:

master:8081
  • 1
配置 workers 文件

在 JobManager 节点上编辑 $FLINK_HOME/conf/workers 文件,添加所有 TaskManager 节点的主机名:

taskmanager1
taskmanager2
  • 1
  • 2

5. 启动 Flink 集群

在 JobManager 节点上启动 Flink 集群:

start-cluster.sh
  • 1

6. 验证集群状态

访问 Flink Dashboard,确保所有节点正常运行:

注意事项

  • 确保 SSH 配置正确,JobManager 节点需要通过 SSH 无密码访问 TaskManager 节点。
  • 内存和资源配置:根据实际需求调整 flink-conf.yaml 中的资源配置。
  • 时钟同步:使用 ntpdchrony 确保所有节点的时钟同步。
  • 网络配置:确保各节点之间的网络连接正常。

三、Flink 使用案例:Word Count

使用 Java 实现 Flink Word Count

1. 编写 Java 程序
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;

import java.util.Arrays;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // 获取运行参数
        final ParameterTool params = ParameterTool.fromArgs(args);

        // 设置执行环境
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从输入文件中读取数据
        env.readTextFile(params.get("input"))
                // 拆分成单词
                .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {
                    Arrays.stream(value.split("\\W+"))
                          .forEach(word -> out.collect(new Tuple2<>(word, 1)));
                })
                .returns(TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class))
                // 按单词分组
                .groupBy(0)
                // 统计每个单词的出现次数
                .sum(1)
                // 将结果保存到输出文件
                .writeAsCsv(params.get("output"), FileSystem.WriteMode.OVERWRITE);

        // 执行程序
        env.execute("Word Count Example");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
2. 编译并打包 Java 程序

确保你已经安装了 Maven,并在 pom.xml 文件中配置了 Flink 依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.16.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.16.2</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

编译并打包:

mvn clean package
  • 1
3. 运行 Java 程序

将输入文件上传到 HDFS 或本地文件系统:

hdfs dfs -put localfile.txt /user/flink/input
  • 1

运行 Flink 应用:

flink run -c com.example.WordCount target/wordcount-1.0-SNAPSHOT.jar --input /user/flink/input --output /user/flink/output
  • 1
4. 查看结果
hdfs dfs -cat /user/flink/output/part-00000
  • 1

使用 Python 实现 Flink Word Count

可以使用 Apache Beam 的 Python API 来实现 Flink 版的 Word Count。

1. 安装 Apache Beam
pip install apache-beam
  • 1
2. 编写 Python 程序

创建 wordcount.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # 创建 PipelineOptions
    options = PipelineOptions(
        runner='FlinkRunner',
        flink_master='localhost:8081',
        environment_type='LOOPBACK'
    )

    # 创建管道
    with beam.Pipeline(options=options) as p:
        # 读取输入文件
        (p
         | 'Read' >> beam.io.ReadFromText('input.txt')
         | 'Split' >> beam.FlatMap(lambda x: x.split())
         | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
         | 'GroupAndSum' >> beam.CombinePerKey(sum)
         | 'Write' >> beam.io.WriteToText('output.txt')
        )

if __name__ == '__main__':
    run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
3. 运行 Python 程序


python wordcount.py
  • 1
  • 2
  • 3
4. 查看结果
cat output.txt-00000-of-00001
  • 1

总结

通过以上步骤,我们成功部署了 Flink 单机和集群环境,并实现了一个简单的 Word Count 应用。Flink 提供了强大的流处理和批处理能力,可以在多种场景下处理实时数据。

部署过程中的注意事项

  • 确保 Java 和 Flink 环境配置正确
  • 检查网络配置,确保各节点之间的通信正常。
  • 合理分配资源,根据集群规模和任务需求调整 Flink 的资源配置。
  • 时钟同步:使用 ntpdchrony 确保所有节点的时钟同步。
  • 监控集群状态,定期检查节点状态和资源使用情况,及时处理故障节点。

通过合理配置和优化,Flink 可以在多种场景下提供高效的流处理和批处理能力,为大规模数据处理提供强有力的支持。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/969746
推荐阅读
相关标签
  

闽ICP备14008679号