# /etc/profile # System wide environment and startup programs, for login setup # Functions and aliases go in /etc/bashrc # It's NOT a good idea to change this file unless you know what you # are doing. It's much better to create a custom.sh shell script in # /etc/profile.d/ to make custom changes to your environment, as this # will prevent the need for merging in future updates. pathmunge () { case ":${PATH}:" in *:"$1":*) ;; *) if [ "$2" = "after" ] ; then PATH=$PATH:$1 else PATH=$1:$PATH fi esac } if [ -x /usr/bin/id ]; then if [ -z "$EUID" ]; then # ksh workaround EUID=`/usr/bin/id -u` UID=`/usr/bin/id -ru` fi USER="`/usr/bin/id -un`" LOGNAME=$USER MAIL="/var/spool/mail/$USER" fi # Path manipulation if [ "$EUID" = "0" ]; then pathmunge /usr/sbin pathmunge /usr/local/sbin else pathmunge /usr/local/sbin after pathmunge /usr/sbin after fi HOSTNAME=`/usr/bin/hostname 2>/dev/null` HISTSIZE=1000 if [ "$HISTCONTROL" = "ignorespace" ] ; then export HISTCONTROL=ignoreboth else export HISTCONTROL=ignoredups fi export PATH USER LOGNAME MAIL HOSTNAME HISTSIZE HISTCONTROL # By default, we want umask to get set. This sets it for login shell # Current threshold for system reserved uid/gids is 200 # You could check uidgid reservation validity in # /usr/share/doc/setup-*/uidgid file if [ $UID -gt 199 ] && [ "`/usr/bin/id -gn`" = "`/usr/bin/id -un`" ]; then umask 002 else umask 022 fi for i in /etc/profile.d/*.sh /etc/profile.d/sh.local ; do if [ -r "$i" ]; then if [ "${-#*i}" != "$-" ]; then . "$i" else . "$i" >/dev/null fi fi done unset i unset -f pathmunge export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk- export HADOOP_HOME=/opt/hadoop export FLINK_HOME=/opt/flink export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop export HADOOP_CLASSPATH=$HADOOP_HOME/lib/*.jar export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$FLINK_HOME/bin export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tool.jar
# flink-conf.yaml - one "key: value" setting per line.
# Lines starting with "#" are disabled settings kept for reference.

# --- JobManager / TaskManager ------------------------------------------------
jobmanager.rpc.address: bdp01
jobmanager.rpc.port: 6123
jobmanager.heap.size: 1024m
#web.tmpdir: hdfs:///tsczbdnndev1.trinasolar.com:8020/flink/flink-web
web.upload.dir: /data/flink/flink-web
taskmanager.heap.size: 8192m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
#fs.default-scheme: hdfs://tsczbdnndev1.trinasolar.com:8020

# --- High availability (ZooKeeper) -------------------------------------------
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: bdp01:2181,bdp02:2181,bdp03:2181
high-availability.cluster-id: /default_ns
# high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 50030

# --- State backend / checkpoints ---------------------------------------------
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
# NOTE(review): incremental checkpoints are a RocksDB state backend feature;
# confirm this flag has any effect with "state.backend: filesystem".
state.backend.incremental: true

# --- Web / REST ----------------------------------------------------------------
#web.address:
rest.port: 8082
web.submit.enable: true
web.timeout: 20000
# io.tmp.dirs: /tmp
taskmanager.memory.preallocate: true
# classloader.resolve-order: child-first
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 4gb

# --- Security (disabled) -------------------------------------------------------
#security.kerberos.login.use-ticket-cache: true
#security.kerberos.login.keytab: /tmp/flink.keytab
#security.kerberos.login.principal: flink@TRINASOLAR.COM
#high-availability.zookeeper.client.acl: creator
#security.kerberos.login.contexts: Client,KafkaClient

# --- History server ------------------------------------------------------------
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs/
# Commented out because it carried no value; a key without a value cannot be
# parsed as a setting, so the built-in default applies either way.
#historyserver.web.address:
historyserver.web.port: 8083
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

# --- YARN ------------------------------------------------------------------------
#yarn.appmaster.rpc.address: tsczbddndev2.trinasolar.com
yarn.maximum-failed-containers: 99999
yarn.reallocate-failed: true
yarn.application-attempts: 10

# --- File system output ----------------------------------------------------------
fs.overwrite-files: true
fs.output.always-create-directory: true

