
Integrating Kafka with Java: Implementing a Producer and Consumer


1. Add the Maven dependencies: the Kafka client and Alibaba's fastjson

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.41</version>
</dependency>
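
The examples below assume a broker listening at 172.21.72.166:9092 and an existing topic named user_topic. If the topic does not exist yet, it can also be created programmatically with the AdminClient that ships in the same kafka-clients dependency; the following is a minimal sketch (the class name, partition count, and replication factor are illustrative assumptions, not values from this article):

package com.ztesoft.kafka;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateUserTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.72.166:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 1 (assumed single-broker setup)
            NewTopic topic = new NewTopic("user_topic", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}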

2. Implementing the Kafka producer

2.1 Asynchronous send: producer implementation

With the asynchronous style, send() returns immediately and the broker's acknowledgement (or any error) is delivered later to a Callback.

package com.ztesoft.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.alibaba.fastjson.JSON;

/**
 * Kafka producer: asynchronous send with a callback.
 */
public class MyKafkaProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Kafka cluster (broker list)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.72.166:9092");
        // Wait for all in-sync replicas to acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // How long to wait for a batch to fill before sending
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size in bytes
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key and value serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 100; i < 200; i++) {
            Map<String, Object> recordMap = new HashMap<>(20);
            recordMap.put("seq", i);
            recordMap.put("name", "test" + i);
            recordMap.put("age", i % 20);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("user_topic",
                    String.valueOf(i), JSON.toJSONString(recordMap));
            producer.send(producerRecord, new Callback() {
                // Invoked asynchronously once the producer receives the ack
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        System.out.printf("send failed: error=%s%n", e.getMessage());
                    } else {
                        System.out.printf("send succeeded: topic=%s, partition=%d, offset=%d%n",
                                metadata.topic(), metadata.partition(), metadata.offset());
                    }
                }
            });
            // Pause between sends so the demo output is easy to follow
            Thread.sleep(2 * 1000);
        }
        producer.close();
    }
}
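
Callback is a single-method interface, so on Java 8+ the anonymous class above can be replaced with a lambda; a minimal, equivalent sketch:

// Equivalent callback written as a lambda (Java 8+)
producer.send(producerRecord, (metadata, e) -> {
    if (e != null) {
        System.out.printf("send failed: error=%s%n", e.getMessage());
    } else {
        System.out.printf("send succeeded: topic=%s, partition=%d, offset=%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});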

2.2 Synchronous send: producer implementation

Calling get() on the Future returned by send() blocks the calling thread until the broker acknowledges the record (or the send fails), which makes the send effectively synchronous.

package com.ztesoft.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.alibaba.fastjson.JSON;

/**
 * Kafka producer: synchronous send.
 */
public class MyKafkaProducer {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        // Kafka cluster (broker list)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.72.166:9092");
        // Wait for all in-sync replicas to acknowledge
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // How long to wait for a batch to fill before sending
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size in bytes
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key and value serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 100; i < 200; i++) {
            Map<String, Object> recordMap = new HashMap<>(20);
            recordMap.put("seq", i);
            recordMap.put("name", "test" + i);
            recordMap.put("age", i % 20);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("user_topic",
                    String.valueOf(i), JSON.toJSONString(recordMap));
            // Calling get() blocks until the send completes, making it synchronous
            producer.send(producerRecord).get();
            Thread.sleep(2 * 1000);
        }
        producer.close();
    }
}
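
get() also returns the RecordMetadata of the acknowledged record; a minimal sketch that reads it and bounds the wait (the 10-second timeout is an assumed value, and TimeoutException must additionally be declared or caught):

// Requires: import java.util.concurrent.TimeUnit;
//           import java.util.concurrent.TimeoutException;
//           import org.apache.kafka.clients.producer.RecordMetadata;
RecordMetadata metadata = producer.send(producerRecord).get(10, TimeUnit.SECONDS);
System.out.printf("send succeeded: topic=%s, partition=%d, offset=%d%n",
        metadata.topic(), metadata.partition(), metadata.offset());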

3. Implementing the Kafka consumer

3.1 Consuming with automatic offset commits

With enable.auto.commit=true, the client commits offsets in the background every auto.commit.interval.ms, so the polling loop needs no explicit commit calls.

package com.ztesoft.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "172.21.72.166:9092");
        // Consumer group
        props.put("group.id", "bigdata");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // Where to start when there is no committed offset: usually earliest or latest
        props.put("auto.offset.reset", "earliest");
        // Key and value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("user_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
            for (ConsumerRecord<String, String> record : records) {
                // Print the consumed record
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // No explicit commit: offsets are committed automatically in the background
        }
    }
}
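
The loop above never exits. A common pattern (sketched here under assumptions, not from the original article) is to call wakeup() from a shutdown hook: it is the one thread-safe KafkaConsumer method, and it makes a blocked poll() throw WakeupException so the consumer can be closed cleanly:

// Requires: import org.apache.kafka.common.errors.WakeupException;
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try { mainThread.join(); } catch (InterruptedException ignored) { }
}));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        // ... process records ...
    }
} catch (WakeupException e) {
    // Expected during shutdown; nothing to do
} finally {
    consumer.close();
}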

3.2 Consuming with manual offset commits

With enable.auto.commit=false, the application decides when offsets are committed; here each polled batch is committed asynchronously after processing.

package com.ztesoft.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "172.21.72.166:9092");
        // Consumer group
        props.put("group.id", "bigdata");
        // Disable auto-commit: offsets are committed explicitly below
        props.put("enable.auto.commit", "false");
        // Where to start when there is no committed offset: usually earliest or latest
        props.put("auto.offset.reset", "earliest");
        // Key and value deserializers
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("user_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
            for (ConsumerRecord<String, String> record : records) {
                // Print the consumed record
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }
            // Commit the offsets of this batch asynchronously after processing
            consumer.commitAsync();
        }
    }
}
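
commitAsync() does not retry on failure. A widely used refinement (a sketch under assumptions, not from the original article) is to keep the fast asynchronous commits in the hot loop but issue one synchronous commit before closing, so the final offsets are not lost if an async commit was still in flight:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        for (ConsumerRecord<String, String> record : records) {
            // ... process record ...
        }
        consumer.commitAsync();
    }
} finally {
    try {
        consumer.commitSync(); // retries until it succeeds or hits an unrecoverable error
    } finally {
        consumer.close();
    }
}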
