Integrating Kafka with Spring Boot

This was my first time connecting to Kafka on the producer side. After looking through various integration approaches online, I put together this fairly simple one.

First, the integration utility class, KafkaUtil.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Kafka configuration with username/password (SASL) authentication.
 */
@Component // registered as a Spring bean so the @Value setters are invoked
public class KafkaUtil {

    // Broker address list. With no password, use the PLAINTEXT prefix;
    // with a password, use the SASL_PLAINTEXT prefix.
    //public static final String servers = "SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024";

    // @Value cannot inject directly into static fields, so each value is read
    // from the config file through an instance setter that copies it into a
    // static field.
    private static String bootstrapServers;
    @Value("${spring.kafka.bpm.bootstrap-servers}")
    public void setBootstrapServers(String bootstrapServers) {
        KafkaUtil.bootstrapServers = bootstrapServers;
    }

    private static Integer retries;
    @Value("${spring.kafka.bpm.producer.retries}")
    public void setRetries(Integer retries) {
        KafkaUtil.retries = retries;
    }

    private static Integer batchSize;
    @Value("${spring.kafka.bpm.producer.batch-size}")
    public void setBatchSize(Integer batchSize) {
        KafkaUtil.batchSize = batchSize;
    }

    private static Integer bufferMemory;
    @Value("${spring.kafka.bpm.producer.buffer-memory}")
    public void setBufferMemory(Integer bufferMemory) {
        KafkaUtil.bufferMemory = bufferMemory;
    }

    private static Integer linger;
    @Value("${spring.kafka.bpm.producer.linger}")
    public void setLinger(Integer linger) {
        KafkaUtil.linger = linger;
    }

    private static String acks;
    @Value("${spring.kafka.bpm.producer.acks}")
    public void setAcks(String acks) {
        KafkaUtil.acks = acks;
    }

    private static String username;
    @Value("${spring.kafka.bpm.producer.username}")
    public void setUsername(String username) {
        KafkaUtil.username = username;
    }

    private static String passwd;
    @Value("${spring.kafka.bpm.producer.passwd}")
    public void setPasswd(String passwd) {
        KafkaUtil.passwd = passwd;
    }

    // Producer configuration for the Kafka cluster
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);           // injected above but never applied in the original
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); // injected above but never applied in the original
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Quote the username and password values, as in the Kafka docs, so
        // credentials containing special characters still parse correctly.
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaProducer<String, String>(props);
    }

    public static KafkaConsumer<String, String> getConsumer(String groupId, String username, String passwd) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // previously hard-coded; read from the config file like the producer
        props.put("auto.offset.reset", "earliest"); // must be set to read pre-existing data when the group has no committed offset
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false"); // offsets are committed manually via commitSync()
        props.put("auto.commit.interval.ms", "100"); // only relevant when auto-commit is enabled
        props.put("max.partition.fetch.bytes", "10240"); // max bytes fetched per partition (roughly 10 KB, about 20 records per poll)
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\""
                + username + "\" password=\"" + passwd + "\";";
        props.setProperty("sasl.jaas.config", jaasConfig);
        return new KafkaConsumer<String, String>(props);
    }
}
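One caveat worth noting (my note, not in the original post): @Value injection only fills these static fields after Spring instantiates the KafkaUtil bean, so any code that calls KafkaUtil.getProducer() before the application context is ready, for example from a static initializer, will see null configuration. A minimal sketch of a lazy, thread-safe holder that defers producer creation to first use (the class name is hypothetical):

package com.ruoyi.soaworkflow.utils.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;

public class LazyProducerHolder {

    // volatile + double-checked locking: the producer is built at most once,
    // and only when the first caller actually needs it.
    private static volatile KafkaProducer<String, String> producer;

    public static KafkaProducer<String, String> get() {
        if (producer == null) {
            synchronized (LazyProducerHolder.class) {
                if (producer == null) {
                    producer = KafkaUtil.getProducer();
                }
            }
        }
        return producer;
    }
}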

Next, the contents of the configuration file (the values could of course also be hard-coded directly in the utility class):

spring:
  kafka:
    bpm:
      bootstrap-servers: SASL_PLAINTEXT://10.172.16.27:1024,10.172.16.28:1024,10.172.16.29:1024
      producer:
        retries: 1 # number of retries after a failed send
        batch-size: 2 # maximum batch size in bytes (2 effectively disables batching; the client default is 16384)
        buffer-memory: 33554432 # 32 MB send buffer
        linger: 1000 # milliseconds to wait for a batch to fill before sending
        acks: all
        username: aaaaa
        passwd: bbbbb
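As an aside, most of the @Value setters above could be replaced with a single @ConfigurationProperties binding. A partial sketch with a hypothetical class name, showing three of the fields (the others bind the same way; relaxed binding maps batch-size to batchSize and so on):

package com.ruoyi.soaworkflow.utils.kafka;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical alternative: binds the spring.kafka.bpm.producer.* block by name.
@Component
@ConfigurationProperties(prefix = "spring.kafka.bpm.producer")
public class BpmProducerProperties {

    private String acks;
    private String username;
    private String passwd;

    public String getAcks() { return acks; }
    public void setAcks(String acks) { this.acks = acks; }
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public String getPasswd() { return passwd; }
    public void setPasswd(String passwd) { this.passwd = passwd; }
}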

The corresponding dependency also has to be added to the pom file; spring-kafka transitively pulls in the kafka-clients jar that provides the KafkaProducer and KafkaConsumer classes used above:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Then the producer, ProducerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer
 */
public class ProducerClient {

    // One shared producer instance: KafkaProducer is thread-safe and expensive
    // to create, so it should be reused rather than rebuilt per send.
    private static Producer<String, String> producer = KafkaUtil.getProducer();

    public static void sendToKafka(String topic, String processId, JSONObject bpmData) {
        try {
            final ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>(topic, processId, bpmData.toJSONString());
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("sendToKafka - sent to Kafka, key=" + processId);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Note: the original code called producer.close() here, which closes the
        // shared producer after the first message and makes all later sends fail.
        // Close the producer once, on application shutdown, instead.
    }
}
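A quick usage sketch (the topic name and payload fields are made up for illustration):

package com.ruoyi.soaworkflow.utils.kafka;

import com.alibaba.fastjson.JSONObject;

public class ProducerDemo {
    public static void main(String[] args) {
        // Hypothetical topic and payload, for illustration only
        JSONObject bpmData = new JSONObject();
        bpmData.put("processId", "P-0001");
        bpmData.put("status", "APPROVED");
        ProducerClient.sendToKafka("demo_topic", "P-0001", bpmData);
    }
}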

And the consumer-side code, ConsumerClient.java:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Consumer
 */
public class ConsumerClient {

    public static KafkaConsumer<String, String> consumer = null;

    public static void main(String[] args) {
        fetchKafka();
    }

    public static void fetchKafka() {
        // group id plus the SASL username/password
        consumer = KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3");
        consumer.subscribe(Arrays.asList("3_kjczxsmrtj")); // topic
        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records;
            try {
                // timeout in milliseconds; Long.MAX_VALUE blocks until records arrive
                records = consumer.poll(Long.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition()
                        + ", offset: " + record.offset()
                        + ", key: " + record.key()
                        + ", value: " + record.value());
                i++;
                System.out.println(i);
            }
            try {
                // manual commit, since enable.auto.commit is false
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
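One thing the loop above does not handle is a clean shutdown: consumer.close() is never called. A minimal sketch of the standard pattern (my addition, not part of the original post, reusing the same sample group, credentials, and topic), using consumer.wakeup() from a JVM shutdown hook:

package com.ruoyi.soaworkflow.utils.kafka;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumerDemo {

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer =
                KafkaUtil.getConsumer("testGroup1", "oaadmin", "NTA4YjRhZDBmYjQ3");
        consumer.subscribe(Arrays.asList("3_kjczxsmrtj"));

        final Thread mainThread = Thread.currentThread();
        // On Ctrl+C / JVM shutdown: wakeup() is the one thread-safe KafkaConsumer
        // method; it makes the blocked poll() throw WakeupException.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
                try {
                    mainThread.join(); // wait for the main loop to commit and close
                } catch (InterruptedException ignored) {
                }
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset: " + record.offset() + ", value: " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // expected: triggered by the shutdown hook
        } finally {
            consumer.close();
        }
    }
}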
