赞
踩
本篇文章大概3155字,阅读时间大约8分钟
结合Flink官方文档,整理关于Flink命令行的操作参数,包含命令行接口和Scala Shell
1
文档编写目的
整理Flink官方文档中命令行接口和scala-shell的内容
在Flink1.10.1集群上进行实操
Command-Line Interface
Scala Shell
2
Command-Line Interface
Flink的命令行接口主要用来运行达成jar包的Flink应用程序,具体功能如下:
提交Flink应用
取消运行的Flink应用
获取Flink应用的信息
查看Flink集群上的应用(包含运行中和等待的应用)
触发或释放savepoint
flink run (这里略过了部分python的参数)
- -c,--class <classname> Flink应用程序的入口
- -C,--classpath <url> 指定所有节点都可以访问到的url,可用于多个应用程序都需要的工具类加载
- -d,--detached 是否使用分离模式,就是提交任务,cli是否退出,加了-d参数,cli会退出
- -n,--allowNonRestoredState 允许跳过无法还原的savepoint。比如删除了代码中的部分operator
- -p,--parallelism <parallelism> 执行并行度
- -s,--fromSavepoint <savepointPath> 从savepoint恢复任务
- -sae,--shutdownOnAttachedExit 以attached模式提交,客户端退出的时候关闭集群
flink yarn-cluster 模式
-d,--detached 是否使用分离模式 -m,--jobmanager <arg> 指定提交的jobmanager -yat,--yarnapplicationType <arg> 设置yarn应用的类型 -yD <property=value> 使用给定属性的值 -yd,--yarndetached 使用yarn分离模式 -yh,--yarnhelp yarn session的帮助 -yid,--yarnapplicationId <arg> 挂到正在运行的yarnsession上 -yj,--yarnjar <arg> Flink jar文件的路径 -yjm,--yarnjobManagerMemory <arg> jobmanager的内存(单位M) -ynl,--yarnnodeLabel <arg> 指定 YARN 应用程序 YARN 节点标签 -ynm,--yarnname <arg> 自定义yarn应用名称 -yq,--yarnquery 显示yarn的可用资源 -yqu,--yarnqueue <arg> 指定yarn队列 -ys,--yarnslots <arg> 指定每个taskmanager的slots数 -yt,--yarnship <arg> 在指定目录中传输文件 -ytm,--yarntaskManagerMemory <arg> 每个taskmanager的内存 -yz,--yarnzookeeperNamespace <arg> 用来创建ha的zk子路径的命名空间 -z,--zookeeperNamespace <arg> 用来创建ha的zk子路径的命名空间
flink info 语法:info [OPTIONS] <jar-file> <arguments>
-c,--class <classname> 应用程序入口
flink list 显示正在运行或调度的程序
- -a,--all 显示所有应用和对应的job id
- -r,--running 显示正在运行的应用和job id
- -s,--scheduled 显示调度的应用和job id
- #yarn-cluster模式
- -m,--jobmanager <arg> 指定连接的jobmanager
- -yid,--yarnapplicationId <arg> 挂到指定的yarn id对应的yarn session上
- -z,--zookeeperNamespace <arg> 用来创建ha的zk子路径的命名空间
flink stop:停止一个正在运行的应用
- -d,--drain 在获取savepoint,停止pipeline之前发送MAX_WATERMARK
- -p,--savepointPath <savepointPath> 指定savepoint的path,如果不指定会使用默认值("state.savepoints.dir")
savepoint - 触发一个正在运行的应用生成savepoint
- 语法:savepoint [OPTIONS] <Job ID> [<target directory>]
- -d,--dispose <arg> savepoint的路径
- -j,--jarfile <jarfile> Flink的jar包
Flink可以使用交互式的scala shell环境,可以使用本地模式和集群模式。
local启动方式
./bin/start-scala-shell.sh local
scala shell支持batch和stream,对应到benv和senv上。benv通过print()将结果输出到控制台,senv通过senv.execute("job name")进行任务提交。
./bin/start-scala-shell.sh local
batch - DataSet API测试
- val text = benv.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,")
- val counts = text.flatMap(_.toLowerCase.split("\\W+")).map((_,1)).groupBy(0).sum(1)
- counts.print()
stream - DataStream API测试
- val textStreaming = senv.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,")
- val countsStreaming = textStreaming.flatMap(_.toLowerCase.split("\\W+")).map((_,1)).keyBy(0).sum(1)
- countsStreaming.print()
- senv.execute("Streaming Wordcount")
local
./bin/start-scala-shell.sh local
remote 远程
通过jobmanager连接启动了的flink集群
./bin/start-scala-shell.sh remote <hostname> <portnumber>
yarn scala shell cluster
./bin/start-scala-shell.sh yarn -n 2
yarn session
./bin/start-scala-shell.sh yarn
点个“在看”表示朕
已阅
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。