
Using Kafka in Java


The Producer

Once instantiated, the Producer exposes send methods that deliver data to a given topic (and optionally a given partition), plus a destroy method to be called on shutdown.

Interface: KafkaProducer.java

import java.util.List;
import java.util.Properties;

public interface KafkaProducer<D> {

    default void init() {
    }

    default void destroy() {
    }

    boolean send(String topic, D data);

    boolean send(String topic, Integer partition, D data);

    boolean send(String topic, List<D> dataList);

    boolean send(String topic, Integer partition, List<D> dataList);

    /**
     * Default configuration.
     */
    default Properties getDefaultProps() {
        Properties props = new Properties();
        props.put("acks", "1");
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 32 * 1024 * 1024L);
        return props;
    }
}
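These defaults (acks=1, a single retry) favor latency over durability: the leader acknowledges a record without waiting for its replicas. Because getDefaultProps() is a default method, a caller that needs stronger guarantees can override it; a sketch, assuming the (servers, serializer) constructor of the implementation shown later:

    // Anonymous subclass hardening the defaults; the override is consulted
    // by the superclass constructor before the underlying producer is built.
    KafkaProducer<String> durableProducer = new KafkaProducerImpl<String>(
            "localhost:9092",
            "org.apache.kafka.common.serialization.StringSerializer") {
        @Override
        public Properties getDefaultProps() {
            Properties props = super.getDefaultProps();
            props.put("acks", "all"); // wait until all in-sync replicas have the record
            props.put("retries", 3);
            return props;
        }
    };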

Parameter notes

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// The acks config controls the criteria under which requests are considered complete. The "all" setting used here blocks on the full commit of the record; it is the slowest but most durable setting.
props.put("acks", "all");
// If the request fails, the producer can automatically retry, though since retries is set to 0 here it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).
props.put("retries", 0);
// The producer maintains a buffer of unsent records for each partition, sized by batch.size. Making this larger results in more batching but requires more memory (there is generally one buffer per active partition).
props.put("batch.size", 16384);
// By default a buffer is available to send immediately even if it has additional unused space. To reduce the number of requests, set linger.ms to something greater than 0: the producer then waits up to that many milliseconds before sending, in the hope that more records arrive to fill the same batch.
props.put("linger.ms", 1);
// Total memory the producer may use to buffer records. Once the buffer is exhausted, additional send calls block; if they block longer than max.block.ms, a TimeoutException is thrown.
props.put("buffer.memory", 33554432);
// key.serializer and value.serializer specify how to turn the key and value objects of a ProducerRecord into bytes. The included ByteArraySerializer or StringSerializer cover simple byte and string types.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
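Note that send() is asynchronous: it appends the record to a buffer and returns a Future<RecordMetadata> immediately. To observe per-record delivery results without blocking, a callback can be attached; a minimal sketch, given a producer built from these properties (the topic name "my-topic" is a placeholder):

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("my-topic", "key", "value"),
            (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace(); // delivery failed after any configured retries
                } else {
                    System.out.printf("sent to partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
    // Or block for the acknowledgement:
    // RecordMetadata m = producer.send(new ProducerRecord<>("my-topic", "v")).get();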

Implementation: KafkaProducerImpl.java

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;

public class KafkaProducerImpl<D> implements KafkaProducer<D> {

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerImpl.class);

    private final Producer<D, D> producer;

    // The broker list and serializer class name are supplied by the caller.
    public KafkaProducerImpl(String servers, String serializer) {
        Properties props = this.getDefaultProps();
        props.put("bootstrap.servers", servers);
        props.put("key.serializer", serializer);
        props.put("value.serializer", serializer);
        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
    }

    @Override
    public void destroy() {
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public boolean send(String topic, D data) {
        boolean isSuc = true;
        try {
            producer.send(new ProducerRecord<>(topic, data));
        } catch (Exception e) {
            isSuc = false;
            logger.error(String.format("KafkaProducerImpl send error. topic:[%s], data:[%s]", topic, data), e);
        }
        return isSuc;
    }

    @Override
    public boolean send(String topic, Integer partition, D data) {
        boolean isSuc = true;
        try {
            producer.send(new ProducerRecord<>(topic, partition, null, data));
        } catch (Exception e) {
            isSuc = false;
            logger.error(String.format("KafkaProducerImpl send error. topic:[%s], data:[%s]", topic, data), e);
        }
        return isSuc;
    }

    @Override
    public boolean send(String topic, List<D> dataList) {
        boolean isSuc = true;
        try {
            if (dataList != null) {
                dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, item)));
            }
        } catch (Exception e) {
            isSuc = false;
            logger.error(String.format("KafkaProducerImpl send error. topic:[%s], dataList:[%s]", topic, dataList), e);
        }
        return isSuc;
    }

    @Override
    public boolean send(String topic, Integer partition, List<D> dataList) {
        boolean isSuc = true;
        try {
            if (dataList != null) {
                dataList.forEach(item -> producer.send(new ProducerRecord<>(topic, partition, null, item)));
            }
        } catch (Exception e) {
            isSuc = false;
            logger.error(String.format("KafkaProducerImpl send error. topic:[%s], partition:[%s], dataList:[%s]", topic, partition, dataList), e);
        }
        return isSuc;
    }
}
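A minimal usage sketch, assuming the (servers, serializer) constructor parameters introduced above and a broker on localhost:9092. Keep in mind that the boolean return value only reflects whether the record was enqueued locally; because send() is asynchronous, a broker-side failure surfaces later (see the callback sketch above):

    KafkaProducer<String> producer = new KafkaProducerImpl<>(
            "localhost:9092",
            "org.apache.kafka.common.serialization.StringSerializer");
    producer.send("my-topic", "hello kafka");     // key-less record, partition chosen by the client
    producer.send("my-topic", 0, "pinned to p0"); // explicit partition
    producer.destroy();                           // close() flushes buffered records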

 

The Consumer

Once instantiated, the Consumer registers its KafkaConsumerListeners, subscribes to the given topic, and runs a blocking poll loop; each record received is passed to the listeners in turn for processing.

Interface: KafkaConsumer.java

import java.util.Properties;

public interface KafkaConsumer {

    default void init() {
    }

    default void destroy() {
    }

    void start();

    /**
     * Default configuration.
     */
    default Properties getDefaultProps() {
        Properties props = new Properties();
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        return props;
    }
}

Parameter notes

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Setting enable.auto.commit means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we are saying that our record's key and value will just be simple strings.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// This consumer is subscribing to the topics foo and bar as part of a group of consumers called test as configured with group.id.
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
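With enable.auto.commit=true, offsets are committed on a timer, so a crash between a commit and the end of processing can skip records. Where at-least-once handling matters, a common alternative (not part of the original example) is to disable auto-commit and commit only after processing; a minimal sketch, with process() standing in for a hypothetical application handler:

    props.put("enable.auto.commit", "false");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical handler
        }
        consumer.commitSync(); // commit only after the whole batch is processed
    }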

Implementation: KafkaConsumerImpl.java

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;

public class KafkaConsumerImpl<K, V> implements KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerImpl.class);

    // CopyOnWriteArrayList lets the polling thread iterate safely while listeners are added or removed from other threads.
    private final List<KafkaConsumerListener<K, V>> consumerListeners = new CopyOnWriteArrayList<>();

    private Consumer<K, V> consumer;
    // volatile so the polling thread sees the update made by destroy() from another thread
    private volatile boolean running = true;
    private final int waitingTimeout = 100;

    // The broker list and deserializer class name are supplied by the caller.
    public KafkaConsumerImpl(String topic, String groupId, String servers, String deserializer) {
        Properties props = this.getDefaultProps();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", servers);
        props.put("key.deserializer", deserializer);
        props.put("value.deserializer", deserializer);
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void setConsumerListeners(List<KafkaConsumerListener<K, V>> consumerListeners) {
        this.consumerListeners.clear();
        if (consumerListeners != null) {
            this.consumerListeners.addAll(consumerListeners);
        }
    }

    public void addConsumerListener(KafkaConsumerListener<K, V> consumerListener) {
        if (consumerListener != null && !this.consumerListeners.contains(consumerListener)) {
            this.consumerListeners.add(consumerListener);
        }
    }

    public void removeConsumerListener(KafkaConsumerListener<K, V> consumerListener) {
        if (consumerListener != null) {
            this.consumerListeners.remove(consumerListener);
        }
    }

    @Override
    public void init() {
        this.start();
    }

    @Override
    public void destroy() {
        running = false;
    }

    @Override
    public void start() {
        new Thread(() -> {
            while (running) {
                ConsumerRecords<K, V> records = consumer.poll(waitingTimeout);
                for (ConsumerRecord<K, V> record : records) {
                    K key = record.key();
                    if (key == null) {
                        consumerListeners.forEach(listener -> listener.consume(record.value()));
                    } else {
                        consumerListeners.forEach(listener -> listener.consume(key, record.value()));
                    }
                }
            }
            // KafkaConsumer is not thread-safe: close() must run on the same thread that polls, otherwise it throws ConcurrentModificationException.
            if (consumer != null) {
                try {
                    logger.info("start to close consumer.");
                    consumer.close();
                } catch (Exception e) {
                    logger.error("close kafka consumer error.", e);
                }
                consumer = null;
            }
        }).start();
    }
}
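One caveat: destroy() only flips the flag, so the loop exits after the in-flight poll() returns (here at most ~100 ms; a longer timeout would delay shutdown accordingly). The client offers Consumer.wakeup() for exactly this: it is the one method safe to call from another thread, and it makes a blocked poll() throw WakeupException. A sketch of that alternative shutdown path, not part of the original code (it also needs import org.apache.kafka.common.errors.WakeupException):

    @Override
    public void destroy() {
        running = false;
        consumer.wakeup(); // safe cross-thread call; aborts a blocked poll()
    }

    // and in the polling thread:
    try {
        while (running) {
            ConsumerRecords<K, V> records = consumer.poll(waitingTimeout);
            // ... dispatch to listeners as above ...
        }
    } catch (WakeupException e) {
        // expected during shutdown; ignore
    } finally {
        consumer.close(); // still on the polling thread
    }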

Interface: KafkaConsumerListener.java

public interface KafkaConsumerListener<K, V> {

    void consume(V value);

    default void consume(K key, V value) {
        consume(value);
    }
}
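Putting the pieces together: a minimal end-to-end sketch, assuming the constructor parameters introduced in the implementations above, a broker on localhost:9092, and a placeholder topic "my-topic". Since consume(V) is the single abstract method, a listener can be written as a lambda:

    KafkaConsumerListener<String, String> printer =
            value -> System.out.println("consumed: " + value);

    KafkaConsumerImpl<String, String> consumer = new KafkaConsumerImpl<>(
            "my-topic", "my-group", "localhost:9092",
            "org.apache.kafka.common.serialization.StringDeserializer");
    consumer.addConsumerListener(printer);
    consumer.init(); // starts the background polling thread

    KafkaProducer<String> producer = new KafkaProducerImpl<>(
            "localhost:9092",
            "org.apache.kafka.common.serialization.StringSerializer");
    producer.send("my-topic", "hello");

    // later, on shutdown:
    producer.destroy();
    consumer.destroy();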
