赞
踩
flink.apache.org
Flink是有状态的(sateful):Stateful Computations over Data Streams
起源欧洲,后被阿里收购,才在中国普及。在此之前都是用spark。
Flink也是做客户端,Flink on k8s、Yarn、Mesos,目前还是 Flink on Yarn,以后 on k8s.
on k8s.可以实现资源隔离,各个任务不用存在资源抢占。
可以接实时的数据,做流处理;也可以接DB,dfs的数据 ,做批处理。
Flink 更强于做流处理 【DataSet API (Legacy)】,Spark更强于做批处理
批流一体 DataSet-批 、DataStream-流。
高吞吐、低延迟、高性能。
真正的流处理,支持基于Event-time的操作。也支持window操作。
支持带状态的sateful的Exactly-Once(*****)。
位置: documentation/Application Development/DataStream API
DataStream 也是不可变的。
可以通过maven快速创建一个fink工程demo
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-walkthrough-datastream-java \
-DarchetypeVersion=1.14.4 \
-DgroupId=frauddetection \
-DartifactId=frauddetection \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/datastream_api.html
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 提前引入,不然会报错:No ExecutorFactory found to execute the application,且报错看不出是没有flink-client导致 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
// 隐式转换的包,不然会报错 could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] import org.apache.flink.api.scala._ import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} object BatchWCApp { def main(args: Array[String]): Unit = { // 1. 获取批处理上下文对象 --> SparkContext val env = ExecutionEnvironment.getExecutionEnvironment // 2. 批处理 获取数据 readTextFile val text: DataSet[String] = env.readTextFile("data/wc.txt") // 返回的就是 DataSet // 需要导入隐式转换的包 import org.apache.flink.api.scala._ val result = text.flatMap(_.split(",")) .map((_, 1)) .groupBy(0) // 索引下标 .sum(1) // 索引下标 result.print() } }
import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} object StreamingWCApp { def main(args: Array[String]): Unit = { // 1. 获取流处理上下文对象 val env = StreamExecutionEnvironment.getExecutionEnvironment // 2. 流处理 DataStream val text: DataStream[String] = env.socketTextStream("gargantua", 9527) // 返回的就是 DataStream text.flatMap(_.split(",")) .filter(_.nonEmpty) .map((_, 1)) .keyBy(_._1) .sum(1) .print().setParallelism(1) // 3. 流处理需要 手动 execute env.execute(this.getClass.getSimpleName) // 现在终端启动 [liqiang@Gargantua ~]$ nc -lk 9527 // 再启动main方法 // 在终端输入需要统计的单词(要回车)就能实时统计9527的数据 4>(pig,1) 4>(dog,2) // 4代表并行度 4>(dog,3) } }
public static void main(String[] args) throws Exception { // java 版本对应的 gargantua.basic.BatchWCApp.scala ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> source = env.readTextFile("data/wc.txt"); // 匿名内部类 // test01(source); // lambda + stream流 test02(source); } private static void test02(DataSource<String> source) throws Exception { System.out.println("------------"); // 生成的 lambda 写法. 注意每一步都要先返回,再做下一步 FlatMapOperator<String, String> flatMapOperator = source.flatMap((FlatMapFunction<String, String>) (s, collector) -> { String[] words = s.split(","); for (String word : words) { collector.collect(word); } }).returns(Types.STRING); // stream流 写法:注意每一步都要先返回,再做下一步 /* FlatMapOperator<String, String> flatMapOperator = source.flatMap((String line, Collector<String> collector) -> { Arrays.stream(line.split(",")).forEach(collector::collect); }).returns(Types.STRING);*/ MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)); mapOperator.groupBy(0).sum(1).print(); // mapOperator.groupBy(x -> x.f0).sum(1).print();
Anatomy of a Flink Program
获取执行环境 [val env = getExecutionEnvironment()、createLocalEnvironment()]
加载/创建初始数据 [val input = env.readTextFile("data/wc.txt")、env.socketTextStream("localhost", 9999)]
作用此数据的transformations算子 [input.map { x => x.toInt }...]
指定计算结果的存放位置 [writeAsText(path: String))、print()、writeToSocket()、addSink()]
触发程序执行(流处理) [env.execute(this.getClass.getSimpleName)]
获取批处理执行环境 ,如同SparkConf、SparkContext
val env = ExecutionEnvironment.getExecutionEnvironment
获取流处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
获取环境时可以指定让这个任务展示到Web UI。
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
</dependency>
获取带Web UI 的 Environment
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
默认启动在8081端口。如果有打开的UI页面提前打开,或前一次没有关闭,但后台重启,会报错。需要把UI关掉再打开。
也可以自己设置端口号:
val configuration = new Configuration()
configuration.setInteger("rest.port",7777);
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
其实就用val env = StreamExecutionEnvironment.getExecutionEnvironment也会有Web UI界面的,不过端口不固定,需要通过启动日志查看。
基于文件(批处理):
readTextFile()
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter)
基于socket(流处理):
socketTextStream
基于scala/java集合
// 内置
fromCollection(Seq) // 单并行,
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator) // 并行,多个task
generateSequence(from, to)
// 自定义
addSource
map、 filter 、keyby 、window...
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr()
writeUsingOutputFormat() / FileOutputFormat
writeToSocket
addSink
env.socketTextStream 、env.fromCollection(List()) 、fromElements 的时候,并行度是1, 意味着不能并行接收。
且不能自己强制设置并行度,因为源码设置为1,或只要实现SourceFuncation的也都是单并行。
env.readTextFile() 、env.fromParallelCollection()、env.generateSequence()是多并行度读取。
对带并行度的数据源ParallSourceFunction如果不指定并行度,就会使用当前机器的CPU线程数。会把资源占尽。
transformation : fliter、flatmap 等,如果不指定并行度,也是取决于CPU线程。
print():如果不指定并行度,也是取决于CPU线程。
(仅针对能设置并行度时)
env阶段: env.setParallelism(2)
source阶段 : env.addSource(...).setParallelism(2)
transformation阶段: 默认是用完CPU,在生产一般都要自己重新设置。
sink阶段:
内置的数据源如fromCollection(),底层也是用的addSource()。
对于xxxSourceFunction,有很多,都是常用Function的子接口和子类,如可以并行的ParalleSourceFunction、RichSourceFunction、RichParalleSourceFunction
继承自单并行度的自定义sourceFunction只能串行,继承自多并行度的自定义ParalleSourceFunction默认并行度为CPU线程数,也可以自己setParallelism。
自定义sourceFunction,需要实现run() 和cancel() 方法。源码有demo参考。
继承自RichFunction(增强)的自定义sourceFunction,除了run() 和cancel() ,还可以实现open()和close()方法(生命周期方法)。open是每一个并行度执行前都会调用的,如自定义关系型数据源(如mysql),可以在open中加载连接。
导入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
新API可以直接使用KafkaSource.builder() 出一个kafkaSource。是多并行度的。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = KafkaSource.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setTopics("flinktopic")
.setGroupId("ruozedata_flink_topic_group") // pk-group
.setStartingOffsets(OffsetsInitializer.latest()) // earliest
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print()
Map、FlatMap、Filter、KeyBy、Reduce、Window、WindowAll 、Window Apply、WindowReduce、Union、Window Join、Interval Join 、Window CoGroup、Connect、CoMap, CoFlatMap、Iterate
map 是将一个 DataStream转另一个 DataStream
.map(_*2)的底层操作是
.transform("my map",new SteamMap(new MapFunction(
override def map(value:Int) = value * 2
))
union: 两个或多个数据源 合并成一个数据源。 union时是必须要相同的数据类型
自己合并自己会得到两倍…
connect:两个数据源,关联成一个,但是内部其实两个流还是独立,知识可以共用State状态信息,且两个流数据类型还可以不一样
connect vs union
1) 合并后:一个流 / 多个流
2) 数据类型:是否一定要相同的数据类型
3) 个数问题:connect的map操作,也是会有两个参数,分开操作
需求:两个流分别做,stream1转大写,stream转Int
stream1.connect(stream2).map(new CoMapFunction[String,Int, String] {
override def map1(value: String): String = value.toUpperCase()
override def map2(value: Int): String = value * 10 + ""
})
自定义分区器MyPartitioner,继承Partitioner,重写partition方法,在其中指定分区规则。
通过partitionCustom指定使用分区
.partitionCustom(new MyPartitioner, x => x._1)
Flink03:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/
自定义 Sink。继承RichSinkFunction,重写open方法和invoke方法,在invoke方法中指定具体逻辑输出。
输出到文件系统,由于多并行,会产生很多小文件夹。流处理就不适合输出文件系统。
但是setParallelism(1)时可以输出到一个文件
需要引入依赖(和source是同一个)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
</dependency>
官网提供API可以直接使用KafkaSource.builder() 出一个sink,使用sinkTo输出到kafka
val sink: KafkaSink[String] = KafkaSink.builder()
.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("flinktopic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
).build()
需要引入依赖
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
API只提供了RedisSink,还需要自己实现RedisMapper。
参考官网。
accessStream.writeToSocket("gargantua", 9526, new SimpleStringSchema())
输出到 nc - lk 9526 的窗口了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。