当前位置:   article > 正文

Flink 【一】_flink-connector-kafka

flink-connector-kafka

flink.apache.org
Flink是有状态的(sateful):Stateful Computations over Data Streams

起源欧洲,后被阿里收购,才在中国普及。在此之前都是用spark。

Flink也是做客户端,Flink on k8s、Yarn、Mesos,目前还是 Flink on Yarn,以后 on k8s.
on k8s.可以实现资源隔离,各个任务不用存在资源抢占。

可以接实时的数据,做流处理;也可以接DB,dfs的数据 ,做批处理。

Flink 更强于做流处理 【DataSet API (Legacy)】,Spark更强于做批处理

Flink 的特点

批流一体 DataSet-批 、DataStream-流。
高吞吐、低延迟、高性能。
真正的流处理,支持基于Event-time的操作。也支持window操作。
支持带状态的sateful的Exactly-Once(*****)。

DataStream

位置: documentation/Application Development/DataStream API
DataStream 也是不可变的。

简单上手

可以通过maven快速创建一个fink工程demo

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
    -DarchetypeVersion=1.14.4 \
    -DgroupId=frauddetection \
    -DartifactId=frauddetection \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/datastream_api.html

Example
  1. 引入依赖
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-scala_2.12</artifactId>
     <version>${flink.version}</version>
 </dependency>
  <!--  提前引入,不然会报错:No ExecutorFactory found to execute the application,且报错看不出是没有flink-client导致  -->
 <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId> 
    <version>${flink.version}</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 批处理
