赞
踩
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the "Flink" module: a Scala/Java Flink streaming project
  that inherits from the com.shujia:bigdata16 parent POM.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata16</artifactId>
        <groupId>com.shujia</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>Flink</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.11.2</flink.version>
        <!-- Suffix of the Scala-specific Flink artifacts; must match scala.version. -->
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <!-- 2.17.1 or later: earlier 2.x releases (including 2.12.1) are affected by
             CVE-2021-44228 (Log4Shell) and its follow-up CVEs. 2.17.1 still runs on Java 8. -->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <dependencies>
        <!-- Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Logging: Log4j 2 as the SLF4J backend -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler.
                 net.alchim31.maven:scala-maven-plugin is the maintained successor of the
                 discontinued org.scala-tools:maven-scala-plugin; the goals are unchanged. -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <configuration>
                    <!-- Compile with the Scala version declared in <properties>. -->
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

- package com.shujia.core
-
- import org.apache.flink.streaming.api.scala._
-
/**
 * Streaming word count over a socket text source.
 *
 * Reads lines of comma-separated words from a TCP socket, keeps a running
 * count per word, and prints every updated count.
 *
 * Usage: `Demo01WordCount [host] [port]`. Both arguments are optional and
 * default to "master" and 8888. A test source can be started on the target
 * host with `nc -lk 8888`.
 */
object Demo01WordCount {
  private val DefaultHost = "master"
  private val DefaultPort = 8888

  /**
   * Builds and runs the word-count job.
   *
   * @param args optional socket host (index 0) and port (index 1)
   * @throws IllegalArgumentException if a port is given but is not an integer
   */
  def main(args: Array[String]): Unit = {
    val host: String = args.headOption.getOrElse(DefaultHost)
    // Fail fast with a readable message rather than a bare NumberFormatException.
    val port: Int = args.lift(1).fold(DefaultPort) { raw =>
      scala.util.Try(raw.toInt).getOrElse(
        throw new IllegalArgumentException(s"Invalid port: '$raw'"))
    }

    // Entry point of the Flink streaming job.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Pin the job parallelism to 2 instead of relying on the environment default
    // (the number of logical CPU cores when running locally).
    env.setParallelism(2)

    // Simulate a real-time source with a socket; DataStream is Flink's
    // programming model for streams.
    val linesDS: DataStream[String] = env.socketTextStream(host, port)

    // Split every line into words. Blank lines and consecutive commas produce
    // empty tokens, which are dropped so that "" is never counted as a word.
    val wordsDS: DataStream[String] = linesDS
      .flatMap(line => line.split(","))
      .filter(word => word.nonEmpty)

    // Turn each word into a (word, 1) key/value pair.
    val wordsKVDS: DataStream[(String, Int)] = wordsDS.map(word => (word, 1))

    // Group the pairs by word.
    val keyByDS: KeyedStream[(String, Int), String] = wordsKVDS.keyBy(kv => kv._1)

    // Running sum of the tuple field at position 1 (the count) for each word.
    val wordCntDS: DataStream[(String, Int)] = keyByDS.sum(1)

    // Print every updated count.
    wordCntDS.print()

    // Nothing runs until execute() submits the job.
    env.execute("Demo01WordCount")
  }
}

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。