赞
踩
[root@CentOS ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr
[root@CentOS ~]# vi /usr/flink-1.8.1/conf/flink-conf.yaml
jobmanager.rpc.address: CentOS
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
[root@CentOS ~]# vi /usr/flink-1.8.1/conf/slaves
CentOS
[root@CentOS flink-1.8.1]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host CentOS.
Starting taskexecutor daemon on host CentOS.
[root@CentOS flink-1.8.1]# jps
2912 Jps
2841 TaskManagerRunner
2397 StandaloneSessionClusterEntrypoint
<properties>
<flink.version>1.8.1</flink.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Compile the Scala sources into the jar during the package phase -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Bundle dependency jars into a single (fat) jar; signature files are
     excluded so the shaded jar does not fail signature verification -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
package com.hw.demo01
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
 * @author fql
 * @date 2019/10/14 20:03
 *
 * Flink streaming word-count example: reads lines from a socket,
 * splits them on whitespace, and prints a running count per word.
 */
object WordCount_flink {
def main(args: Array[String]): Unit = {
//1. Create the stream-processing environment (auto-detects remote cluster vs. local execution)
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//2. Read from the external system: a raw text socket on CentOS:9999
//   (start a server first, e.g. `nc -lk 9999`, or the job will fail to connect)
val lines:DataStream[String]=fsEnv.socketTextStream("CentOS",9999)
lines.flatMap(_.split("\\s+"))    // split each line into words on whitespace runs
.map((_,1))                       // pair each word with an initial count of 1
.keyBy(t=>t._1)                   // partition the stream by the word itself
.sum(1)                           // running sum of the counts (tuple field index 1)
.print()                          // emit results to the TaskManager's stdout
//3. Trigger execution — nothing runs until execute() is called
fsEnv.execute("wordcount")
}
}
[root@CentOS flink-1.8.1]# ./bin/flink run --class com.hw.demo01.WordCount_flink --detached --parallelism 3 /root/original-flink-1.0-SNAPSHOT.jar
Starting execution of program
Job has been submitted with JobID b7c2555ba5f9e847c426a18e8af0748f
[root@CentOS flink-1.8.1]# ./bin/flink cancel -m CentOS:8081 b7c2555ba5f9e847c426a18e8af0748f
Cancelling job b7c2555ba5f9e847c426a18e8af0748f.
Cancelled job b7c2555ba5f9e847c426a18e8af0748f.
[root@CentOS flink-1.8.1]# ./bin/flink run --class com.hw.demo01.WordCount_flink --detached --parallelism 3 /root/original-flink-1.0-SNAPSHOT.jar
val jarFiles="flink\\target\\original-flink-1.0-SNAPSHOT.jar" // for testing against a remote cluster
val fsEnv = StreamExecutionEnvironment.createRemoteEnvironment("CentOS",8081,jarFiles)
val fsEnv = StreamExecutionEnvironment.createLocalEnvironment(3)
Or:
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment // auto-detects the runtime environment; typically used in production
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。