赞
踩
进群下载配套安装包,无需考虑版本兼容等问题!!!
使用StandAlone模式,需要启动Flink的主节点JobManager以及从节点TaskManager
服务 | node1 | node2 | node3 |
---|---|---|---|
JobManager | 是 | 否 | 否 |
TaskManager | 是 | 是 | 是 |
Flink 的部署模式分为3种:
(1)Application模式
(2)Per-Job模式
(3)Session模式
官方地址:https://svn.apache.org/repos/asf/flink/index.html
tar zxvf flink-1.14.6-bin-scala_2.11.tgz
export FLINK_HOME=/home/environment/flink-1.14.6
export PATH=$FLINK_HOME/bin:$PATH
每个 JobManager 的可用内存值(jobmanager.memory.process.size)
每个 TaskManager 的可用内存值 (taskmanager.memory.process.size
每台机器的可用 CPU 数(taskmanager.numberOfTaskSlots)
集群中所有 CPU 数(parallelism.default)
临时目录(io.tmp.dirs)
bin/start-cluster.sh 和 bin/stop-cluster.sh 依赖 conf/masters 和 conf/workers 来确定集群组件实例的数量
编辑masters文件;进入conf目录:
vim ./flink/conf/masters
#删除默认的改为
node1:8081
配置工作节点
vi workers
# 将其他两台机器的IP一起写进去
node1
node2
node3
配置文件详细描述
#==============================================================================
# 公共配置
#==============================================================================
#运行JobManager的主机的外部地址,可以是TaskManager和任何想要连接的客户端都可以访问。
#此设置仅在独立模式下使用,可能在作业管理器端被覆盖通过指定bin/jobmanager.sh的--host<hostname>参数。
#sh可执行文件。在高可用性模式下,如果使用bin/start集群。sh脚本和设置conf/masters文件,这将自动处理。纱线根据节点的主机名自动配置主机名
# 可访问JobManager的RPC IP地址
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: node1
# 可访问JobManager的RPC端口
jobmanager.rpc.port: 6123
# 请注意,下面几条是JobManager进程中所有内存使用的原因,包括JVM元空间和其他开销
# JobManager的总进程内存大小
jobmanager.memory.process.size: 1600m
# TaskManager的总进程内存大小
taskmanager.memory.process.size: 1728m
# 要排除JVM元空间和开销,请使用Flink总内存大小,而不是“taskmanager.memory.process.size”。
# 不建议同时设置“taskmanager.memory.process”。大小和Flink内存。
# 任务管理器。内存。flink。尺寸:1280m
# 每个TaskManager提供的任务槽数。每个插槽运行一条并行管道
taskmanager.numberOfTaskSlots: 1
# 用于未指定程序的并行性和其他并行性 - 改为3,因为三台机器的集群
parallelism.default: 3
# ==================默认文件系统方案和权限==================
#
# 默认情况下,不带方案的文件路径相对于本地路径进行解释根文件系统“file:///”。使用此选项覆盖默认值并解释相对于不同文件系统的相对路径,例如“hdfs://mynamenode:1234
# fs.default-scheme
#==============================================================================
# 高可用性
#==============================================================================
# 高可用性模式。可能的选项是 'NONE' or 'zookeeper'
high-availability: zookeeper
# 保存主恢复元数据的路径。ZooKeeper存储时检查站和领导人选举的小真相,这个位置存储较大的对象,如持久化数据流图
# 必须是可从所有节点访问的持久文件系统(如HDFS、S3、Ceph、nfs等)
high-availability.storageDir: hdfs://node1:9000/flink/ha/
# 协调高可用性设置的ZooKeeper仲裁对等方列表
# 必须是以下格式的列表:“host1:clientPort,host2:clientPort,…”(默认clientPort:2181)
#high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
# ACL options 基于 https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# 它可以是“创建者”(ZOO_CREATE_ALL_ACL)或“打开”(ZO_open_ACL_UNSAFE)。默认值为“打开”,如果启用了ZK security,则可以更改为“creator”
#high-availability.zookeeper.client.acl: open
#==============================================================================
# 容错和检查点
#==============================================================================
# 如果启用检查点,将用于存储操作员状态检查点的后端。执行时启用检查点。检查点。间隔>0
# 详细信息参阅 CheckpointConfig and ExecutionCheckpointingOptions
#execution.checkpointing.interval: 3min
#execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
#execution.checkpointing.max-concurrent-checkpoints: 1
#execution.checkpointing.min-pause: 0
#execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
#execution.checkpointing.timeout: 10min
#execution.checkpointing.tolerable-failed-checkpoints: 0
#execution.checkpointing.unaligned: false
# 支持的后端是“jobmanager”、“filesystem”、“rocksdb”或<class-name-of-factory>
state.backend: filesystem
# 当使用任何默认绑定状态后端时,检查点文件系统的目录
#state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs://node1:9000/flink-checkpoints
# 保存点的默认目标目录,可选
#state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.checkpoints.dir: hdfs://node1:9000/flink-checkpoints
# 用于启用/禁用支持增量检查点的后端的增量检查点(如RocksDB状态后端)的标志
#state.backend.incremental: false
# 故障转移策略,即作业计算如何从任务失败中恢复。
# 仅重新启动可能受任务失败影响的任务,这通常包括下游任务和可能的上游任务,如果它们生成的数据不再可供使用
jobmanager.execution.failover-strategy: region
#==============================================================================
# Rest & web frontend(REST和Web前端)
#==============================================================================
# REST客户端连接到的端口。如果尚未指定REST.bind-port,则服务器也将绑定到此端口
rest.port: 8081
# REST客户端将连接到的地址
#rest.address: 0.0.0.0
rest.address: node1
# REST和web服务器要绑定到的端口范围
rest.bind-port: 8080-8090
# REST和web服务器绑定到的地址
#rest.bind-address: 0.0.0.0
rest.bind-address: node1
# 用于指定是否从基于web的运行时监视器启用作业提交的标志。取消注释以禁用
web.submit.enable: true
# 用于指定是否从基于web的运行时监视器启用作业取消的标志。取消注释以禁用
#web.cancel.enable: false
#==============================================================================
# Advanced(高级用法)
#==============================================================================
# 覆盖临时文件的目录。如果未指定,则采用系统特定的Java临时目录 (java.io.tmpdir property)
# 对于Yarn上的框架设置,Flink将自动提取容器的临时目录,而无需任何配置。
# 用系统目录分隔符(unix上的冒号“:”)或逗号为多个目录添加分隔列表,例如:/data1/tmp:/data2/tmp:/ata3/tmp
# 注意:每个目录条目由不同的I/O线程读取和写入。可以多次包含同一目录,以便针对该目录创建多个I/O线程。例如,这与高吞吐量RAID相关
# 用于存放运算过程中的临时目录 === 自定义,需要自己先创建文件
#io.tmp.dirs: /data/flink/tmp
io.tmp.dirs: /tmp
# 类加载解析顺序。可能的值是“child-first”(Flink的默认值)和“parent-first(Java的默认值)
# 子优先类加载允许用户在其应用程序中使用不同于类路径中的 'dependency/library'。切换回 'parent-first' 可能有助于调试依赖关系问题
#classloader.resolve-order: child-first
# 进入网络堆栈的内存量。这些数字通常不需要调整。如果出现“Insufficient number”错误,可能需要调整它们。默认最小值为64MB,默认最大值为1GB
#taskmanager.memory.network.fraction: 0.1
#taskmanager.memory.network.min: 64mb
#taskmanager.memory.network.max: 1gb
#==============================================================================
# Flink群集安全配置
#==============================================================================
# 可以通过四个步骤启用各种组件(Hadoop、ZooKeeper和连接器)的Kerberos身份验证:
# 1.配置本地krb5.conf文件
# 2.提供Kerberos凭据(keytab or a ticket cache w/ kinit)
# 3.使凭证可用于各种JAAS登录上下文
# 4.将连接器配置为使用JAAS/SASL
# 下面配置如何提供Kerberos凭据。如果设置了keytab路径和主体,则将使用keytab而不是票证缓存
#security.kerberos.login.use-ticket-cache: true
#security.kerberos.login.keytab: /path/to/kerberos/keytab
#security.kerberos.login.principal: flink-user
# 下面的配置定义了哪些JAAS登录上下文
#security.kerberos.login.contexts: Client,KafkaClient
#==============================================================================
# ZK安全配置
#==============================================================================
# 如果为安全性配置了ZK集成,则以下配置适用
# 如果已配置,则重写下面的配置以提供自定义ZK服务名称
# zookeeper.sasl.service-name: zookeeper
# 下面的配置必须与“security.kerberos.login.context”中设置的值之一匹配
#zookeeper.sasl.login-context-name: Client
#==============================================================================
# HistoryServer(历史记录服务器)
#==============================================================================
# HistoryServer通过启动和停止bin/historyserver.sh (start|stop)
# 要将完成的作业上载到的目录。请将此目录添加到HistoryServer的受监视目录列表中(请参见下文)
jobmanager.archive.fs.dir: hdfs://node1:9000/completed-jobs/
# 基于web的HistoryServer侦听的地址
historyserver.web.address: 0.0.0.0
# 基于web的HistoryServer侦听的端口
historyserver.web.port: 8082
# 以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir: hdfs://node1:9000/completed-jobs/
# 刷新受监视目录的间隔(毫秒)
historyserver.archive.fs.refresh-interval: 10000
scp -r /home/environment/flink-1.14.6/ node2:/home/environment/
scp -r /home/environment/flink-1.14.6/ node3:/home/environment/
./bin/start-cluster.sh
./bin/stop-cluster.sh
任意一台节点上(一般在 master 节点)执行 bin/start-cluster.sh 均可启动该集群
参数 | 说明 |
---|---|
-n | TaskManager的数量,相当于executor的数量 |
-s | 每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量 |
-tm | 每个TaskManager的内存大小,executor-memory |
-jm | JobManager的内存大小,driver-memory |
启动成功后通过web访问;JobManager同时会在8081端口上启动一个web前端,通过http://localhost:8081来访问,默认端口是8081,可以修改config配置端口
命令执行job
# 启动一个终端(端口9099)
# 在一个窗口启动一个websocket 端口9099
nc -l -p 9099
# 再开一个端口执行flink 例子
./flink run ../examples/streaming/WordCount.jar --port 9099
提交官方提供的WordCount程序试试,启动命令:
flink run -m yarn-cluster -yn 1 /home/radmin/package/WordCount.jar
在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar
下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
放入lib目录
Flink自1.11.0 版本开始,已经支持了hadoop 3.x,具体来讲就是将 HADOOP_CLASSPATH 配置成运行机器上的hadoop3 相关jar包即可
YARN上的Flink是针对Hadoop 2.4.1编译的,支持所有的Hadoop版本>= 2.4.1,包括Hadoop 3.x
推荐的方法是通过HADOOP_CLASSPATH环境变量将Hadoop类路径添加到Flink。
Flink 在启动Flink组件(如Client、JobManager或TaskManager)前,将环境变量HADOOP_CLASSPATH的值添加到classpath中。但大多数Hadoop发行版和云环境不会默认设置HADOOP_CLASSPATH这个环境变量,所以如果Hadoop类路径应该由Flink选择,那么必须在所有运行Flink组件的机器上设置这个环境变量。
如果运行在YARN上时,只需要执行下面一条脚本。因为在YARN内部运行的组件将通过Hadoop类路径启动,但当向YARN提交作业时,Hadoop依赖项必须在类路径中。
hadoop classpath是一句shell命令,用于获取配置的Hadoop类路径
export HADOOP_CLASSPATH=`hadoop classpath`
注意:从Flink 1.11开始,Flink项目不再正式支持使用Flink -shade -hadoop-2-uber版本。建议用户通过HADOOP_CLASSPATH提供Hadoop依赖项
/home/environment/flink-1.14.6/lib/flink-shaded-hadoop-2-2.8.3-10.0.jar
export HADOOP_CLASSPATH=`hadoop classpath`
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。