org.apache.maven.archetypes:maven-archetype-quickstart
# this archetype creates a plain (quickstart) Maven project
GroupId: the company or organization name
ArtifactId: the project/module name
Mainly configure the Maven home path and the settings.xml path (New -> Settings -> Maven).
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hainiu</groupId>
    <artifactId>hainiuflink</artifactId>
    <version>1.0</version>

    <properties>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <flink.version>1.9.3</flink.version>
        <parquet.version>1.10.0</parquet.version>
        <hadoop.version>2.7.3</hadoop.version>
        <fastjson.version>1.2.72</fastjson.version>
        <redis.version>2.9.0</redis.version>
        <mysql.version>5.1.35</mysql.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.build.scope>compile</project.build.scope>
        <!-- <project.build.scope>provided</project.build.scope> -->
        <mainClass>com.hainiu.Driver</mainClass>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Hadoop client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink's Hadoop compatibility -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Streaming Java API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Streaming Scala API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink web UI at runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- store Flink state in RocksDB -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink HBase connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Elasticsearch connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink Kafka connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Flink writing files to HDFS -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Redis client -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>${redis.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- Parquet file format support for Flink -->
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-hadoop</artifactId>
            <version>${parquet.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <!-- JSON handling -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptors>
                        <descriptor>src/assembly/assembly.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12</version>
                <configuration>
                    <skip>true</skip>
                    <forkMode>once</forkMode>
                    <excludes>
                        <exclude>**/**</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
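The assembly plugin above points at src/assembly/assembly.xml, which is not shown in the original; a minimal sketch of such a descriptor (the id, format, and unpack settings are assumptions, not the author's file) could look like:

<assembly>
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <!-- unpack all runtime dependencies into the fat jar -->
            <outputDirectory>/</outputDirectory>
            <unpack>true</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>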
// batch API: create a DataSet execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// streaming API: create a DataStream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// let Flink choose between batch and streaming execution automatically
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
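For reference, these snippets assume the following imports; note that RuntimeExecutionMode only exists in Flink 1.12 and later, so with the Flink 1.9.3 version pinned in the POM above only the first two environments are available:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;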
// local file path for the batch example
String path = "H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt";
// batch source: read a text file into a DataSet
DataSet<String> inputDataSet = env.readTextFile(path);
// streaming source: read text lines from a socket
DataStreamSource<String> elementsSource = env.socketTextStream("10.50.40.131", 9999);
// streaming source: read a text file by URI
DataStream<String> lines = env.readTextFile("file:///path");
DataSet<Tuple2<String, Integer>> resultDataSet = inputDataSet.flatMap(new MyFlatMapFunction())
        .groupBy(0) // group by position 0 of the (word, 1) tuple, i.e. the word
        .sum(1);    // sum position 1, i.e. the count
socket
… // print() calls toString() on each element of the stream.
resultDataSet.print();
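For a streaming job whose print() sink runs with parallelism greater than 1, each printed record is prefixed with the index of the subtask that produced it; the lines below are only an illustration, not output from this program:

2> (Flink,1)
1> (Spark,1)
2> (Flink,2)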
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data
        String path = "H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt";
        // DataSet -> Operator -> DataSource
        DataSet<String> inputDataSet = env.readTextFile(path);
        // 3. flatMap + group + sum
        DataSet<Tuple2<String, Integer>> resultDataSet = inputDataSet.flatMap(new MyFlatMapFunction())
                .groupBy(0) // position 0 of (word, 1) is the word
                .sum(1);
        resultDataSet.print();
    }

    public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = input.split(" ");
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the file data
        DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
        // 3. Compute
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = input.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);
        // 4. Output
        resultDataStream.print();
        // 5. Start the env
        env.execute();
    }
}
Text to count: Flink
Output: (Flink,1)
Text to count: with Spark added
Output: (Flink,2) (Spark,1)
Text to count: with python added
Output: (Flink,3) (Spark,2) (python,1)
The result of stream processing is continuously refreshed: in this example the data enters the job line by line, each arriving batch is processed as it comes in, and when there is no data the job simply waits.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data
        // DataStreamSource<String> inputDataStream = env.readTextFile("H:\\flink_demo\\flink_test\\src\\main\\resources\\wordcount.txt");
        // 2.2 Use the ParameterTool utility to extract config values from the program arguments
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> inputDataStream = env.socketTextStream(host, port);
        // 3. Compute
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = input.split(" ");
                        for (String word : words) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .sum(1);
        // 4. Output
        resultDataStream.print().setParallelism(1);
        // print the execution plan
        System.out.println(env.getExecutionPlan());
        // 5. Start the env
        env.execute();
    }
}
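To try this version, start a socket text server first and then launch the job with matching --host and --port arguments. A minimal sketch (the port and jar name here are examples, not from the original):

# terminal 1: feed lines into the socket
nc -lk 9999
# terminal 2: in the IDE set Program arguments to "--host localhost --port 9999",
# or submit the packaged jar to a cluster:
flink run -c StreamCount hainiuflink-1.0.jar --host localhost --port 9999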
(1) Upload the jar to the Flink cluster.
(2) Fill in the submission parameters (the CLI equivalent is sketched after this list):
entry class: the entry (main) class of the job
program arguments: the arguments passed to the entry class, e.g. --host and --port above
parallelism: the job parallelism used at submission; a parallelism set in the code takes precedence, otherwise the value given here is used, and if neither is set the cluster's global default parallelism applies
savepoint path: restore the job from a previously saved savepoint
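The same settings map onto the command-line client, where -c is the entry class, -p the parallelism, and -s the savepoint path; a hedged sketch (jar name and savepoint path are placeholders):

flink run -c com.hainiu.Driver -p 2 -s <savepointPath> hainiuflink-1.0.jar --host localhost --port 9999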
(3) Check the run results.
vim {flink_home}/log   # the run results are written to the log files under this directory
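On a standalone cluster, the stdout produced by print() usually ends up in the TaskManager's .out file; a sketch, assuming the default log file naming:

# follow the TaskManager stdout file
tail -f {flink_home}/log/flink-*-taskexecutor-*.out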