当前位置:   article > 正文

6.2、Flink数据写入到Kafka_flink写kafka

flink写kafka

目录

1、添加POM依赖

2、API使用说明

3、序列化器

3.1 使用预定义的序列化器

3.2 使用自定义的序列化器

4、容错保证级别

4.1 至少一次 的配置

4.2 精确一次 的配置

5、这是一个完整的入门案例


1、添加POM依赖

Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

  1. <!-- 引入 kafka连接器依赖-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-kafka</artifactId>
  5. <version>1.17.1</version>
  6. </dependency>

2、API使用说明

KafkaSink 可将数据流写入一个或多个 Kafka topic。

官网链接:官网链接

  1. DataStream<String> stream = ...;
  2. KafkaSink<String> sink = KafkaSink.<String>builder() // 泛型为 输入输入的类型
  3. // TODO 必填项:配置 kafka 的地址和端口
  4. .setBootstrapServers(brokers)
  5. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  6. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  7. .setTopic("topic-name")
  8. .setValueSerializationSchema(new SimpleStringSchema())
  9. .build()
  10. )
  11. // TODO 必填项:配置容错保证级别 精准一次、至少一次、不做任何保证
  12. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  13. .build();
  14. stream.sinkTo(sink);

3、序列化

序列化器的作用是将flink数据转换成 kafka的ProducerRecord

3.1 使用预定义的序列化器

功能:将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

  1. // 初始化 KafkaSink 实例
  2. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  3. // TODO 必填项:配置 kafka 的地址和端口
  4. .setBootstrapServers("worker01:9092")
  5. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  6. .setRecordSerializer(
  7. KafkaRecordSerializationSchema.<String>builder()
  8. .setTopic("20230912")
  9. .setValueSerializationSchema(new SimpleStringSchema())
  10. .build()
  11. )
  12. .build();

3.2 使用自定义的序列化器

功能:可以对 kafka消息的key、value、partition、timestamp进行赋值

  1. /**
  2. * 如果要指定写入kafka的key,可以自定义序列化器:
  3. * 1、实现 一个接口,重写 序列化 方法
  4. * 2、指定key,转成 字节数组
  5. * 3、指定value,转成 字节数组
  6. * 4、返回一个 ProducerRecord对象,把key、value放进去
  7. */
  8. // 初始化 KafkaSink 实例 (自定义 KafkaRecordSerializationSchema 实例)
  9. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  10. // TODO 必填项:配置 kafka 的地址和端口
  11. .setBootstrapServers("worker01:9092")
  12. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  13. .setRecordSerializer(
  14. new KafkaRecordSerializationSchema<String>() {
  15. @Nullable
  16. @Override
  17. public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
  18. String[] datas = element.split(",");
  19. byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
  20. byte[] value = element.getBytes(StandardCharsets.UTF_8);
  21. Long currTimestamp = System.currentTimeMillis();
  22. Integer partition = 0;
  23. return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
  24. }
  25. }
  26. )
  27. .build();

4、容错保证级别

KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee

  • DeliveryGuarantee.NONE   不提供任何保证
    • 消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复
  • DeliveryGuarantee.AT_LEAST_ONCE  至少一次
    • sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。
    • 消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
  • DeliveryGuarantee.EXACTLY_ONCE 精确一次
    • 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。
    • 因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。
    • 然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
    • 请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

4.1 至少一次 的配置

  1. DataStream<String> stream = ...;
  2. // 初始化 KafkaSink 实例
  3. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  4. // TODO 必填项:配置 kafka 的地址和端口
  5. .setBootstrapServers("worker01:9092")
  6. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  7. .setRecordSerializer(
  8. KafkaRecordSerializationSchema.<String>builder()
  9. .setTopic("20230912")
  10. .setValueSerializationSchema(new SimpleStringSchema())
  11. .build()
  12. )
  13. // TODO 必填项:配置容灾保证级别设置为 至少一次
  14. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  15. .build();
  16. stream.sinkTo(sink);

4.2 精确一次 的配置

  1. // 如果是精准一次,必须开启checkpoint
  2. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  3. DataStream<String> stream = ...;
  4. KafkaSink<String> sink = KafkaSink.<String>builder() // 泛型为 输入输入的类型
  5. // TODO 必填项:配置 kafka 的地址和端口
  6. .setBootstrapServers(brokers)
  7. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  8. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
  9. .setTopic("topic-name")
  10. .setValueSerializationSchema(new SimpleStringSchema())
  11. .build()
  12. )
  13. // TODO 必填项:配置容灾保证级别设置为 精准一次
  14. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  15. // 如果是精准一次,必须设置 事务的前缀
  16. .setTransactionalIdPrefix("flink-")
  17. // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
  18. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
  19. .build();
  20. stream.sinkTo(sink);

5、这是一个完整的入门案例

需求:Flink实时读取 socket数据源,将读取到的数据写入到Kafka (要保证不丢失,不重复)

开发语言:java1.8

flink版本:flink1.17.0

  1. package com.baidu.datastream.sink;
  2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
  3. import org.apache.flink.connector.base.DeliveryGuarantee;
  4. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  5. import org.apache.flink.connector.kafka.sink.KafkaSink;
  6. import org.apache.flink.streaming.api.CheckpointingMode;
  7. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.kafka.clients.producer.ProducerConfig;
  10. // TODO flink 数据输出到kafka
  11. public class SinkKafka {
  12. public static void main(String[] args) throws Exception {
  13. // 1.获取执行环境
  14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  15. env.setParallelism(2);
  16. // 如果是精准一次,必须开启checkpoint
  17. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
  18. // 2.指定数据源
  19. DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);
  20. // 3.初始化 KafkaSink 实例
  21. KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
  22. // TODO 必填项:配置 kafka 的地址和端口
  23. .setBootstrapServers("worker01:9092")
  24. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
  25. .setRecordSerializer(
  26. KafkaRecordSerializationSchema.<String>builder()
  27. .setTopic("20230912")
  28. .setValueSerializationSchema(new SimpleStringSchema())
  29. .build()
  30. )
  31. // TODO 必填项:配置容灾保证级别设置为 精准一次
  32. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  33. // 如果是精准一次,必须设置 事务的前缀
  34. .setTransactionalIdPrefix("flink-")
  35. // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
  36. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
  37. .build();
  38. streamSource.sinkTo(kafkaSink);
  39. // 3.触发程序执行
  40. env.execute();
  41. }
  42. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/493545
推荐阅读
相关标签
  

闽ICP备14008679号