当前位置:   article > 正文

Flink实现kafka到kafka、kafka到doris的精准一次消费_flink kafka doris

flink kafka doris

起始消费位点:

scan.startup.mode 配置项决定了 Kafka consumer 的启动模式。有效值为:

  • group-offsets:从 Zookeeper/Kafka 中某个指定的消费组已提交的偏移量开始。
  • earliest-offset:从可能的最早偏移量开始。
  • latest-offset:从最末尾偏移量开始。
  • timestamp:从用户为每个 partition 指定的时间戳开始。
  • specific-offsets:从用户为每个 partition 指定的偏移量开始。

一致性保证:

默认情况下,如果查询在 启用 checkpoint 模式下执行时,Kafka sink 按照至少一次(at-lease-once)语义保证将数据写入到 Kafka topic 中。

当 Flink checkpoint 启用时,kafka 连接器可以提供精确一次(exactly-once)的语义保证。

除了启用 Flink checkpoint,还可以通过传入对应的 sink.semantic 选项来选择三种不同的运行模式:

  • none:Flink 不保证任何语义。已经写出的记录可能会丢失或重复。
  • at-least-once (默认设置):保证没有记录会丢失(但可能会重复)。
  • exactly-once:使用 Kafka 事务提供精确一次(exactly-once)语义。当使用事务向 Kafka 写入数据时,请将所有从 Kafka 中消费记录的应用中的 isolation.level 配置项设置成实际所需的值(read_committedread_uncommitted,后者为默认值)。

properties.auto.offset.reset:

properties.auto.offset.reset 是 Kafka 消费者配置中的一个重要参数,用于指定消费者在以下情况下如何处理偏移量(offset):

  1. earliest:

    • 如果消费者在找不到先前偏移量(或者该偏移量无效)的情况下开始消费,它将从最早的可用消息开始消费。这意味着消费者会从主题的起始处开始读取消息。
  2. latest:

    • 如果消费者在找不到先前偏移量(或者该偏移量无效)的情况下开始消费,它将从最新的消息开始消费。这意味着消费者只会消费自它启动后发布到主题的消息。

1 流程图

2 Flink来源表建模

  1. --来源-城市topic
  2. CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
  3. record string,
  4. city_partition_id bigint METADATA FROM 'partition' VIRTUAL,
  5. city_offset_id bigint METADATA FROM 'offset' VIRTUAL,
  6. city_ts timestamp METADATA FROM 'timestamp'
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'QL_JC_SSJC',
  10. 'properties.bootstrap.servers' = '172.*.*.*:9092',
  11. 'properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE',
  12. 'scan.startup.mode' = 'group-offsets',-- Kafka consumer 的启动模式
  13. 'properties.isolation.level' = 'read_committed',
  14. 'properties.auto.offset.reset' = 'earliest',
  15. 'format' = 'raw'
  16. );
  17. --来源-中台kafka-topic
  18. CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
  19. sscsdm string,
  20. extract_time TIMESTAMP,
  21. record string,
  22. city_partition_id bigint,
  23. city_offset_id bigint ,
  24. city_ts timestamp ,
  25. hw_partition_id bigint METADATA FROM 'partition' VIRTUAL,
  26. hw_offset_id bigint METADATA FROM 'offset' VIRTUAL,
  27. hw_ts timestamp METADATA FROM 'timestamp'
  28. ) WITH (
  29. 'connector' = 'kafka',
  30. 'topic' = 'ODS_QL_JC_SSJC',
  31. 'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
  32. 'properties.security.protocol' = 'SASL_PLAINTEXT',
  33. 'properties.sasl.kerberos.service.name' = 'kafka',
  34. 'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
  35. 'properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE',
  36. 'scan.startup.mode' = 'group-offsets',
  37. 'properties.auto.offset.reset' = 'earliest',
  38. 'properties.isolation.level' = 'read_committed',
  39. 'format' = 'json'
  40. );

3 Flink去向表建模

  1. --去向-中台kafka-topic
  2. CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK (
  3. sscsdm string,
  4. extract_time TIMESTAMP,
  5. record string,
  6. city_partition_id bigint,
  7. city_offset_id bigint ,
  8. city_ts timestamp
  9. ) WITH (
  10. 'connector' = 'kafka',
  11. 'topic' = 'ODS_QL_JC_SSJC',
  12. 'properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007',
  13. 'properties.security.protocol' = 'SASL_PLAINTEXT',
  14. 'properties.sasl.kerberos.service.name' = 'kafka',
  15. 'properties.kerberos.domain.name' = 'hadoop.hadoop.com',
  16. 'format' = 'json',
  17. 'properties.transaction.timeout.ms' = '900000',
  18. 'sink.semantic'='exactly-once'
  19. );
  20. --去向-Doris表
  21. CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (
  22. sscsdm STRING,
  23. extract_time TIMESTAMP(9),
  24. record STRING,
  25. city_partition_id BIGINT,
  26. city_offset_id BIGINT,
  27. city_ts TIMESTAMP(9),
  28. hw_partition_id BIGINT,
  29. hw_offset_id BIGINT,
  30. hw_ts TIMESTAMP(9)
  31. ) WITH (
  32. 'connector' = 'doris',
  33. 'fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030',
  34. 'table.identifier' = 'doris_d.ods_ql_jc_ssjc',
  35. 'username' = 'root',
  36. 'password' = '********',
  37. 'sink.properties.two_phase_commit' = 'true',
  38. 'sink.properties.read_json_by_line'='true',
  39. 'sink.properties.strip_outer_array'='true'
  40. );

4 城市Topic至中台Topic的Flinksql

  1. insert into
  2. KAFKA_ODS_QL_JC_SSJC_SINK
  3. SELECT
  4. '320100' as sscsdm,
  5. CURRENT_TIMESTAMP as extract_time,
  6. record
  7. FROM
  8. NJ_QL_JC_SSJC_SOURCE
  9. UNION ALL
  10. SELECT
  11. '320200' as sscsdm,
  12. CURRENT_TIMESTAMP as extract_time,
  13. record
  14. FROM
  15. WX_QL_JC_SSJC_SOURCE
  16. .
  17. .
  18. .
  19. UNION ALL
  20. SELECT
  21. '320583' as sscsdm,
  22. CURRENT_TIMESTAMP as extract_time,
  23. record
  24. FROM
  25. KS_QL_JC_SSJC_SOURCE

5 中台Topic至Doris的Flinksql

  1. insert into DORIS_ODS_QL_JC_SSJC_SINK
  2. SELECT
  3. sscsdm,
  4. CURRENT_TIMESTAMP as extract_time,
  5. record
  6. FROM
  7. ODS_QL_JC_SSJC_SOURCE

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/888234
推荐阅读
相关标签
  

闽ICP备14008679号