当前位置:   article > 正文

最简单的kafka接入方式(kafka配置),kafka整合Spring_kafka对接

kafka对接

一.前言.

本文主要介绍了Springboot项目整合kafka的最简单的方式.

二.主要流程.

1.引入Maven
2.增加消费者和生产者配置
3.初始化读取配置
4.进行消费和生产消息
  • 1
  • 2
  • 3
  • 4

三.各个细节,步骤

1. 引入Maven

   <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2. 增加消费者和生产者配置
这里需要三个配置文件,application.yml,消费者配置文件,生产者配置文件.
在application.yml中指定消费者和生产者配置文件的名称.
application.yml:

kafka:
  consumerTopic: topic1 #消费的topic名称
  consumerConfig: kafka_consumer.properties  #配置文件名称
  consumerCommit: true    #消费者是否提交偏移量
  producerTopic: topic2 #发送的topic名称
  producerConfig: kafka_producer.properties  #配置文件名称
  producerEnable: true  #生产者是否生产开关
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

kafka_consumer.properties:

#消费者组ID
group.id=test
#broker地址 中间以逗号分隔
bootstrap.servers=10.100.100.100:9092
#zookeeper地址
zookeeper.connect=10.100.100.100:2181
#是否自动提交偏移量 建议设置成false  自己控制偏移量提交
enable.auto.commit=false
#提交offset到zookeeper的时间间隔
auto.commit.interval.ms=1000
#重要 消费的起始位置  earliest表示从最早记录开始消费 latest表示从最新记录消费
auto.offset.reset=earliest
#每次拉取的最大条数
max.poll.records=100
#处理的超时时间
session.timeout.ms=100000
#单条消息的最大大小
max.request.size=104857600
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

kafka_producer.properties

#broker地址
bootstrap.servers=10.100.100.100:9092
#重要 副本确认模式 producer的消息发送确认机制
acks=1
#单条消息的最大大小
max.request.size=104857600
#生产缓冲区大小
buffer.memory=104857600
#重试次数
retries=3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3. 初始化读取配置.
      (1)创建配置类,利用Springboot自动加载application.yml中配置文件中kafka的相关参数.

@Component
@ConfigurationProperties(prefix = "kafka")
public class KafkaConfig {
    //消费TOPIC
    private String consumerTopic;
    //消费者相关配置文件路径
    private String consumerConfig;
    //消费者是否提交偏移量
    private boolean consumerCommit;
    //生产者生产TOPIC
    private String producerTopic;
    //生产者相关配置文件路径
    private String producerConfig;
    //生产者是否生产开关
    private boolean producerEnable;

    public String getConsumerTopic() {
        return consumerTopic;
    }

    public void setConsumerTopic(String consumerTopic) {
        this.consumerTopic = consumerTopic;
    }

    public String getConsumerConfig() {
        return consumerConfig;
    }

    public void setConsumerConfig(String consumerConfig) {
        this.consumerConfig = consumerConfig;
    }

    public boolean isConsumerCommit() {
        return consumerCommit;
    }

    public void setConsumerCommit(boolean consumerCommit) {
        this.consumerCommit = consumerCommit;
    }

    public String getProducerTopic() {
        return producerTopic;
    }

    public void setProducerTopic(String producerTopic) {
        this.producerTopic = producerTopic;
    }

    public String getProducerConfig() {
        return producerConfig;
    }

    public void setProducerConfig(String producerConfig) {
        this.producerConfig = producerConfig;
    }

    public boolean isProducerEnable() {
        return producerEnable;
    }

    public void setProducerEnable(boolean producerEnable) {
        this.producerEnable = producerEnable;
    }

