Common Flink deployment modes: Local mode, Standalone mode, and YARN mode.
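Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It provides core capabilities such as data distribution, fault tolerance, and resource management, and it offers several high-level APIs for writing distributed jobs: the DataSet API, the DataStream API, the Table API, and others.
Local mode is the simplest deployment mode Flink offers; it is generally used for local testing and demos.
This walkthrough uses Apache Flink 1.10.0 for Scala 2.11. Download: https://www.apache.org/dyn/closer.lua/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
After uploading the archive to the Linux machine, extract it: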
tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
Note: Flink's runtime depends on the JDK, so install a JDK beforehand.
# Check the firewall status
systemctl status firewalld
# Start the firewall
systemctl start firewalld
# Stop the firewall
# systemctl stop firewalld
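Flink's web UI listens on port 8081 by default, so that port must be allowed through the firewall before the web page can be reached.
# Open the required port:
firewall-cmd --zone=public --add-port=8081/tcp --permanent
# Reload so the added port takes effect:
firewall-cmd --reload
# Check whether the port was opened successfully:
firewall-cmd --query-port=8081/tcp
# List all firewall settings, including open ports:
firewall-cmd --list-all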
./bin/start-cluster.sh
Check the startup log:
Open the web UI at: http://IP:8081
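If the page does not load, it can help to confirm from the client machine that the port is reachable at all before digging into Flink itself. The following is a minimal sketch using only the JDK; the class name `WebUiPortCheck`, the helper `isPortOpen`, and the 2-second timeout are illustrative assumptions, and `localhost` stands in for the real server IP.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of Flink): checks whether a TCP port accepts connections.
class WebUiPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the server IP as the first argument; "localhost" is only a placeholder.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = 8081; // Flink web UI default port, as used in this walkthrough
        boolean open = isPortOpen(host, port, 2000);
        System.out.println(host + ":" + port + (open ? " is reachable" : " is NOT reachable"));
    }
}
```

A "NOT reachable" result points at the firewall rule or at the cluster not having started, rather than at the browser.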
Create a Maven project in IDEA and write the Job.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dt</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink-java</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Flink version -->
        <flink.version>1.10.0</flink.version>
        <!-- JDK version -->
        <java.version>1.8</java.version>
        <!-- Scala 2.11 -->
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- Use the maven-shade plugin to build a fat JAR containing all required dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures under META-INF; otherwise they cause SecurityExceptions -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Note: replace this with the class that holds your own Job's main method -->
                                    <mainClass>com.dt.BoundedStreamWordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
package com.dt;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @description: Stream processing with the DataStream API
 * @author: DT辰白 Created by 2023/5/9 9:56
 * @version: v1.0
 */
public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Make the command-line arguments available to the job as global parameters
        env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));

        env.fromElements(WORDS)
                // Split each line into lowercase words and emit (word, 1) for each
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.toLowerCase().split("\\W+");
                        for (String split : splits) {
                            if (split.length() > 0) {
                                out.collect(new Tuple2<>(split, 1));
                            }
                        }
                    }
                })
                // Group by the word (tuple field 0)
                .keyBy(0)
                // Sum the counts of the two tuples that share the same word
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print();

        // A streaming program must call execute() to start; without it no results are produced
        env.execute("## word count streaming demo");
    }

    private static final String[] WORDS = new String[]{
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer"
    };
}
Run and test locally in IDEA:
On the http://IP:8081/#/submit page, upload the locally built flink-java-1.0-SNAPSHOT.jar, then click Submit to run the job.
The Job's output goes to the Task Manager's Stdout; view the results there:
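When reading that output, it may help to know that a keyed rolling reduce emits an updated count every time a word arrives, not one final total per word. The following is a plain-Java sketch of that behavior with no Flink dependency; the class name `RollingWordCountSketch` and the method `rollingCounts` are made up for illustration, and the single-threaded loop only approximates the job. On a real cluster the printed lines are typically prefixed with a subtask index, and the ordering across different words may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation only: mimics the flatMap -> keyBy -> reduce -> print steps in one thread.
class RollingWordCountSketch {

    /** Returns one "(word,count)" entry per incoming word, in arrival order. */
    static List<String> rollingCounts(String[] lines) {
        Map<String, Integer> state = new HashMap<>(); // per-key running count (the "keyed state")
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Same tokenization as the job: lowercase, split on non-word characters
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Combine the previous count with the new 1, like the reduce step
                int updated = state.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] words = {
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer"
        };
        rollingCounts(words).forEach(System.out::println);
    }
}
```

With the two sample lines, a word such as "to" shows up three times in the result, as `(to,1)`, `(to,2)` and `(to,3)`, because each occurrence triggers a new emission.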
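Flink is a distributed computing engine and framework that can process both bounded and unbounded data. It is event-driven and stream-oriented: it processes events one at a time as they arrive, which makes it a true stream processor. It can also do batch processing by treating a batch as a bounded stream.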