赞
踩
在 Ubuntu 中:
sudo apt update
sudo apt install openjdk-11-jdk
在 CentOS 中:
sudo yum install java-11-openjdk
验证 Java 安装:
java -version
访问 Flink 官网 下载最新版本的 Flink。
wget https://downloads.apache.org/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz
mv flink-1.16.2 /usr/local/flink
编辑 ~/.bashrc
文件,添加以下内容:
export FLINK_HOME=/usr/local/flink
export PATH=$PATH:$FLINK_HOME/bin
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
应用更改:
source ~/.bashrc
编辑 Flink 的配置文件,位于 $FLINK_HOME/conf
。
flink-conf.yaml
jobmanager.rpc.address: localhost
taskmanager.memory.process.size: 1024m
masters
文件在单机模式下,默认配置即可,无需修改。
workers
文件在单机模式下,默认配置即可,无需修改。
start-cluster.sh
访问 Flink Dashboard,确保 Flink 正常运行:
flink-conf.yaml
中的内存配置。在 JobManager 节点上生成 SSH 密钥:
ssh-keygen -t rsa
将公钥复制到所有 TaskManager 节点:
ssh-copy-id user@taskmanager1
ssh-copy-id user@taskmanager2
在所有节点上安装 Flink,步骤与单机安装相同。
flink-conf.yaml
在所有节点上编辑 $FLINK_HOME/conf/flink-conf.yaml
,配置如下内容:
jobmanager.rpc.address: master
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
taskmanager.memory.process.size: 1024m
masters
文件在 JobManager 节点上编辑 $FLINK_HOME/conf/masters
文件,添加 JobManager 的主机名:
master:8081
workers
文件在 JobManager 节点上编辑 $FLINK_HOME/conf/workers
文件,添加所有 TaskManager 节点的主机名:
taskmanager1
taskmanager2
在 JobManager 节点上启动 Flink 集群:
start-cluster.sh
访问 Flink Dashboard,确保所有节点正常运行:
flink-conf.yaml
中的资源配置。ntpd
或 chrony
确保所有节点的时钟同步。import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.core.fs.FileSystem; import java.util.Arrays; public class WordCount { public static void main(String[] args) throws Exception { // 获取运行参数 final ParameterTool params = ParameterTool.fromArgs(args); // 设置执行环境 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从输入文件中读取数据 env.readTextFile(params.get("input")) // 拆分成单词 .flatMap((String value, Collector<Tuple2<String, Integer>> out) -> { Arrays.stream(value.split("\\W+")) .forEach(word -> out.collect(new Tuple2<>(word, 1))); }) .returns(TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class)) // 按单词分组 .groupBy(0) // 统计每个单词的出现次数 .sum(1) // 将结果保存到输出文件 .writeAsCsv(params.get("output"), FileSystem.WriteMode.OVERWRITE); // 执行程序 env.execute("Word Count Example"); } }
确保你已经安装了 Maven,并在 pom.xml
文件中配置了 Flink 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.16.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.16.2</version>
</dependency>
编译并打包:
mvn clean package
将输入文件上传到 HDFS 或本地文件系统:
hdfs dfs -put localfile.txt /user/flink/input
运行 Flink 应用:
flink run -c com.example.WordCount target/wordcount-1.0-SNAPSHOT.jar --input /user/flink/input --output /user/flink/output
hdfs dfs -cat /user/flink/output/part-00000
可以使用 Apache Beam 的 Python API 来实现 Flink 版的 Word Count。
pip install apache-beam
创建 wordcount.py
:
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions def run(): # 创建 PipelineOptions options = PipelineOptions( runner='FlinkRunner', flink_master='localhost:8081', environment_type='LOOPBACK' ) # 创建管道 with beam.Pipeline(options=options) as p: # 读取输入文件 (p | 'Read' >> beam.io.ReadFromText('input.txt') | 'Split' >> beam.FlatMap(lambda x: x.split()) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum) | 'Write' >> beam.io.WriteToText('output.txt') ) if __name__ == '__main__': run()
python wordcount.py
cat output.txt-00000-of-00001
通过以上步骤,我们成功部署了 Flink 单机和集群环境,并实现了一个简单的 Word Count 应用。Flink 提供了强大的流处理和批处理能力,可以在多种场景下处理实时数据。
ntpd
或 chrony
确保所有节点的时钟同步。通过合理配置和优化,Flink 可以在多种场景下提供高效的流处理和批处理能力,为大规模数据处理提供强有力的支持。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。