    /**
    *@Description 将给定topic列表字符串拆解成列表
    *@Param [topics] topic列表字符串,中间逗号分隔
    *@Return java.util.List<java.lang.String> topic列表
    */
    public static List<String> getTopicLists(String topics) {
        String[] items = topics.split(",");
        List<String> result = new LinkedList<>();
        for (String item : items) {
            if (!StringUtils.isBlank(item)) {
                result.add(StringUtils.trim(item));
            }
        }
        return result;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81

     (2)定义一个service类,加载生产者和消费者真正配置文件,完成生产对象和消费对象的初始化.

    @Autowired
    private KafkaConfig config;
    //消费对象
    private Consumer<String, String> messageConsumer;
    //生产对象
    private Producer<String, String> messageProducer;

    /**
    *@Description Kafka集群的初始化  包含加载配置文件,消费者、生产者的初始化
    */
    public void init() {
        //初始化生产者
        if (messageProducer == null && config.isProducerEnable()) {
            messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
                    new StringSerializer(), new StringSerializer());
        }

        //初始化消费者
        if (messageConsumer == null) {
            //已经在配置中配置了消费者组等等信息
            messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                    new StringDeserializer(), new StringDeserializer());
            //订阅主题
            messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
public class PropertyFileUtil {

	/**
	*@Description 加载配置文件
	*/
	public static Properties load(String path) {
		InputStream stream = findInUserDir(path);
		if (stream == null) {
			stream = findInResource(path);
		}
		if (stream == null) {
			return null;
		}
		Properties properties = new Properties();
		try {
			properties.load(stream);
		} catch (IOException ex) {
			throw new RuntimeException(ex);
		}
		return properties;
	}

	private static InputStream findInUserDir(String file) {
		String userDir = System.getProperty("user.dir");
		if (userDir != null) {
			File f = new File(userDir, file);
			if (f.exists() && f.isFile()) {
				try {
					FileInputStream in = new FileInputStream(f);
					logger.info("加载配置:" + f.getAbsolutePath());
					return in;
				} catch (FileNotFoundException e) {
					return null;
				}
			}
		}
		return null;
	}

	private static InputStream findInResource(String file) {
		logger.info("加载配置:classpath:/" + file);
		return PropertyFileUtil.class.getResourceAsStream("/" + file);
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

4. 进行真正的生产和消费.
生产:

 /**
    *@Description Kafka消息发送方式,这里是异步发送
    */
    public void produce(String key, String value) {
        //首先判断如果value为空则跳过
        if(StringUtils.isBlank(value)) {
            return;
        }
        if (messageProducer != null) {
            String topic = config.getProducerTopic();
            messageProducer.send(
                    new ProducerRecord<>(topic, key, value),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception ex) {
                            if (ex != null) {
                                // 发生错误
                                LOGGER.error("kafka生产数据异常,异常数据为-->key:{},value:{}",key,value,ex);
                                throw new RuntimeException("kafka生产数据异常...");
                            } else {
                                // 发送成功
                                LOGGER.info("{}异步生产完毕",key);
                            }
                        }
                    });
        }
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

消费:

     //默认为false
    private volatile boolean reqClose;
  /**
    *@Description 消费者消费消息方法
    */
    public void consume() {
        //计数器
        long retry = 0;
        //因为要不断获取消息,所以需要循环
        while (!reqClose) {
            try {
                ConsumerRecords<String, String> records = messageConsumer.poll(1000);
                //会一次返回多个值,所以需要遍历
                for (ConsumerRecord<String, String> record : records) {
                    if (reqClose) {
                        LOGGER.info("跳出消費..");
                        break;
                    }
                    String key =  record.key();
                    String value = record.value();
                    //这里写真正的业务逻辑,处理完再提offset,避免丢失数据
                   
                }
                /**
                 * 手动提交offset,第一次消费数据的时候会从初始值,也就是你当前消费者保存的offset开始消费,并且会持续消费
                 *    下一条数据,因为虽然我们没有提交offset,但是内存中还维护了一个offset,但是一旦我们关闭了消费者,
                 *     再下一次启动时,又会从当前消费者保存的offset开始消费,那也就会造成重复消费.
                 */
                if (config.isConsumerCommit()) {
                    //同步提交,线程会阻塞到当前offset提交成功.
                    messageConsumer.commitSync();
                }
            } catch (Exception ex) {
                if (retry % 60 == 0) {
                    // 防止Kafka失联时,错误过多,控制1分钟出一次日志
                    LOGGER.error("Kafka Operation Failed", ex);
                    retry++;
                }
            }
        }
        // 释放资源
        this.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

完整的service处理类,这里定义了一个接口,然后使用一个实现类去实现.

public interface KafkaService {
    /**
    *@Description 初始化Kafka生产者及消费者
    */
    public void init();

    /**
    *@Description 消费kafka消息
    */
    public void consume();

    /**
    *@Description 生产kafka消息
    *@Param [key, value] 消息的key和value
    */
    public void produce(String key, String value);
    /**
    *@Description 关闭释放资源
    */
    void reqClose();

    /**
     * 重置kafka消费的offset
     */
    void resetOffset();


    /**
     *@Description 处理消息的业务方法
     */
    void consumeRecord(String key,String msg);



}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36


@Service
public class KafkaServiceImpl implements KafkaService {

    private static Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);

    @Autowired
    private KafkaConfig config;

    private volatile boolean reqClose;
    private volatile boolean closed;

    private Consumer<String, String> messageConsumer;
    private Producer<String, String> messageProducer;

    /**
    *@Description Kafka集群的初始化  包含加载配置文件,消费者、生产者的初始化
    */
    @Override
    public void init() {
        LOGGER.info("初始化生产者..");
        if (messageProducer == null && config.isProducerEnable()) {
            messageProducer = new KafkaProducer<>(PropertyFileUtil.load(config.getProducerConfig()),
                    new StringSerializer(), new StringSerializer());
        }

        LOGGER.info("初始化消费者..");
        if (messageConsumer == null) {
            //已经在配置中配置了消费者组等等信息
            messageConsumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                    new StringDeserializer(), new StringDeserializer());
            //订阅主题
            messageConsumer.subscribe(config.getTopicLists(config.getConsumerTopic()));
        }
    }

    /**
    *@Description Kafka消息发送方式
    */
    @Override
    public void produce(String key, String value) {
        //首先判断如果value为空则跳过
        if(StringUtils.isBlank(value)) {
            return;
        }
        if (messageProducer != null) {
            String topic = config.getProducerTopic();
            messageProducer.send(
                    new ProducerRecord<>(topic, key, value),
                    new Callback() {
                        @Override
                        public void onCompletion(RecordMetadata metadata, Exception ex) {
                            if (ex != null) {
                                // 发生错误
                                LOGGER.error("kafka生产数据异常,异常数据为-->key:{},value:{}",key,value,ex);
                                throw new RuntimeException("kafka生产数据异常...");
                            } else {
                                // 发送成功
                                LOGGER.info("{}异步生产完毕",key);
                            }
                        }
                    });
        }
    }

    /**
    *@Description 消费者消费消息方法
    */
    @Override
    public void consume() {
        //计数器
        long retry = 0;
        //因为要不断获取消息,所以需要循环
        while (!reqClose) {
            try {
                ConsumerRecords<String, String> records = messageConsumer.poll(1000);
                //会一次返回多个值,所以需要遍历
                for (ConsumerRecord<String, String> record : records) {
                    if (reqClose) {
                        LOGGER.info("跳出消費..");
                        break;
                    }
                    String key =  record.key();
                    String value = record.value();
                    //处理具体的逻辑业务,一条一条处理,处理完再提交offset,避免丢失数据
                    consumeRecord(key,value);
                }
                /**
                 * 手动提交offset,第一次消费数据的时候会从初始值,也就是你当前消费者保存的offset开始消费,并且会持续消费
                 *    下一条数据,因为虽然我们没有提交offset,但是内存中还维护了一个offset,但是一旦我们关闭了消费者,
                 *     再下一次启动时,又会从当前消费者保存的offset开始消费,那也就会造成重复消费.
                 */
                if (config.isConsumerCommit()) {
                    //同步提交,线程会阻塞到当前offset提交成功.
                    messageConsumer.commitSync();
                }
            } catch (Exception ex) {
                if (retry % 60 == 0) {
                    // 防止Kafka失联时,错误过多,控制1分钟出一次日志
                    LOGGER.error("Kafka Operation Failed", ex);
                    retry++;
                }
            }
        }
        // 释放资源
        this.close();
    }

    /**
    *@Description 处理消息的业务方法
    */
    @Override
    public void consumeRecord(String key,String msg) {
        LOGGER.info("接收到kafka消息,key:{}",key);
       
    }

    /**
    *@Description 重置消费者偏移量
    */
    @Override
    public void resetOffset() {
        Consumer<String, String> consumer = new KafkaConsumer<>(PropertyFileUtil.load(config.getConsumerConfig()),
                new StringDeserializer(), new StringDeserializer());
        List<String> topics = config.getTopicLists(config.getConsumerTopic());
        for (String topic : topics) {
            consumer.subscribe(Arrays.asList(topic));
            ConsumerRecords<String, String> records = consumer.poll(2000);
            Set<TopicPartition> topicList = consumer.assignment();
            Map<TopicPartition, Long> endMap = consumer.endOffsets(topicList);
            Map<TopicPartition, Long> beginmap = consumer.beginningOffsets(topicList);
            long singleTpLagSize = 1000000;
            for (TopicPartition tp : topicList) {
                long endOffset = endMap.get(tp);
                long beginOffset = beginmap.get(tp);
                long aimOffset = endOffset - singleTpLagSize;
                if (aimOffset > 0 && aimOffset >= beginOffset) {
                    consumer.seek(tp, endOffset-singleTpLagSize);
                } else {
                    consumer.seek(tp, beginOffset);
                }
            }
            consumer.commitSync();
        }
    }

    /**
     * 是否关闭kafka得长轮询
     * @return
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * 关闭资源
     */
    private void close() {
        try {
            if (messageConsumer != null) {
                messageConsumer.close();
            }
            if (messageProducer != null) {
                messageProducer.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        closed = true;
        LOGGER.info("Kafka資源釋放完畢!");
    }

    /**
     * 关闭kafka得循环消费逻辑
     */
    public void reqClose() {
        this.reqClose = true;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181

江湖路远,相逢即为缘分,如略有帮助,一键三联可否,谢谢啦

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/502236
推荐阅读
相关标签
  

闽ICP备14008679号