赞
踩
拦截器 : 在不修改业务逻辑下,动态实现一组可插拔的事件处理逻辑链
Kafka 拦截器能在消息处理的前后多个时点动态植入不同的处理逻辑
Kafka 拦截器分 :
指定 Producer 拦截器:
List<String> interceptors = new ArrayList<>();
interceptors.add("com.cpucode.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器 1
interceptors.add("com.cpucode.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器 2
Properties props = new Properties();
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
所有 Producer 拦截器的实现类都要继承 org.apache.kafka.clients.producer.ProducerInterceptor
接口
消费者拦截器的实现类都要继承org.apache.kafka.clients.consumer.ConsumerInterceptor
接口
Kafka 拦截器可用于:客户端监控、端到端系统性能检测、消息审计
用拦截器统计消息端到端处理的延时
实现生产者拦截器:
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> { private Jedis jedis; // 省略 Jedis 初始化 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 更新总发送消息数 jedis.incr("totalSentMessage"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<java.lang.String, ?> configs) { } }
消费者的拦截器实现:
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> { private Jedis jedis; // 省略 Jedis 初始化 @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { long lantency = 0L; // 用当前的时钟时间 - 在消息中的创建时间,累计这批消息时间 for (ConsumerRecord<String, String> record : records) { lantency += (System.currentTimeMillis() - record.timestamp()); } // 总的端到端处理延时并更新到 Redis jedis.incrBy("totalLatency", lantency); long totalLatency = Long.parseLong(jedis.get("totalLatency")); long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); // 总延时/总消息数 = 端到端消息的平均处理延时 jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。