赞
踩
写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面。
如果你也对大数据感兴趣,希望在这个行业一展拳脚。欢迎关注我,我们一起学习。博客地址:https://ropledata.blog.csdn.net
博客的名字来源于:且听风吟,静待花开。也符合我对技术的看法,想要真正掌握一门技术就需要厚积薄发的毅力,同时保持乐观的心态。
你只管努力,剩下的交给时间!
前面两篇文章,我们对kafka的生产者和消费者进行了java实战。通过逻辑分析辅以代码实战,更加深刻的掌握了kafka同步发送消息和异步发送消息以及手动提交offset和自动提交offset的实现。
其实在实际的生产过程中,光有消息的发送和消费是不够的,我们还需要掌握生产者在发送消息时候的拦截器操作。那么本文咱们就详解kafka的拦截器,并根据案例实现两个常见的自定义的拦截器,并将它们组成拦截链。
注意:我所使用的kafka版本为2.4.1,java版本为1.8,本文会对一些新老版本的改动地方加以说明,同时本文的拦截器demo也会开源分享到csdn和github。
kafka的拦截器指的是Producer拦截器(interceptor),主要是为了实现clients端的自定义化逻辑控制,是在Kafka 0.10版本被引入的。
想要实现拦截器,我们需要先实现ProducerInterceptor接口org.apache.kafka.clients.producer.ProducerInterceptor
,关于ProducerInterceptor接口,我们需要掌握如下这些方法:
configure(configs
)方法:用于获取配置信息和初始化数据。
onSend(ProducerRecord)
方法:该方法封装进KafkaProducer.send方法中,即它运行在用户的主线程(main线程)中。Producer确保在消息被序列化以及计算分区前调用该方法。注意:用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
onAcknowledgement(RecordMetadata, Exception)
方法:该方法会在消息从RecordAccumulator成功发送到Kafka Broker之后,或者在发送过程中失败时调用。并且通常都是在producer回调逻辑触发之前。注意:onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很复杂的逻辑,否则会拖慢producer的消息发送效率。
close
方法:可以关闭拦截器,主要用于执行一些资源清理工作。
有几点是使用拦截器的时候需要特别注意的:
见到onsend方法就要注意topic和分区,一般不进行修改;
见到onAcknowledgement方法就要注意里面的逻辑不要太复杂,以免影响消息发送效率。
由于拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外如果指定了多个拦截器,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而不是向上传递,大家在使用过程中要特别留意。
现在有一个需求:想要用拦截器实现producer发送数据后,这部分数据都带上时间戳,并且在producer发送完成后,统计到有多少数据发送成功了,有多少数据发送失败了。
需求解析:
这个需求主要考察对拦截器的基础掌握程度,比如给数据加上时间戳,这个功能可以在拦截器里的onSend(ProducerRecord)
方法中实现;统计消息发送的成功和失败次数,这个可以在拦截器里的onAcknowledgement(RecordMetadata, Exception)
方法里来实现。
需求实现:
通过上面的需求解析,我们知道这个需求可以用一个拦截器来实现,但是为了展示拦截器链的关联使用,这里我们给它分成两个拦截器来实现。拦截器1实现增加时间戳的功能,拦截器2实现消息发送情况统计次数的功能。
大致如下图所示:
想要把发送的数据都带上时间戳,其实很简单,我们只需要实现 ProducerInterceptor接口,然后按照我们的需求重写其中的onSend方法就可以了,别的方法不用动。
完整代码如下:
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。