当前位置:   article > 正文

【Flink1.14实战】Docker环境 DataStream kafka Source_org.apache.flink.connector.kafka.source.kafkasourc

org.apache.flink.connector.kafka.source.kafkasource

DataStream 连接器

要在应用程序中使用这些连接器之一,通常需要额外的第三方组件,例如用于数据存储或消息队列的服务器。另请注意,虽然本节中列出的流连接器是 Flink 项目的一部分并且包含在源代码版本中,但它们不包含在二进制发行版中。

kafka 连接器

该文档描述的是基于新数据源 API的 Kafka Source。

依赖

Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。 当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.4</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

Flink 目前的流连接器还不是二进制发行版的一部分。

Kafka Source

使用方法

Kafka Source 提供了构建类来创建 KafkaSource 的实例。以下代码片段展示了如何构建 KafkaSource 来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

以下属性在构建 KafkaSource 时是必须指定的:

  • Bootstrap server,通过 setBootstrapServers(String) 方法配置
  • 消费者组 ID,通过 setGroupId(String) 配置
  • 要订阅的 Topic / Partition
  • 用于解析 Kafka 消息的反序列化器(Deserializer)

实战

1、编辑 docker-compose.yml

version: "2.1"
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "input:2:1, output:2:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    ports:
      - 9092:9092
      - 9094:9094     
  jobmanager:
    image: flink:1.14.4-scala_2.11
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: flink:1.14.4-scala_2.11
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 4   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2、启动服务

$ docker-compose up -d
  • 1

3、编写kafka数据源程序

package quick;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class KafkaExample {

    public static void main(String[] args) throws Exception{

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        String brokers = params.get("bootstrap.servers", "kafka:9092");

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("input")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);


        dataStream.print();
        env.execute("KafkaExample job");


    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

4、kafka生产者不断提交数据

$ docker exec -it kafka_kafka_1 /bin/bash 

$ cd /opt/kafka
# 查看已创建的topic信息
$ bin/kafka-topics.sh --describe --topic input --bootstrap-server localhost:9092

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input
This is a message
This is another message
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5、flink日志查看

然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。

参数是 --参数名 值 (中间空格隔开)

程序参数,不用输入
--bootstrap.servers kafka:9092
  • 1
  • 2

只需输入一些单词,然后按回车键即可传入新单词。这些将作为单词统计程序的输入。如果想查看大于 1 的计数,在 5 秒内重复输入相同的单词即可(如果无法快速输入,则可以将窗口大小从 5 秒增加 ☺)。

$  docker-compose logs -f taskmanager
  • 1

查看输出

taskmanager_1  | (This,1)
taskmanager_1  | (message,1)
taskmanager_1  | (a,1)
taskmanager_1  | (is,1)
  • 1
  • 2
  • 3
  • 4
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/789339
推荐阅读
相关标签
  

闽ICP备14008679号