当前位置:   article > 正文

Kafka生产者客户端详解_kafka client

kafka client

引言

  • 生产者(Producer)就是负责向Kafka发送消息的应用程序;
  • 消费者(Consumer)就是可以从Kafka订阅主题,并从订阅的主题中拉取消息的应用程序;
  • 对于Kafka来说,生产者和消费者都属于客户端

生产者 vs 消费者

线程安全
  • 生产者KakfaProducer是线程安全的,可以在多个线程中共享KafkaProducer实例,也可以将Kafka实例进行池化来供其他线程调用;
  • 消费者KafkaConsumer是非线程安全的;

1. 生产者Java客户端

使用Java开发生产者/消费者客户端需要先引入Pom依赖:

<!-- 使用Kafaka生产者/消费者客户端需要的依赖 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 注意:kafka从2.0.0版本开始,不再支持JDK7及以下的版本!
生产者客户端代码:
/**
 * Java生产者客户端
 */
public class ProducerFastStart {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-learn";

    /**
     * 构建生产者客户端配置参数
     */
    public static Properties initConfig() {
        Properties proper = new Properties();
        proper.put("bootstrap.servers", brokerList);
        proper.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        proper.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        proper.put("client.id", "producer.client.id.demo");

        return proper;
    }

    public static void main(String[] args) {
        Properties proper = initConfig();
        // 创建一个生产者客户端实例
        KafkaProducer<String, String> producer = new KafkaProducer<>(proper);
        // 构建所需要发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Hello,Kafka");
        
        // 发送消息
        producer.send(record);

        // 关闭生产者客户端实例
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • bootstrap.serverskey.serializervalue.serializer是生产者客户端连接kafak必备的参数;
  • 为了防止参数名记错,可以使用 ProductConfig的常量配置,每一个生产者客户端需要的参数都有一个常量名;
  • 序列化器要使用全类别(带包名),为了防止字符串写错,可以使用StringSerializer.class.getName(),其中getName()方法是获取全限定类名,getShortName()方法是只获取类名;
必备参数配置
  • bootstrap.servers:连接Kafka集群所需要的broker地址清单。可以设置一个或多个,中间以逗号隔开,默认值是"";
    • 注意:这里并非需要所有的broker地址,因为生产者会从给定的broker里查找其他broker的信息。不过建议至少要配置两个以上的broker地址信
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/142440
推荐阅读
相关标签
  

闽ICP备14008679号