赞
踩
生产者客户端代码
public class SzzTestSend {
public static final String bootStrap = "xxxxxx:9090";
public static final String topic = "t_3_1";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStrap);
// 序列化协议 下面两种写法都可以
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//过滤器 可配置多个用逗号隔开
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"org.apache.kafka.clients.producer.SzzProducerInterceptorsTest");
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
// 发送消息, 并设置 回调(回调函数也可以不要)
ProducerRecord<String,String> record = new ProducerRecord(topic,"Hello World!");
try {
producer.send(record,new SzzTestCallBack(record.topic(), record.key(), record.value()));
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发送成功回调类
*/
public static class SzzTestCallBack implements Callback{
private static final Logger log = LoggerFactory.getLogger(SzzTestCallBack.class);
private String topic;
private String key;
private String value;
public SzzTestCallBack(String topic, String key, String value) {
this.topic = topic;
this.key = key;
this.value = value;
}
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
log.error("Error when sending message to topic {} with key: {}, value: {} with error:",
topic, key,value, e);
}else {
log.info("send message to topic {} with key: {} value:{} success, partiton:{} offset:{}",
topic, key,value,metadata.partition(),metadata.offset());
}
}
}
}
KafkaProducer 通过解析producer.propeties
文件里面的属性来构造自己。例如 :分区器、Key 和 Value 序列化器、拦截器、RecordAccumulator消息累加器 、元信息更新器、启动发送请求的后台线程
//构造 KafkaProducer
KafkaProducer producer = new KafkaProducer(properties);
我们之前有讲过. 客户端都会保存集群的元信息,例如生产者的元信息是 ProducerMetadata. 消费组的是 ConsumerMetadata 。
相关的 Producer 配置有:
虽然 Producer 元信息会自动更新, 但是有可能在生产者发送消息的时候,发现某个 TopicPartition 不存在,这个时候可能就需要立刻发起一个元信息更新了。
org.apache.kafka.common.ClusterResourceListener
在构造 KafkaConsumer 的时候, 还会构造一个 集群资源变更监听器 ClusterResourceListener
当用户希望收到有关集群元数据更改的通知时,可以实现回调接口。
需要在拦截器、指标采样器、序列化器和反序列化器 中访问集群元数据的用户可以实现此接口。
public interface ClusterResourceListener {
/**
* 用户可以实现以获取 ClusterResource 更新的回调方法。
* @param clusterResource cluster metadata
*/
void onUpdate(ClusterResource clusterResource);
}
下面描述了每种类型的方法调用顺序。
Clients
在每个元数据响应之后都会调用一次 onUpdate(ClusterResource)
当在org.apache.kafka.clients.producer.ProducerInterceptor
实现的 ClusterResourceListener 的时候
调用顺序为: ProducerInterceptor.onSend() -> onUpdate(ClusterResource) -> ProducerInterceptor.onAcknowledgement()
当在org.apache.kafka.clients.consumer.ConsumerInterceptor
实现的 ClusterResourceListener 的时候
调用顺序为:onUpdate() - > ConsumerInterce
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。