当前位置:   article > 正文

Kafka:将消息发送指定分区_kafka发送数据指定partition

kafka发送数据指定partition
  1. import org.apache.kafka.clients.producer.*;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import java.util.Properties;
  4. public class CustomProducerCallbackPartitions {
  5. public static void main(String[] args) throws InterruptedException {
  6. // 0 配置
  7. Properties properties = new Properties();
  8. // 连接集群 bootstrap.servers
  9. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092");
  10. // 指定对应的key和value的序列化类型 key.serializer
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  13. // 1 创建kafka生产者对象
  14. // KafkaProducer<K, V>
  15. // 泛型K为key一般为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
  16. // 下列发送数据即为 “” “hello ”
  17. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  18. // 2 发送数据
  19. for (int i = 0; i < 5; i++) {
  20.             // 发送到1号分区
  21.             // 或者是依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
  22. kafkaProducer.send(new ProducerRecord<>("first", 1, "hello" + i), new Callback() {
  23. @Override
  24. public void onCompletion(RecordMetadata metadata, Exception exception) {
  25. if (exception == null){
  26. System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
  27. }
  28. }
  29. });
  30. Thread.sleep(2);
  31. }
  32. // 3 关闭资源
  33. kafkaProducer.close();
  34. }
  35. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/794040
推荐阅读
相关标签
  

闽ICP备14008679号