// 隐式转换的包,不然会报错 could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object BatchWCApp {

  def main(args: Array[String]): Unit = {

    // 1. 获取批处理上下文对象  --> SparkContext
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. 批处理 获取数据 readTextFile
    val text: DataSet[String] = env.readTextFile("data/wc.txt") // 返回的就是 DataSet

    // 需要导入隐式转换的包  import org.apache.flink.api.scala._
    val result = text.flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)  // 索引下标
      .sum(1)  // 索引下标
      
    result.print()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  1. 流处理
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingWCApp {

  def main(args: Array[String]): Unit = {
    // 1. 获取流处理上下文对象
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. 流处理 DataStream
    val text: DataStream[String] = env.socketTextStream("gargantua", 9527) // 返回的就是 DataStream
    
    text.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print().setParallelism(1)

    // 3. 流处理需要 手动 execute
    env.execute(this.getClass.getSimpleName)

    // 现在终端启动 [liqiang@Gargantua ~]$ nc -lk 9527
    // 再启动main方法
    // 在终端输入需要统计的单词(要回车)就能实时统计9527的数据
	
	4>(pig,1)
    4>(dog,2)  // 4代表并行度
    4>(dog,3) 
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  1. java 版本的批处理
public static void main(String[] args) throws Exception {
    // java 版本对应的 gargantua.basic.BatchWCApp.scala
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<String> source = env.readTextFile("data/wc.txt");
    // 匿名内部类
    // test01(source);
    // lambda + stream流
    test02(source);
}
private static void test02(DataSource<String> source) throws Exception {
    System.out.println("------------");
    // 生成的 lambda 写法. 注意每一步都要先返回,再做下一步
    FlatMapOperator<String, String> flatMapOperator = source.flatMap((FlatMapFunction<String, String>) (s, collector) -> {
        String[] words = s.split(",");
        for (String word : words) {
            collector.collect(word);
        }
    }).returns(Types.STRING);
    
    //  stream流 写法:注意每一步都要先返回,再做下一步
   /* FlatMapOperator<String, String> flatMapOperator = source.flatMap((String line, Collector<String> collector) -> {
        Arrays.stream(line.split(",")).forEach(collector::collect);
    }).returns(Types.STRING);*/
    
    MapOperator<String, Tuple2<String, Integer>> mapOperator = flatMapOperator.map(x -> Tuple2.of(x, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
    mapOperator.groupBy(0).sum(1).print();
    // mapOperator.groupBy(x -> x.f0).sum(1).print();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  1. java 版本的流处理
Flink 的编程模型

Anatomy of a Flink Program

获取执行环境 [val env = getExecutionEnvironment()、createLocalEnvironment()]
加载/创建初始数据  [val input = env.readTextFile("data/wc.txt")、env.socketTextStream("localhost", 9999)]
作用此数据的transformations算子 [input.map { x => x.toInt }...]
指定计算结果的存放位置 [writeAsText(path: String))、print()、writeToSocket()、addSink()]
触发程序执行(流处理)  [env.execute(this.getClass.getSimpleName)]
  • 1
  • 2
  • 3
  • 4
  • 5
获取执行环境
  1. 获取批处理执行环境 ,如同SparkConf、SparkContext

    val env = ExecutionEnvironment.getExecutionEnvironment
    
    • 1
  2. 获取流处理执行环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    • 1
  3. 获取环境时可以指定让这个任务展示到Web UI。
    引入依赖

     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
     </dependency>
    
    • 1
    • 2
    • 3
    • 4

    获取带Web UI 的 Environment

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    
    • 1

    默认启动在8081端口。如果有打开的UI页面提前打开,或前一次没有关闭,但后台重启,会报错。需要把UI关掉再打开。
    也可以自己设置端口号:

    val configuration = new Configuration()
    configuration.setInteger("rest.port",7777);
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration)
    
    • 1
    • 2
    • 3

    其实就用val env = StreamExecutionEnvironment.getExecutionEnvironment也会有Web UI界面的,不过端口不固定,需要通过启动日志查看。

数据源

基于文件(批处理):

readTextFile()
readFile(fileInputFormat, path)
readFile(fileInputFormat, path, watchType, interval, pathFilter) 
  • 1
  • 2
  • 3

基于socket(流处理):

socketTextStream 
  • 1

基于scala/java集合

// 内置
fromCollection(Seq)  // 单并行,
fromCollection(Iterator)
fromElements(elements: _*)
fromParallelCollection(SplittableIterator)  // 并行,多个task
generateSequence(from, to)
// 自定义
addSource
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
Transformations
map、 filter 、keyby 、window...
  • 1
数据输出
writeAsText() / TextOutputFormat
writeAsCsv(...) / CsvOutputFormat
print() / printToErr() 
writeUsingOutputFormat() / FileOutputFormat
writeToSocket 
addSink
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
并行度
获取当前并行度:stream.parallelism

env.socketTextStream 、env.fromCollection(List()) 、fromElements 的时候,并行度是1, 意味着不能并行接收。
且不能自己强制设置并行度,因为源码设置为1,或只要实现SourceFuncation的也都是单并行。

env.readTextFile() 、env.fromParallelCollection()、env.generateSequence()是多并行度读取。

对带并行度的数据源ParallSourceFunction如果不指定并行度,就会使用当前机器的CPU线程数。会把资源占尽。
transformation : fliter、flatmap 等,如果不指定并行度,也是取决于CPU线程。
print():如果不指定并行度,也是取决于CPU线程。

设置并行度

(仅针对能设置并行度时)

env阶段: env.setParallelism(2)  
source阶段 :  env.addSource(...).setParallelism(2)
transformation阶段: 默认是用完CPU,在生产一般都要自己重新设置。
sink阶段:
  • 1
  • 2
  • 3
  • 4
自定义数据源 addSource

内置的数据源如fromCollection(),底层也是用的addSource()。

定义数据源 addSource(new xxxSourceFunction())

对于xxxSourceFunction,有很多,都是常用Function的子接口和子类,如可以并行的ParalleSourceFunction、RichSourceFunction、RichParalleSourceFunction

继承自单并行度的自定义sourceFunction只能串行,继承自多并行度的自定义ParalleSourceFunction默认并行度为CPU线程数,也可以自己setParallelism。

自定义sourceFunction,需要实现run() 和cancel() 方法。源码有demo参考。

继承自RichFunction(增强)的自定义sourceFunction,除了run() 和cancel() ,还可以实现open()和close()方法(生命周期方法)。open是每一个并行度执行前都会调用的,如自定义关系型数据源(如mysql),可以在open中加载连接。

  • jdbc 读取数据源到source
    继承RichSourceFunction,重写以下方法:在open方法中加载驱动、获取jdbc连接、准备查询语句,在run 方法执行SQL语句,在close中关闭连接释放资源。
kafka 读取数据源到source

导入依赖

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka_2.12</artifactId>
     <version>${flink.version}</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

新API可以直接使用KafkaSource.builder() 出一个kafkaSource。是多并行度的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = KafkaSource.builder()
	.setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
	 .setTopics("flinktopic")
	 .setGroupId("ruozedata_flink_topic_group")       // pk-group
	 .setStartingOffsets(OffsetsInitializer.latest()) // earliest
	 .setValueOnlyDeserializer(new SimpleStringSchema())
	 .build()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
Transformations

Map、FlatMap、Filter、KeyBy、Reduce、Window、WindowAll 、Window Apply、WindowReduce、Union、Window Join、Interval Join 、Window CoGroup、Connect、CoMap, CoFlatMap、Iterate

map 是将一个 DataStream转另一个 DataStream
.map(_*2)的底层操作是

.transform("my map",new SteamMap(new MapFunction(
	override def map(value:Int) = value * 2
))
  • 1
  • 2
  • 3

union: 两个或多个数据源 合并成一个数据源。 union时是必须要相同的数据类型
自己合并自己会得到两倍…

connect:两个数据源,关联成一个,但是内部其实两个流还是独立,知识可以共用State状态信息,且两个流数据类型还可以不一样

connect vs union
1) 合并后:一个流 / 多个流
2) 数据类型:是否一定要相同的数据类型
3) 个数问题:connect的map操作,也是会有两个参数,分开操作
  • 1
  • 2
  • 3
  • 4

需求:两个流分别做,stream1转大写,stream转Int

stream1.connect(stream2).map(new CoMapFunction[String,Int, String] {
    override def map1(value: String): String = value.toUpperCase()
    override def map2(value: Int): String = value * 10 + ""
})
  • 1
  • 2
  • 3
  • 4
分区器 Partitioner

自定义分区器MyPartitioner,继承Partitioner,重写partition方法,在其中指定分区规则。

    通过partitionCustom指定使用分区

    .partitionCustom(new MyPartitioner, x => x._1)
    
    • 1
    Sink

    Flink03:https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/

    自定义 Sink。继承RichSinkFunction,重写open方法和invoke方法,在invoke方法中指定具体逻辑输出。

    输出到文件

    输出到文件系统,由于多并行,会产生很多小文件夹。流处理就不适合输出文件系统。
    但是setParallelism(1)时可以输出到一个文件

    输出到 kafka

    需要引入依赖(和source是同一个)

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    官网提供API可以直接使用KafkaSource.builder() 出一个sink,使用sinkTo输出到kafka

    val sink: KafkaSink[String] = KafkaSink.builder()
      .setBootstrapServers("hadoop000:9092,hadoop000:9093,hadoop000:9094")
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("flinktopic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
      ).build()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    输出到 redis

    需要引入依赖

     <dependency>
         <groupId>org.apache.bahir</groupId>
         <artifactId>flink-connector-redis_2.11</artifactId>
         <version>1.0</version>
     </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    API只提供了RedisSink,还需要自己实现RedisMapper。

    数据到 jdbc

    参考官网。

      输出到 socket
       accessStream.writeToSocket("gargantua", 9526, new SimpleStringSchema())
      
      • 1

      输出到 nc - lk 9526 的窗口了

      声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/493538
      推荐阅读
      相关标签
        

      闽ICP备14008679号