当前位置:   article > 正文

Linux安装Flink及其环境配置

linux安装flink

目录


Linux服务器环境部署专栏目录(点击进入…)


快来加入我们的互动学习群吧,与我们一起成长和进步吧! QQ交流群:948912943

进群下载配套安装包,无需考虑版本兼容等问题!!!


Linux安装Flink及其环境配置

集群规划

使用StandAlone模式,需要启动Flink的主节点JobManager以及从节点TaskManager

服务node1node2node3
JobManager
TaskManager

Flink 的部署模式分为3种:
(1)Application模式
(2)Per-Job模式
(3)Session模式


下载安装包

官方地址:https://svn.apache.org/repos/asf/flink/index.html

在这里插入图片描述


1.解压到安装目录

tar zxvf flink-1.14.6-bin-scala_2.11.tgz
  • 1

2.配置环境变量

export FLINK_HOME=/home/environment/flink-1.14.6
export PATH=$FLINK_HOME/bin:$PATH
  • 1
  • 2

每个 JobManager 的可用内存值(jobmanager.memory.process.size)
每个 TaskManager 的可用内存值 (taskmanager.memory.process.size
每台机器的可用 CPU 数(taskmanager.numberOfTaskSlots)
集群中所有 CPU 数(parallelism.default)
临时目录(io.tmp.dirs)


3.修改master(node1)

bin/start-cluster.sh 和 bin/stop-cluster.sh 依赖 conf/masters 和 conf/workers 来确定集群组件实例的数量

编辑masters文件;进入conf目录:

vim ./flink/conf/masters

#删除默认的改为
node1:8081
  • 1
  • 2

4.配置其他两台机器workers(工作节点)

配置工作节点
vi workers

# 将其他两台机器的IP一起写进去
node1
node2
node3
  • 1
  • 2
  • 3
  • 4

5.编辑flink-conf.yaml文件

配置文件详细描述

#==============================================================================
# 公共配置
#==============================================================================

#运行JobManager的主机的外部地址,可以是TaskManager和任何想要连接的客户端都可以访问。
#此设置仅在独立模式下使用,可能在作业管理器端被覆盖通过指定bin/jobmanager.sh的--host<hostname>参数。
#sh可执行文件。在高可用性模式下,如果使用bin/start集群。sh脚本和设置conf/masters文件,这将自动处理。纱线根据节点的主机名自动配置主机名

# 可访问JobManager的RPC IP地址
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: node1

# 可访问JobManager的RPC端口
jobmanager.rpc.port: 6123

# 请注意,下面几条是JobManager进程中所有内存使用的原因,包括JVM元空间和其他开销
# JobManager的总进程内存大小
jobmanager.memory.process.size: 1600m

# TaskManager的总进程内存大小
taskmanager.memory.process.size: 1728m

# 要排除JVM元空间和开销,请使用Flink总内存大小,而不是“taskmanager.memory.process.size”。
# 不建议同时设置“taskmanager.memory.process”。大小和Flink内存。
# 任务管理器。内存。flink。尺寸:1280m
# 每个TaskManager提供的任务槽数。每个插槽运行一条并行管道
taskmanager.numberOfTaskSlots: 1

# 用于未指定程序的并行性和其他并行性 - 改为3,因为三台机器的集群
parallelism.default: 3





# ==================默认文件系统方案和权限==================
# 
# 默认情况下,不带方案的文件路径相对于本地路径进行解释根文件系统“file:///”。使用此选项覆盖默认值并解释相对于不同文件系统的相对路径,例如“hdfs://mynamenode:1234
# fs.default-scheme




#==============================================================================
# 高可用性
#==============================================================================

# 高可用性模式。可能的选项是 'NONE' or 'zookeeper'
high-availability: zookeeper

# 保存主恢复元数据的路径。ZooKeeper存储时检查站和领导人选举的小真相,这个位置存储较大的对象,如持久化数据流图
# 必须是可从所有节点访问的持久文件系统(如HDFS、S3、Ceph、nfs等)
high-availability.storageDir: hdfs://node1:9000/flink/ha/

# 协调高可用性设置的ZooKeeper仲裁对等方列表
# 必须是以下格式的列表:“host1:clientPort,host2:clientPort,…”(默认clientPort:2181)
#high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181

# ACL options 基于 https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# 它可以是“创建者”(ZOO_CREATE_ALL_ACL)或“打开”(ZO_open_ACL_UNSAFE)。默认值为“打开”,如果启用了ZK security,则可以更改为“creator”
#high-availability.zookeeper.client.acl: open





#==============================================================================
# 容错和检查点
#==============================================================================

# 如果启用检查点,将用于存储操作员状态检查点的后端。执行时启用检查点。检查点。间隔>0
# 详细信息参阅 CheckpointConfig and ExecutionCheckpointingOptions
#execution.checkpointing.interval: 3min
#execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
#execution.checkpointing.max-concurrent-checkpoints: 1
#execution.checkpointing.min-pause: 0
#execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
#execution.checkpointing.timeout: 10min
#execution.checkpointing.tolerable-failed-checkpoints: 0
#execution.checkpointing.unaligned: false

# 支持的后端是“jobmanager”、“filesystem”、“rocksdb”或<class-name-of-factory>
state.backend: filesystem

# 当使用任何默认绑定状态后端时,检查点文件系统的目录
#state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.checkpoints.dir: hdfs://node1:9000/flink-checkpoints

# 保存点的默认目标目录,可选
#state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.checkpoints.dir: hdfs://node1:9000/flink-checkpoints

# 用于启用/禁用支持增量检查点的后端的增量检查点(如RocksDB状态后端)的标志
#state.backend.incremental: false

# 故障转移策略,即作业计算如何从任务失败中恢复。
# 仅重新启动可能受任务失败影响的任务,这通常包括下游任务和可能的上游任务,如果它们生成的数据不再可供使用
jobmanager.execution.failover-strategy: region




#==============================================================================
# Rest & web frontend(REST和Web前端)
#==============================================================================

# REST客户端连接到的端口。如果尚未指定REST.bind-port,则服务器也将绑定到此端口
rest.port: 8081

# REST客户端将连接到的地址
#rest.address: 0.0.0.0
rest.address: node1

# REST和web服务器要绑定到的端口范围
rest.bind-port: 8080-8090

# REST和web服务器绑定到的地址
#rest.bind-address: 0.0.0.0
rest.bind-address: node1

# 用于指定是否从基于web的运行时监视器启用作业提交的标志。取消注释以禁用
web.submit.enable: true

# 用于指定是否从基于web的运行时监视器启用作业取消的标志。取消注释以禁用
#web.cancel.enable: false





#==============================================================================
# Advanced(高级用法)
#==============================================================================

# 覆盖临时文件的目录。如果未指定,则采用系统特定的Java临时目录 (java.io.tmpdir property) 
# 对于Yarn上的框架设置,Flink将自动提取容器的临时目录,而无需任何配置。
# 用系统目录分隔符(unix上的冒号“:”)或逗号为多个目录添加分隔列表,例如:/data1/tmp:/data2/tmp:/ata3/tmp
# 注意:每个目录条目由不同的I/O线程读取和写入。可以多次包含同一目录,以便针对该目录创建多个I/O线程。例如,这与高吞吐量RAID相关
# 用于存放运算过程中的临时目录 === 自定义,需要自己先创建文件
#io.tmp.dirs: /data/flink/tmp
io.tmp.dirs: /tmp

# 类加载解析顺序。可能的值是“child-first”(Flink的默认值)和“parent-first(Java的默认值)
# 子优先类加载允许用户在其应用程序中使用不同于类路径中的 'dependency/library'。切换回 'parent-first' 可能有助于调试依赖关系问题
#classloader.resolve-order: child-first

# 进入网络堆栈的内存量。这些数字通常不需要调整。如果出现“Insufficient number”错误,可能需要调整它们。默认最小值为64MB,默认最大值为1GB
#taskmanager.memory.network.fraction: 0.1
#taskmanager.memory.network.min: 64mb
#taskmanager.memory.network.max: 1gb





#==============================================================================
# Flink群集安全配置
#==============================================================================

# 可以通过四个步骤启用各种组件(Hadoop、ZooKeeper和连接器)的Kerberos身份验证:
# 1.配置本地krb5.conf文件
# 2.提供Kerberos凭据(keytab or a ticket cache w/ kinit)
# 3.使凭证可用于各种JAAS登录上下文
# 4.将连接器配置为使用JAAS/SASL

# 下面配置如何提供Kerberos凭据。如果设置了keytab路径和主体,则将使用keytab而不是票证缓存
#security.kerberos.login.use-ticket-cache: true
#security.kerberos.login.keytab: /path/to/kerberos/keytab
#security.kerberos.login.principal: flink-user

# 下面的配置定义了哪些JAAS登录上下文
#security.kerberos.login.contexts: Client,KafkaClient





#==============================================================================
# ZK安全配置
#==============================================================================

# 如果为安全性配置了ZK集成,则以下配置适用
# 如果已配置,则重写下面的配置以提供自定义ZK服务名称
# zookeeper.sasl.service-name: zookeeper

# 下面的配置必须与“security.kerberos.login.context”中设置的值之一匹配
#zookeeper.sasl.login-context-name: Client





#==============================================================================
# HistoryServer(历史记录服务器)
#==============================================================================

# HistoryServer通过启动和停止bin/historyserver.sh (start|stop)
# 要将完成的作业上载到的目录。请将此目录添加到HistoryServer的受监视目录列表中(请参见下文)
jobmanager.archive.fs.dir: hdfs://node1:9000/completed-jobs/

# 基于web的HistoryServer侦听的地址
historyserver.web.address: 0.0.0.0

# 基于web的HistoryServer侦听的端口
historyserver.web.port: 8082

# 以逗号分隔的目录列表,用于监视已完成的作业
historyserver.archive.fs.dir: hdfs://node1:9000/completed-jobs/

# 刷新受监视目录的间隔(毫秒)
historyserver.archive.fs.refresh-interval: 10000

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213

6.拷贝到另外两台机器

scp -r /home/environment/flink-1.14.6/ node2:/home/environment/
scp -r /home/environment/flink-1.14.6/ node3:/home/environment/
  • 1
  • 2

7.启动flink

./bin/start-cluster.sh
./bin/stop-cluster.sh
  • 1
  • 2

任意一台节点上(一般在 master 节点)执行 bin/start-cluster.sh 均可启动该集群

参数说明
-nTaskManager的数量,相当于executor的数量
-s每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量
-tm每个TaskManager的内存大小,executor-memory
-jmJobManager的内存大小,driver-memory

启动成功后通过web访问;JobManager同时会在8081端口上启动一个web前端,通过http://localhost:8081来访问,默认端口是8081,可以修改config配置端口

命令执行job

# 启动一个终端(端口9099)
# 在一个窗口启动一个websocket 端口9099
nc -l -p 9099
# 再开一个端口执行flink 例子
./flink run ../examples/streaming/WordCount.jar --port 9099
  • 1
  • 2
  • 3
  • 4
  • 5

提交官方提供的WordCount程序试试,启动命令:

flink run -m yarn-cluster -yn 1 /home/radmin/package/WordCount.jar
  • 1

在Flink1.8版本后,Flink官方提供的安装包里没有整合HDFS的jar

下载jar包并在Flink的lib目录下放入该jar包并分发使Flink能够支持对Hadoop的操作
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
放入lib目录


Hadoop版本的支持

Flink自1.11.0 版本开始,已经支持了hadoop 3.x,具体来讲就是将 HADOOP_CLASSPATH 配置成运行机器上的hadoop3 相关jar包即可

YARN上的Flink是针对Hadoop 2.4.1编译的,支持所有的Hadoop版本>= 2.4.1,包括Hadoop 3.x


Flink与Hadoop集成

推荐的方法是通过HADOOP_CLASSPATH环境变量将Hadoop类路径添加到Flink。

Flink 在启动Flink组件(如Client、JobManager或TaskManager)前,将环境变量HADOOP_CLASSPATH的值添加到classpath中。但大多数Hadoop发行版和云环境不会默认设置HADOOP_CLASSPATH这个环境变量,所以如果Hadoop类路径应该由Flink选择,那么必须在所有运行Flink组件的机器上设置这个环境变量。

如果运行在YARN上时,只需要执行下面一条脚本。因为在YARN内部运行的组件将通过Hadoop类路径启动,但当向YARN提交作业时,Hadoop依赖项必须在类路径中。

hadoop classpath是一句shell命令,用于获取配置的Hadoop类路径

export HADOOP_CLASSPATH=`hadoop classpath`
  • 1

注意:从Flink 1.11开始,Flink项目不再正式支持使用Flink -shade -hadoop-2-uber版本。建议用户通过HADOOP_CLASSPATH提供Hadoop依赖项

/home/environment/flink-1.14.6/lib/flink-shaded-hadoop-2-2.8.3-10.0.jar
export HADOOP_CLASSPATH=`hadoop classpath`
  • 1
  • 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/745511
推荐阅读
相关标签
  

闽ICP备14008679号