当前位置:   article > 正文

6 ,kafka 拦截器,拦截器链,案例_kafka消息拦截追踪

kafka消息拦截追踪

一 ,理论 :

1 ,哪个版本开始 :

kafka1.0

2 ,拦截的位置 :

生产者

3 ,拦截器数量 :

可以有很多拦截器,形成一个拦截器链

4 ,方法介绍 :

  1. configure : 初始化 ( 调用一次 )
    获取配置信息和初始化数据时调用。
  2. onSend :前处理,可以对消息进行恩和操作 ( 每条消息,都走一次这个方法 )
    该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
  3. onAcknowledgement :后处理 ( 每条消息都调用 )
    该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
  4. close :释放资源 ( 调用一次 )
    关闭interceptor,主要用于执行一些资源清理工作
    如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

二 ,案例 :

1 ,需求 :

  1. 实现一个简单的双 interceptor 组成的拦截链。
  2. 第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
  3. 第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。

2 ,效果预览 :

在这里插入图片描述

3 ,时间戳拦截器 :

4 ,失败或者成功的计数器 :

5 ,生产者主程序 :

6 ,测试 :

  1. 在 node01 启动 kafka 的消费者 :
cd /export/servers/kafka_2.11-0.11.0.0
bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic first
  • 1
  • 2
  1. 在 idea 启动生产者 :

在这里插入图片描述

7 ,观察效果 :

  1. kafka 消费端,看到消息前缀,说明 : 时间前缀好用了
    在这里插入图片描述
  2. idea 控制台看到途中那些 : 说明计数器好用了
    在这里插入图片描述

8 ,得出结论 :

  1. 拦截器好用 : kafka 拦截器,确实可以给消息做处理。
  2. 可以做 : 前处理,后处理。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/471615
推荐阅读
相关标签
  

闽ICP备14008679号