当前位置:   article > 正文

Flink消费kafka获取kafka消息的offset_flink 获取当前offset

flink 获取当前offset

虽然Flink消费kafka有着完善的checkpoint机制,可以使得程序停止后再次能从上一次的消费位点继续消费,但是有时候flink的checkpoint也会失败,或者checkpoint管理起来不够灵活,我们想自己维护kafka 的offset信息。
但是Flink封装的FlinkKafkaConsumer并不能直接的获取kafka 消息的offset
现在有两种实现方法,原理都是一样的,第二种就是知道这里可以改就行了,真正使用的时候还是第一种。

原理:将kafka消息的offset和partition信息整合到kafka消息中。

第一种最简单:
自定义MyKafkaDeserializationSchema,实现KafkaDeserializationSchema接口即可:
这里因为我的kafka消息已经是json串了,所以我把消息的offset 和 partition 信息直接插入到json里了。
如果 kafka中消息不是json串,那就可以自己组织数据结构,将 offset 和 partition 信息 插入到value信息中。

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.Charset;

public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<String> {

    public static final Charset UTF_8 = Charset.forName("UTF-8");

    @Override
    public boolean isEndOfStream(String s) {
        return false;
    }

    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        String value = new String(consumerRecord.value(), UTF_8.name());
        long offset = consumerRecord.offset();
        int partition = consumerRecord.partition();
        JSONObject jsonObject = JSONObject.parseObject(value);
        jsonObject.put("offset",offset);
        jsonObject.put("partition",partition);
        return jsonObject.toString();
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

使用的时候:

 FlinkKafkaConsumer<String> consumer=new FlinkKafkaConsumer<>(topic,new MyKafkaDeserializationSchema(),KafkaProperties );
  • 1

第二种比较复杂:修改源码,知道有这个方法就行了

修改org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcherr#runFetchLoop方法中的代码:

使用默认的SimpleStringSchema即可。

 FlinkKafkaConsumer<String> consumer=new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), KafkaProperties);
  • 1

这样操作完后,flink每消费一条数据,都可以及时获取到该消息的partition和offset信息,再结合事务,成功处理一条数据就自己更新一条offset信息,就能做到严格的exactly-once。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/381378
推荐阅读
相关标签
  

闽ICP备14008679号