
Integrating Kafka with Spring Boot: sending and receiving messages

Add the following POM dependencies. Gson is recommended for parsing Kafka message payloads; parsing with fastjson occasionally runs into problems. A minimal Gson sketch follows the dependency list.

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.9.0</version>
</dependency>
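
For reference, a minimal sketch of the Gson round trip recommended above. The DemoMessage class and its fields are hypothetical and only illustrate fromJson/toJson; they are not part of the original project.

import com.google.gson.Gson;

public class GsonDemo {

    // Hypothetical payload type used only for this illustration.
    static class DemoMessage {
        String id;
        String plateNo;
    }

    public static void main(String[] args) {
        Gson gson = new Gson();

        // Parse a JSON string (e.g. a Kafka record value) into an object.
        DemoMessage msg = gson.fromJson("{\"id\":\"1\",\"plateNo\":\"ABC123\"}", DemoMessage.class);
        System.out.println(msg.plateNo);

        // Serialize an object back to JSON before publishing it.
        System.out.println(gson.toJson(msg));
    }
}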

application.properties configuration

The kafka.consumer.* keys are custom properties read via @Value in the consumer configuration class further down, while the spring.kafka.* keys are standard Spring Boot properties picked up by the producer auto-configuration. A small producer smoke-test sketch follows the property list.

# Consumer settings (custom kafka.consumer.* keys, read by the @Value-based config class below)
kafka.consumer.bootstrap-servers=23.99.189.34:9092,23.99.189.87:9092,23.99.189.98:9092,23.99.189.105:9092,23.99.189.126:9092
kafka.consumer.group-id=group_kafka
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.enable-auto-commit=false
kafka.consumer.session.timeout=30000
kafka.consumer.concurrency=3
#kafka.consumer.auto-offset-reset=latest
kafka.consumer.auto-offset-reset=earliest
kafka.consumer.auto.commit.interval.ms=120000
kafka.consumer.properties.request.timeout.ms=180000
# Producer settings (standard spring.kafka.* keys, picked up by Spring Boot auto-configuration)
spring.kafka.bootstrap-servers=23.99.189.34:9092,23.99.189.87:9092,23.99.189.98:9092,23.99.189.105:9092,23.99.189.126:9092
spring.kafka.producer.retries=0
spring.kafka.producer.acks=all
kafka.producer.topic=abnormal_people
spring.kafka.producer.batch-size=16384
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
# SASL username/password settings; remove the following three lines if SASL is not enabled
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
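
As noted above, the spring.kafka.* producer keys are consumed by Spring Boot's Kafka auto-configuration, so a KafkaTemplate can be injected without any extra producer configuration class. A minimal smoke-test sketch, assuming the abnormal_people topic from the properties; the runner class itself is an illustration and not part of the original post.

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class ProducerSmokeTest {

    // Sends a single test record at startup through the auto-configured template.
    // With JsonSerializer configured as the value serializer, the String payload
    // is written to the topic as a JSON string.
    @Bean
    public CommandLineRunner kafkaSmokeTest(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> kafkaTemplate.send("abnormal_people", "{\"ping\":\"ok\"}");
    }
}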

Consumer configuration

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval.ms}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean("kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of listener threads; should not exceed the partition count of the topic.
        factory.setConcurrency(concurrency);
        // Offsets are committed manually, record by record, via Acknowledgment.acknowledge().
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Heartbeat interval in ms; must stay well below session.timeout.ms (typically about one third of it).
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // SASL/JAAS authentication; remove the next three entries if the cluster does not use SASL.
        propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        propsMap.put("sasl.mechanism", "PLAIN");
        propsMap.put("security.protocol", "SASL_PLAINTEXT");
        return propsMap;
    }
}
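
One possible refinement, not from the original post: the SASL credentials hardcoded in consumerConfigs() can be moved into application.properties and injected like the other values. The kafka.consumer.sasl.* and kafka.consumer.security.protocol property names below are hypothetical, and the snippet only shows the fields and puts that would replace the hardcoded entries in the class above.

// Hypothetical additions to application.properties:
//   kafka.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
//   kafka.consumer.sasl.mechanism=PLAIN
//   kafka.consumer.security.protocol=SASL_PLAINTEXT

// Extra fields on KafkaConsumerConfig:
@Value("${kafka.consumer.sasl.jaas.config}")
private String saslJaasConfig;

@Value("${kafka.consumer.sasl.mechanism}")
private String saslMechanism;

@Value("${kafka.consumer.security.protocol}")
private String securityProtocol;

// Inside consumerConfigs(), replacing the hardcoded SASL entries:
propsMap.put("sasl.jaas.config", saslJaasConfig);
propsMap.put("sasl.mechanism", saslMechanism);
propsMap.put("security.protocol", securityProtocol);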

Consumer listener

package com.abc.renchewu.kafka;

import com.abc.renchewu.kafka.entity.CarEntity;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka listener that consumes messages with manual acknowledgement.
 * @author ws
 */
@Component
@Slf4j
public class KafkaConsumerListener {

    @KafkaListener(topics = "CIGC_DSJJ_COV19_AREA", containerFactory = "kafkaListenerContainerFactory")
    public void listenOnMessageManual(ConsumerRecord<String, String> record, Acknowledgment ack) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Gson gson = new Gson();
            log.info("Received vehicle message: {}", kafkaMessage.get());
            CarEntity carEntity = gson.fromJson(kafkaMessage.get().toString(), CarEntity.class);
            // ... business processing of carEntity goes here
            // Manually acknowledge (commit) the offset for this record.
            ack.acknowledge();
        }
    }
}
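
The CarEntity class referenced by the listener is not shown in the original post. A hypothetical sketch is given below; since Gson maps JSON keys to field names, the real field names must match the keys of the upstream payload.

package com.abc.renchewu.kafka.entity;

// Hypothetical sketch of the entity that Gson deserializes in the listener above.
// Field names are assumptions for illustration; adjust them to the actual JSON keys.
public class CarEntity {

    private String plateNo;      // assumed: licence plate number
    private String captureTime;  // assumed: capture timestamp

    public String getPlateNo() { return plateNo; }
    public void setPlateNo(String plateNo) { this.plateNo = plateNo; }

    public String getCaptureTime() { return captureTime; }
    public void setCaptureTime(String captureTime) { this.captureTime = captureTime; }
}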

Producer

package com.dcqc.kafkatranscode.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaUtils {

    @Value("${kafka.producer.topic}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a Kafka message (the payload is expected to be a JSON string).
     */
    public void send(String jsonMessage) {
        kafkaTemplate.send(topicName, jsonMessage);
    }
}
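
A usage sketch for the utility above: build a JSON payload with Gson and hand it to send(). The service class, its package, and the payload fields are assumptions for illustration, not part of the original post.

package com.dcqc.kafkatranscode.service;

import com.dcqc.kafkatranscode.utils.KafkaUtils;
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class AbnormalPeopleService {

    @Autowired
    private KafkaUtils kafkaUtils;

    // Serializes a small payload to JSON and publishes it to the configured topic.
    public void report(String personId, String reason) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("personId", personId);
        payload.put("reason", reason);
        kafkaUtils.send(new Gson().toJson(payload));
    }
}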
