当前位置:   article > 正文

Kafka基础-生产者发送消息_kafka发消息

kafka发消息

无论你是使用Kafka作为队列,消息总线还是数据存储平台,你都会用到生产者,用于发送数据到Kafka。下文介绍如何使用Java来发送消息到Kafka。

1. 发送消息的主要步骤

  • 首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。
  • 接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。
  • 当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。

2. 创建生产者

发送数据到Kafka的第一步是创建一个生产者,必须指定以下三个属性:

  • bootstrap.servers:生产者用于与Kafka集群建立初始连接的主机和端口的列表。该列表不需要包括所有的brokers信息,因为生产者在建立连接后能够获取所有brokers的信息。但建议至少包含两个,防止一个broker宕机,生产者仍然能够通过另外一个broker连接到群集。
  • key.serializer:用于序列化keys的类名。Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。
  • value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。

下面是创建生产者的代码示例:

  1. Properties kafkaProps = new Properties();
  2. kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
  3. kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. producer = new KafkaProducer<String, String>(kafkaProps);

3. 发送消息

发送消息主要有以下三种方法:

3.1 Fire-and-forget

发送消息后不需要关心是否发送成功。因为Kafka是高可用的,而且生产者会自动重新发送,所以大多数情况都会成功,但是有时也会失败。

下面是代码示例:

  1. ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry",
  2. "Precision Products", "France");
  3. try {
  4. producer.send(record);
  5. } catch (Exception e) {
  6. e.printStackTrace();
  7. }

ProducerRecord有多个构造器,这里使用了三个参数的,topi

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/707573
推荐阅读
相关标签
  

闽ICP备14008679号