赞
踩
要在应用程序中使用这些连接器之一,通常需要额外的第三方组件,例如用于数据存储或消息队列的服务器。另请注意,虽然本节中列出的流连接器是 Flink 项目的一部分并且包含在源代码版本中,但它们不包含在二进制发行版中。
该文档描述的是基于新数据源 API的 Kafka Source。
依赖
Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。 当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.14.4</version>
</dependency>
Flink 目前的流连接器还不是二进制发行版的一部分。
使用方法
Kafka Source 提供了构建类来创建 KafkaSource
的实例。以下代码片段展示了如何构建 KafkaSource
来消费 “input-topic” 最早位点的数据, 使用消费组 “my-group”,并且将 Kafka 消息体反序列化为字符串:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
以下属性在构建 KafkaSource 时是必须指定的:
setBootstrapServers(String)
方法配置setGroupId(String)
配置实战
1、编辑 docker-compose.yml
version: "2.1" services: zookeeper: image: wurstmeister/zookeeper:3.4.6 kafka: image: wurstmeister/kafka:2.12-2.2.1 environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://:9094 KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_CREATE_TOPICS: "input:2:1, output:2:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ports: - 9092:9092 - 9094:9094 jobmanager: image: flink:1.14.4-scala_2.11 ports: - "8081:8081" command: jobmanager environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager: image: flink:1.14.4-scala_2.11 depends_on: - jobmanager command: taskmanager scale: 1 environment: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4
2、启动服务
$ docker-compose up -d
3、编写kafka数据源程序
package quick; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class KafkaExample { public static void main(String[] args) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final ParameterTool params = ParameterTool.fromArgs(args); String brokers = params.get("bootstrap.servers", "kafka:9092"); KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStream<Tuple2<String, Integer>> dataStream = env .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") .flatMap(new Splitter()) .keyBy(value -> value.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1); dataStream.print(); env.execute("KafkaExample job"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
4、kafka生产者不断提交数据
$ docker exec -it kafka_kafka_1 /bin/bash
$ cd /opt/kafka
# 查看已创建的topic信息
$ bin/kafka-topics.sh --describe --topic input --bootstrap-server localhost:9092
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input
This is a message
This is another message
5、flink日志查看
然后,将打包应用程序提交,Flink 的Web UI来提交作业监控集群的状态和正在运行的作业。
参数是 --参数名 值 (中间空格隔开)
程序参数,不用输入
--bootstrap.servers kafka:9092
只需输入一些单词,然后按回车键即可传入新单词。这些将作为单词统计程序的输入。如果想查看大于 1 的计数,在 5 秒内重复输入相同的单词即可(如果无法快速输入,则可以将窗口大小从 5 秒增加 ☺)。
$ docker-compose logs -f taskmanager
查看输出
taskmanager_1 | (This,1)
taskmanager_1 | (message,1)
taskmanager_1 | (a,1)
taskmanager_1 | (is,1)
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。