当前位置:   article > 正文

多图详解 kafka 生产者消息发送过程_kafka生产者发送消息

kafka生产者发送消息

生产者客户端代码

public class SzzTestSend {
  
    public static final String bootStrap = "xxxxxx:9090";    public static final String topic = "t_3_1";
    public static void main(String[] args) {
          Properties properties = new Properties();        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);        // 序列化协议  下面两种写法都可以        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");        //过滤器 可配置多个用逗号隔开        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");        //构造 KafkaProducer        KafkaProducer producer = new KafkaProducer(properties);        //  发送消息, 并设置 回调(回调函数也可以不要)        ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");        try {
              producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));        }catch (Exception e){
              e.printStackTrace();        }    }
    /**     * 发送成功回调类     */    public static class SzzTestCallBack implements Callback{
          private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);        private String topic;        private String key;        private String value;
        public SzzTestCallBack(String topic, String key, String value) {
              this.topic = topic;            this.key = key;            this.value = value;
        }        public void onCompletion(RecordMetadata metadata, Exception e) {
              if (e != null) {
                  log.error("Error when sending message to topic {} with key: {}, value: {} with error:",                        topic, key,value, e);            }else {
                  log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",                        topic, key,value,metadata.partition(),metadata.offset());            }        }    }}

1 构造 KafkaProducer

KafkaProducer 通过解析producer.propeties文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器RecordAccumulator消息累加器元信息更新器、启动发送请求的后台线程

        //构造 KafkaProducer        KafkaProducer producer = new KafkaProducer(properties);

生产者元信息更新器

我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是 ConsumerMetadata 。

 

相关的 Producer 配置有:

虽然 Producer 元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个 TopicPartition 不存在,这个时候可能就需要立刻发起一个元信息更新了。

集群资源变更监听器

org.apache.kafka.common.ClusterResourceListener

在构造 KafkaConsumer 的时候, 还会构造一个 集群资源变更监听器 ClusterResourceListener

当用户希望收到有关集群元数据更改的通知时,可以实现回调接口。

需要在拦截器指标采样器序列化器反序列化器 中访问集群元数据的用户可以实现此接口。

public interface ClusterResourceListener {
      /**     * 用户可以实现以获取 ClusterResource 更新的回调方法。     * @param clusterResource cluster metadata     */    void onUpdate(ClusterResource clusterResource);}

下面描述了每种类型的方法调用顺序。

Clients

在每个元数据响应之后都会调用一次 onUpdate(ClusterResource)

当在org.apache.kafka.clients.producer.ProducerInterceptor实现的 ClusterResourceListener 的时候

调用顺序为: ProducerInterceptor.onSend() -> onUpdate(ClusterResource) -> ProducerInterceptor.onAcknowledgement()

当在org.apache.kafka.clients.consumer.ConsumerInterceptor实现的 ClusterResourceListener 的时候

调用顺序为:onUpdate() - > ConsumerInterce

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/645144
推荐阅读
相关标签
  

闽ICP备14008679号