Reference: 《Flink原理、实战与性能优化》 (Flink: Principles, Practice, and Performance Optimization).
Client
Submits jobs to the JobManager. Each time a user submits a Flink program, a Client is created; the Client translates the Flink program into a JobGraph.
JobManager
The master node of the cluster, responsible for task scheduling and resource management across the entire Flink cluster; the cluster has one and only one active JobManager.
It receives submitted applications from the Client, allocates TaskSlot resources to each job according to current TaskSlot usage on the TaskManagers, and instructs the TaskManagers to start the application.
All checkpoint coordination happens on the JobManager; each TaskManager performs its part of the checkpoint after receiving the trigger command.
TaskManager
Responsible for executing the actual tasks (computation) and for requesting and managing the resources those tasks use on each node.
Every Flink program is structured as: source - transformation - sink.
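A minimal sketch of that pattern in the Flink 1.7 DataStream API (the socket host and port are illustrative assumptions):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9999)          // source (assumed host/port)
           .map(new MapFunction<String, String>() {      // transformation
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();                                     // sink
        env.execute("source-transformation-sink");
    }
}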
Flink natively supports the following data types:
1. Any boxed Java primitive type
2. Arrays of boxed Java primitive types
3. Java Tuples
4. Scala Case Classes
5. POJOs
and more.
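A quick sketch of two of these types with the batch API (the element values are arbitrary):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> boxed = env.fromElements(1, 2, 3);              // boxed Java primitives
        DataSet<Tuple2<String, Integer>> tuples =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2)); // Java Tuples
        boxed.print();
        tuples.print();
    }
}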
Flink distinguishes three notions of time:
Event Time: the time at which the event was generated on the producing device.
Ingestion Time: the time at which the event enters the Flink system.
Processing Time: the time at which an operator processes the event.
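Which notion of time a job uses is set on the execution environment; a minimal sketch against the Flink 1.7 API (ProcessingTime is the default):

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeCharacteristicSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switch the job to event time; IngestionTime and ProcessingTime are the other options.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    }
}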
EventTime and Watermark
Why: network or system issues can keep event data from reaching the Flink system on time.
In brief: a Watermark declares the maximum expected delay; while the data for an event-time window may still arrive (the watermark has not yet passed the window's end), Flink keeps waiting instead of firing the window.
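A sketch of assigning event timestamps and watermarks in the Flink 1.7 API; the Tuple2<Long, String> element layout, with the event timestamp in field f0, is an assumption for illustration:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Assumed sample stream: f0 is the event timestamp (ms), f1 is a payload.
        DataStream<Tuple2<Long, String>> events = env.fromElements(
                Tuple2.of(1000L, "a"), Tuple2.of(2000L, "b"));
        // The watermark trails the largest seen timestamp by 5 seconds;
        // events arriving later than that are treated as late.
        events.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<Long, String> element) {
                        return element.f0;
                    }
                })
              .print();
        env.execute("watermark-sketch");
    }
}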
Further reading: "EventTime和Watermark" and "Flink的时间与watermarks详解".
Stateful computation
While a Flink program runs, it can store intermediate results for later operators and functions to use. The storage can be Flink's on-heap or off-heap memory, or a third-party storage system. By contrast, stateless computation keeps no intermediate results.
Typical use cases
Retrieving events that match a particular rule or pattern, computing a maximum per time window, machine learning, computations over historical data, and so on.
Flink state types
Depending on whether the dataset is partitioned by key, state falls into KeyedState and OperatorState (non-keyed state). Both kinds exist in two forms: managed state (ManagedState) and raw state (RawState).
| Form | Meaning | How state is kept | How it is checkpointed |
| --- | --- | --- | --- |
| ManagedState | Managed state | Controlled and managed by the Flink runtime; stored as in-memory hash tables or in RocksDB | Persisted into checkpoints through internal interfaces |
| RawState | Raw state | The operator manages its own data structures | Serialized to bytes and stored in checkpoints |
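As an illustration of managed keyed state, a sketch of a per-key running count with ValueState; the Tuple2<String, Long> element layout and the keyBy(0) usage are assumptions:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Managed keyed state: the Flink runtime stores and checkpoints the count per key.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();                 // null on first access for this key
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        out.collect(Tuple2.of(in.f0, next));
    }
}
// Usage sketch: stream.keyBy(0).flatMap(new CountPerKey())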
Checkpoints and Savepoints
Checkpoints: Flink's fault-tolerance mechanism, built on an asynchronous, lightweight distributed-snapshot technique.
Savepoints: checkpoints triggered manually by command, with the result persisted to a specified storage path.
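Triggering and restoring a savepoint from the CLI looks like this (a sketch; <jobId>, the target directory, package.ClassName, and the jar name are placeholders):
~/soft/flink-1.7.1/bin/flink savepoint <jobId> [targetDirectory]
~/soft/flink-1.7.1/bin/flink run -s <savepointPath> -c package.ClassName your-jar.jar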
Flink provides StateBackends to store and manage the state data produced during checkpointing: the memory-based MemoryStateBackend, the filesystem-based FsStateBackend, and the RocksDBStateBackend, which uses RocksDB as its storage medium.
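Choosing a backend and turning on checkpointing, as a minimal sketch (the checkpoint interval and the HDFS path are assumptions):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // take a checkpoint every 10 s
        // Persist checkpoint data to a filesystem; the path is an assumed example.
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
    }
}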
Checkpoint tuning
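With checkpointing enabled, the usual tuning knobs live on CheckpointConfig; a sketch of common settings (the values are illustrative assumptions):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);
        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setMinPauseBetweenCheckpoints(500); // guarantee processing time between two checkpoints
        cfg.setMaxConcurrentCheckpoints(1);     // never let checkpoints overlap
        cfg.setCheckpointTimeout(60000);        // abort a checkpoint that takes longer than 60 s
    }
}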
Download the release jar from the official site: https://flink.apache.org/zh/downloads.html
Note: pick the Flink build that matches your Hadoop and Scala versions to avoid dependency conflicts.
Start Flink locally:
./flink-1.7.1/bin/start-cluster.sh
Stop Flink locally:
./flink-1.7.1/bin/stop-cluster.sh
Verify it started (default port 8081):
http://localhost:8081/
Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <!-- This dependency is required to actually execute jobs. It is currently pulled in by
             flink-streaming-java, but we explicitly depend on it to safeguard against future changes. -->
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Java code (batch WordCount example):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class wordcount {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new LineSplitter())
                .groupBy(0)   // group by the word (field 0)
                .sum(1);      // sum the counts (field 1)
        counts.print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // \W+ matches one or more non-word characters (\w is the word-character class:
            // letters, digits, underscore), so this splits the line into words.
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
How to launch it with Flink:
~/soft/flink-1.7.1/bin/flink run -c package.ClassName your-jar.jar
/*
 Run commands:
 cd ~/工作/idea_project/lazy_project/java_project/demo && mvn clean install
 cd ~/工作/idea_project/lazy_project/java_project/demo/target && ~/soft/flink-1.7.1/bin/flink run -c base.dataStream.StreamingDemoWithMyRichPralalleSource demo-1.0-SNAPSHOT.jar
*/
package base.dataStream;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StreamingDemoWithMyRichPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // DataStreamSource<Long> streamSource = executionEnvironment.addSource(new MyParallesource()).setParallelism(1);
        DataStreamSource<Long> streamSource = executionEnvironment.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<Long> operator = streamSource.map(new MyMapFunction());
        operator.timeWindowAll(Time.seconds(2)).sum(0).print().setParallelism(1);
        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        executionEnvironment.execute(jobName);
    }
}

class MyParallesource implements ParallelSourceFunction<Long> {
    private long count = 1L;
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> sourceContext) throws Exception {
        while (isRunning) {
            sourceContext.collect(count++);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

// A source with parallelism 1
class MyNoParalleSource implements SourceFunction<Long> {
    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases this run method contains a loop so the source keeps producing data.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // produce one record per second
            Thread.sleep(1000);
        }
    }

    // called when the source is cancelled
    @Override
    public void cancel() {
        isRunning = false;
    }
}

class MyMapFunction implements MapFunction<Long, Long> {
    @Override
    public Long map(Long value) throws Exception {
        System.out.println("received: " + value);
        return value;
    }
}
Kafka connector Maven dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>
/*
 Start local ZooKeeper and check its status (run once):
 sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh start && sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh status
 Stop local ZooKeeper:
 sh ~/soft/zookeeper-3.4.9/bin/zkServer.sh stop
*/
/*
 Start local Kafka (start ZooKeeper first):
 nohup sh ~/soft/kafka_2.11-1.0.1/bin/kafka-server-start.sh ~/soft/kafka_2.11-1.0.1/config/server.properties &
 Check it is up (listening port):
 lsof -i:9092
 Stop local Kafka:
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-server-stop.sh
*/
/*
 Create a Kafka topic (replace 【topic_name】 with your topic name):
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 【topic_name】 ;
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
 Delete a topic:
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic 【topic_name】 && sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
 List topics:
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-topics.sh --list --zookeeper localhost:2181
*/
/*
 Test Kafka produce/consume:
 Console producer:
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic 【topic_name】
 Console consumer:
 sh ~/soft/kafka_2.11-1.0.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 【topic_name】 --from-beginning
*/
package base.dataStream;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.Properties;
import java.util.Random;

public class kafkaDataStream1 {
    public static final String TOPIC_NAME = "topic_test";

    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        return properties;
    }

    public static FlinkKafkaProducer<String> getProducer() {
        Properties properties = getProperties();
        return new FlinkKafkaProducer<String>(kafkaDataStream1.TOPIC_NAME, new SimpleStringSchema(), properties);
    }

    public static FlinkKafkaConsumer<String> getConsumer() {
        return new FlinkKafkaConsumer<>(TOPIC_NAME, new SimpleStringSchema(), getProperties());
    }
}

class KafkaProducerDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = executionEnvironment.addSource(new MyNoParalleSourceDemo1());
        FlinkKafkaProducer<String> producer = kafkaDataStream1.getProducer();
        source.addSink(producer);
        executionEnvironment.execute();
    }
}

class KafkaConsumerDemo1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<String> consumer = kafkaDataStream1.getConsumer();
        consumer.setStartFromLatest();
        env.addSource(consumer).print();
        env.execute();
    }
}

class MyNoParalleSourceDemo1 implements SourceFunction<String> {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (isRunning) {
            ArrayList<String> list = new ArrayList<>();
            list.add("record 1");
            list.add("record 2");
            list.add("record 3");
            list.add("record 4");
            list.add("record 5");
            int i = new Random().nextInt(5);
            System.out.println("============" + list.get(i));
            sourceContext.collect(list.get(i));
            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}