# The following line was added to work around an error reported during testing.
classloader.check-leaked-classloader: false
# Start a Flink session on YARN:
#   -n 10     allocate 10 YARN containers (= number of TaskManagers)
#   -tm 1024  memory per TaskManager, in MB
#   -s 2      slots per TaskManager
#   -nm bdp01 name of the application on YARN
#   -d        run detached
./yarn-session.sh -n 10 -tm 1024 -s 2 -nm bdp01 -d
Flink on YARN 提交作业分为两种模式:session 和 per-job,一般都用 session 模式
# Submit examples/batch/WordCount.jar without -m, i.e. to the JobManager taken
# from the configuration (presumably the running YARN session - confirm).
./bin/flink run examples/batch/WordCount.jar
# Submit the same jar with "-m yarn-cluster", which uses yarn-cluster mode
# instead of connecting to the configured JobManager.
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
1 参数必选 :
  -n,--container <arg>  分配多少个yarn容器 (=taskmanager的数量)

2 参数可选 :
  -D <arg>  动态属性
  -d,--detached  以分离(后台)模式运行
  -jm,--jobManagerMemory <arg>  JobManager的内存 [in MB]
  -nm,--name  在YARN上为应用设置一个自定义的名字
  -q,--query  显示yarn中可用的资源 (内存, cpu核数)
  -qu,--queue <arg>  指定YARN队列.
  -s,--slots <arg>  每个TaskManager使用的slots数量
  -tm,--taskManagerMemory <arg>  每个TaskManager的内存 [in MB]
  -z,--zookeeperNamespace <arg>  针对HA模式在zookeeper上创建NameSpace
  -id,--applicationId <yarnAppId>  YARN集群上的任务id,附着到一个后台运行的yarn session中

3 run [OPTIONS] <jar-file> <arguments>
  run操作参数:
  -c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
  -m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager
  -p,--parallelism <parallelism>  指定程序的并行度。可以覆盖配置文件中的默认值。

4 启动一个新的yarn-session时使用的选项,都有一个y或者yarn的前缀
  例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
  连接指定host和port的jobmanager:
  ./bin/flink run -m SparkMaster:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
  启动一个新的yarn-session:
  ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

5 注意:命令行的选项也可以使用./bin/flink 工具获得。

6 Action "run" compiles and runs a program.
  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
  -c,--class <classname>  Class with the program entry point ("main" method or "getPlan()" method). Only needed if the JAR file does not specify the class in its manifest.
  -C,--classpath <url>  Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL. The protocol must be supported by the {@link java.net.URLClassLoader}.
  -d,--detached  If present, runs the job in detached mode
  -n,--allowNonRestoredState  Allow to skip savepoint state that cannot be restored. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered.
  -p,--parallelism <parallelism>  The parallelism with which to run the program. Optional flag to override the default value specified in the configuration.
  -q,--sysoutLogging  If present, suppress logging output to standard out.
  -s,--fromSavepoint <savepointPath>  Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).

7 Options for yarn-cluster mode:
  -d,--detached  If present, runs the job in detached mode
  -m,--jobmanager <arg>  Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
  -yD <property=value>  use value for given property
  -yd,--yarndetached  If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
  -yh,--yarnhelp  Help for the Yarn session CLI.
  -yid,--yarnapplicationId <arg>  Attach to running YARN session
  -yj,--yarnjar <arg>  Path to Flink jar file
  -yjm,--yarnjobManagerMemory <arg>  Memory for JobManager Container with optional unit (default: MB)
  -yn,--yarncontainer <arg>  Number of YARN containers to allocate (=Number of Task Managers)
  -ynl,--yarnnodeLabel <arg>  Specify YARN node label for the YARN application
  -ynm,--yarnname <arg>  Set a custom name for the application on YARN
  -yq,--yarnquery  Display available YARN resources (memory, cores)
  -yqu,--yarnqueue <arg>  Specify YARN queue.
  -ys,--yarnslots <arg>  Number of slots per TaskManager
  -yst,--yarnstreaming  Start Flink in streaming mode
  -yt,--yarnship <arg>  Ship files in the specified directory (t for transfer)
  -ytm,--yarntaskManagerMemory <arg>  Memory per TaskManager Container with optional unit (default: MB)
  -yz,--yarnzookeeperNamespace <arg>  Namespace to create the Zookeeper sub-paths for high availability mode
  -z,--zookeeperNamespace <arg>  Namespace to create the Zookeeper sub-paths for high availability mode
