赞
踩
虽然Flink消费kafka有着完善的checkpoint机制,可以使得程序停止后再次能从上一次的消费位点继续消费,但是有时候flink的checkpoint也会失败,或者checkpoint管理起来不够灵活,我们想自己维护kafka 的offset信息。
但是Flink封装的FlinkKafkaConsumer
并不能直接的获取kafka 消息的offset
现在有两种实现方法,原理都是一样的,第二种就是知道这里可以改就行了,真正使用的时候还是第一种。
第一种最简单:
自定义MyKafkaDeserializationSchema
,实现KafkaDeserializationSchema
接口即可:
这里因为我的kafka消息已经是json串了,所以我把消息的offset 和 partition 信息直接插入到json里了。
如果 kafka中消息不是json串,那就可以自己组织数据结构,将 offset 和 partition 信息 插入到value信息中。
import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.nio.charset.Charset; public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<String> { public static final Charset UTF_8 = Charset.forName("UTF-8"); @Override public boolean isEndOfStream(String s) { return false; } @Override public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception { String value = new String(consumerRecord.value(), UTF_8.name()); long offset = consumerRecord.offset(); int partition = consumerRecord.partition(); JSONObject jsonObject = JSONObject.parseObject(value); jsonObject.put("offset",offset); jsonObject.put("partition",partition); return jsonObject.toString(); } @Override public TypeInformation<String> getProducedType() { return null; } }
使用的时候:
FlinkKafkaConsumer<String> consumer=new FlinkKafkaConsumer<>(topic,new MyKafkaDeserializationSchema(),KafkaProperties );
第二种比较复杂:修改源码,知道有这个方法就行了
修改org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcherr#runFetchLoop
方法中的代码:
使用默认的SimpleStringSchema
即可。
FlinkKafkaConsumer<String> consumer=new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), KafkaProperties);
这样操作完后,flink每消费一条数据,都可以及时获取到该消息的partition和offset信息,再结合事务,成功处理一条数据就自己更新一条offset信息,就能做到严格的exactly-once。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。