赞
踩
事务处理:java 后端->数据库
分析处理:离线数仓
有状态的流式处理:实时处理
流处理的演变:第二代流失处理架构(lambda)
延迟:flink 毫秒级延迟,sparkStreaming 秒级延迟
架构:flink 真正的流,sparkStreaming 微批
flink没有Stage的概念,每个个节点的计算不需要等待。
dataSet是数据集不是数据流
package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved * <p> * Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 * <p> * Created by wushengran on 2020/11/6 11:22 */ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.util.Collector; /** * @ClassName: WordCount * @Description: * @Author: wushengran on 2020/11/6 11:22 * @Version: 1.0 */ // 批处理word count public class WordCount { public static void main(String[] args) throws Exception{ // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //data/sensor.txt // 从文件中读取数据 // String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"; String inputPath = "data/hello.txt"; DataSet<String> inputDataSet = env.readTextFile(inputPath); // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计 DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) // 按照第一个位置的word分组 .sum(1); // 将第二个位置上的数据求和 resultSet.print(); } // 自定义类,实现FlatMapFunction接口 public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历所有word,包成二元组输出 for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } } }
流式处理API
package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved * <p> * Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 * <p> * Created by wushengran on 2020/11/6 11:48 */ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.net.URL; /** * @ClassName: StreamWordCount * @Description: * @Author: wushengran on 2020/11/6 11:48 * @Version: 1.0 */ public class StreamWordCount { public static void main(String[] args) throws Exception{ // 创建流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setParallelism(1); // env.disableOperatorChaining(); // // 从文件中读取数据 // String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"; // DataStream<String> inputDataStream = env.readTextFile(inputPath); // 用parameter tool工具从程序启动参数中提取配置项 ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); // 从socket文本流读取数据 DataStream<String> inputDataStream = env.socketTextStream(host, port); // 基于数据流进行转换计算 DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green") .keyBy(0) .sum(1).setParallelism(2).slotSharingGroup("red"); resultStream.print().setParallelism(1); // 执行任务 env.execute(); } }
nc -lk 7777
flink下载
https://flink.apache.org/zh/
flink 1.10之前的版本都包含了对hadoop的依赖
flink1.10之后的版本flink的安装包和hadoop的依赖分离开来了
flink/conf/flink-conf.yaml的主要内容
解压缩 flink-1.10.1-bin-scala_2.12.tgz,进入 conf 目录中。
1)修改 flink/conf/flink-conf.yaml 文件:
3)分发给另外两台机子:
4)启动 :
flink/conf/flink-conf.yaml
访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。
web页面提交任务
task任务需要的taskSloat 一般等于执行任务需要最大一个并行度
命令提交任务
准备数据文件(如果需要)
把含数据文件的文件夹,分发到 taskmanage 机器
如 果 从 文 件 中 读 取 数 据 , 由 于 是 从 本 地 磁 盘 读 取 , 实 际 任 务 会 被 分 发 到
taskmanage 的机器中,所以要把目标文件分发
执行程序
./flink run -p 2 -c com.atguigu.wc.StreamWordCount /root/ysw/flink/jar/FlinkTutorial-1.0-SNAPSHOT.jar --host kafka1 --port 6666
4) 查看计算结果
注意:如果输出到控制台,应该在 taskmanager 下查看;如果计算结果输出到文
件,同样会保存到 taskmanage 的机器下,不会在 jobmanage 下。
现在流行的资源管理平台主要是yarn和k8s
/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
3) 执行任务
./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
yarn application --kill application_1577588252906_0001
./flink run –m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
// 启动 jobmanager-service 服务
kubectl create -f jobmanager-service.yaml
// 启动 jobmanager-deployment 服务
kubectl create -f jobmanager-deployment.yaml
// 启动 taskmanager-deployment 服务
kubectl create -f taskmanager-deployment.yaml
4)访问 Flink UI 页面
集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器
输入以下 url 来访问 Flink UI 页面了:
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanage
r:ui/proxy
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。