
Kafka: Sending and Receiving Messages

1. Maven dependency

<!-- Kafka client dependency -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

Note: for key/value serialization and deserialization you may also use a third-party library; this example sticks to basic data types for the key and value. A sketch of a custom serializer follows below.
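If you need to send custom objects rather than basic types, you can implement Kafka's Serializer interface yourself, for example on top of a JSON library. Here is a minimal sketch, assuming the Jackson library is on the classpath; the JsonSerializer class name is our own for illustration, not part of kafka-clients:

// Minimal JSON value serializer sketch; assumes Jackson is available.
// JsonSerializer is a hypothetical class name, not part of kafka-clients.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            // Convert the object to UTF-8 JSON bytes
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("JSON serialization failed", e);
        }
    }
}

It would then be registered the same way as the built-in serializers, e.g. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);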

2. Producer

// Imports needed by this snippet
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Common parameters
private static String bootServer = "kafka1:9092,kafka2:9092,kafka3:9092";
private static String TOPIC_NAME = "kafkaQQRun";
private static String CONSUMER_GROUP_NAME = "consumer1";

// Small printf-style logging helper
public void log(String fmt, Object... objs) {
    if (objs != null && objs.length > 0) {
        System.out.println(String.format(fmt, objs));
    } else {
        System.out.println(fmt);
    }
}

@Test
public void producer1() throws ExecutionException, InterruptedException {
    // Producer configuration
    Properties props = new Properties();
    // Broker address list [host:port]
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
    // key/value serializers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Create the producer client from the configuration
    Producer<Long, String> producer = new KafkaProducer<Long, String>(props);
    for (int i = 0; i < 100; i++) {
        // Build the message; the key decides which partition it is sent to,
        // the value is the actual message payload
        ProducerRecord<Long, String> message = new ProducerRecord<>(
                TOPIC_NAME, (long) i, "helloKafka");
        // Synchronous send: get() blocks until the broker acknowledges
        RecordMetadata metadata = producer.send(message).get();
        log("send sync: topic:%s, partition:%d, offset:%d, key:%d, value:%s",
                metadata.topic(), metadata.partition(), metadata.offset(),
                message.key(), message.value());
    }
    for (int i = 0; i < 10000; i++) {
        ProducerRecord<Long, String> message = new ProducerRecord<>(TOPIC_NAME, 10000000L + i, "v2");
        // Asynchronous send: the callback fires when the broker responds
        producer.send(message, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                log("send async complete, topic:%s, partition:%d, offset:%d, key:%d, value:%s",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
                        message.key(), message.value());
            }
        });
    }
    // close() waits for outstanding async sends (and their callbacks) to
    // complete, then releases the producer's resources
    producer.close();
}
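By default the producer hashes the record key to choose a partition, which is why the key in the loops above decides where each message lands. If you need to pin a record to a specific partition, ProducerRecord has an overload that takes the partition number directly; a minimal sketch (partition 0 is an arbitrary choice for illustration):

// Explicit-partition send: ProducerRecord(topic, partition, key, value).
// Partition 0 is arbitrary here; with an explicit partition the key is still
// stored with the record but no longer decides the partition.
ProducerRecord<Long, String> pinned = new ProducerRecord<>(TOPIC_NAME, 0, 1L, "helloKafka");
RecordMetadata md = producer.send(pinned).get();
log("pinned send: partition:%d, offset:%d", md.partition(), md.offset());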

3. Consumer

// Imports needed by this snippet
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

@Test
public void consumer1() {
    // Consumer configuration
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootServer);
    // Consumer group name
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    // key/value deserializers
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Auto-commit of offsets (default: true) is disabled here so that
    // offsets are committed manually below
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // Create a consumer
    KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // poll() long-polls the broker for new messages
        ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<Long, String> r : messages) {
            log("receive: partition:%d, offset:%d, key:%d, value:%s", r.partition(), r.offset(), r.key(), r.value());
        }
        int count = messages.count();
        if (count > 0) {
            // Blocking manual commit:
            // consumer.commitSync();
            // log("commitSync finish records:%d", count);
            // Asynchronous commit with a completion callback:
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    for (TopicPartition p : offsets.keySet()) {
                        OffsetAndMetadata d = offsets.get(p);
                        log("commitAsync complete, topic:%s, partition:%d, offset:%d", p.topic(), p.partition(), d.offset());
                    }
                    log("commitAsync complete records:%d", count);
                }
            });
        }
    }
}
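The while (true) loop above never exits, so the consumer is never closed. A common pattern for shutting a polling consumer down cleanly is to call consumer.wakeup() from another thread, which makes a blocked poll() throw WakeupException. A minimal sketch of the idea; the JVM shutdown hook is just one possible trigger:

// Graceful shutdown sketch: wakeup() forces a blocked poll() to throw
// WakeupException so the loop can exit and the consumer can close cleanly.
final Thread pollThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        pollThread.join(); // wait for the poll loop to finish
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));
try {
    while (true) {
        ConsumerRecords<Long, String> messages = consumer.poll(Duration.ofMillis(1000));
        // ... process and commit as in consumer1() ...
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected during shutdown; safe to ignore
} finally {
    consumer.close(); // leaves the group and releases resources
}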
