import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Global Kafka configuration.
 *
 * @author Dean
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.admin.client-id}")
    private String adminClientId;

    @Bean
    public AdminClient adminClient() {
        Map<String, Object> configs = new HashMap<>(5);
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(AdminClientConfig.CLIENT_ID_CONFIG, adminClientId);
        return AdminClient.create(configs);
    }

    /**
     * If there are multiple consumer groups, define a separate
     * ConcurrentKafkaListenerContainerFactory for each of them.
     *
     * @return ConcurrentKafkaListenerContainerFactory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        String groupId = "dmGroup";
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable auto-commit; offsets are acknowledged manually.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Maximum number of records returned by a single poll (default 500);
        // if fewer records are available, the poll returns whatever is there.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Batch mode; this setting appears to matter only for @KafkaListener-annotated methods.
        factory.setBatchListener(false);
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        // Manual acknowledgment mode.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
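For reference, the two @Value placeholders above are read from the Spring Boot configuration file. A minimal application.yml sketch, assuming a single local broker; the host/port and client id below are placeholders, not values from the original post:

spring:
  kafka:
    # Address of the Kafka cluster; a single local broker is assumed here
    bootstrap-servers: localhost:9092
    admin:
      # Client id used by the AdminClient bean above
      client-id: dm-admin-client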
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/**
 * Wraps the Kafka listener lifecycle logic.
 */
@Slf4j
@Service
public class KafkaListenerManagement {

    private final Map<String, MessageListenerContainer> containers = new ConcurrentHashMap<>();

    /**
     * If there are multiple consumer groups, inject a separate
     * ConcurrentKafkaListenerContainerFactory for each of them.
     */
    private final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory;

    @Autowired
    public KafkaListenerManagement(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /**
     * Start listening on a topic.
     *
     * @param topic            topic
     * @param bizLogicConsumer business logic that processes each message
     */
    public void startListening(String topic, BiConsumer<String, Acknowledgment> bizLogicConsumer) {
        // Acks must be committed manually; otherwise stopping and then restarting
        // the listener may re-deliver records that were already processed.
        AcknowledgingMessageListener<String, String> messageListener =
                (message, acknowledgment) -> bizLogicConsumer.accept(message.value(), acknowledgment);
        MessageListenerContainer container = containerFactory.createContainer(topic);
        container.setupMessageListener(messageListener);
        container.start();
        containers.put(topic, container);
    }

    /**
     * Pause listening.
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        // Guard against topics that were never started (would otherwise NPE)
        if (container != null) {
            container.pause();
        }
    }

    /**
     * Resume listening after a pause.
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        MessageListenerContainer container = containers.get(topic);
        if (container != null) {
            container.resume();
        }
    }

    /**
     * Stop listening.
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        MessageListenerContainer container = containers.remove(topic);
        if (container != null) {
            container.stop();
        }
    }
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * Kafka producer wrapper.
 *
 * @author LiuChang
 */
@Service
public class KafkaProducerManagement {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send asynchronously.
     *
     * @param topic   topic
     * @param message message
     * @return ListenableFuture
     */
    public ListenableFuture<SendResult<String, String>> send(String topic, String message) {
        return kafkaTemplate.send(topic, message);
    }
}
import com.feiynn.kafka.management.KafkaListenerManagement;
import com.feiynn.kafka.management.KafkaProducerManagement;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

/**
 * Business logic class.
 * Note: business code should not call the Kafka API directly;
 * it should always go through the wrapper Management classes.
 *
 * @author Dean
 */
@Slf4j
@Service
public class BizService {

    @Resource
    private KafkaListenerManagement kafkaListenerManagement;

    @Resource
    private KafkaProducerManagement kafkaProducerManagement;

    /**
     * Business logic that runs once topic listening is started.
     *
     * @param topic topic
     */
    public void startListening(String topic) {
        kafkaListenerManagement.startListening(topic, (data, acknowledgment) -> {
            // Message-processing business logic
            log.info("Received message value: [{}]", data);
            try {
                // Slow down consumption so the logs are easier to follow
                TimeUnit.MILLISECONDS.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            acknowledgment.acknowledge();
        });
    }

    /**
     * Stop listening on a topic.
     *
     * @param topic topic
     */
    public void stopListening(String topic) {
        kafkaListenerManagement.stopListening(topic);
    }

    /**
     * Pause listening.
     *
     * @param topic topic
     */
    public void pauseListening(String topic) {
        kafkaListenerManagement.pauseListening(topic);
    }

    /**
     * Resume listening after a pause.
     *
     * @param topic topic
     */
    public void resumeListening(String topic) {
        kafkaListenerManagement.resumeListening(topic);
    }

    /**
     * Send a message.
     *
     * @param topic   topic
     * @param message message
     */
    public void sendMsg(String topic, String message) {
        ListenableFuture<SendResult<String, String>> listenableFuture =
                kafkaProducerManagement.send(topic, message);
        // Add a callback to receive the send result asynchronously
        listenableFuture.addCallback((sendResult) -> {
            // Send succeeded
            log.trace("Send [{}] success", message);
        }, (e) -> {
            // Send failed: apply a fallback strategy here, or log the
            // message for later unified reprocessing
            log.error("Send [{}] failed", message, e);
        });
    }
}
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = KafkaAdvancedApplication.class)
public class KafkaAdvancedTest {

    @Resource
    private BizService bizService;

    /**
     * Exercises starting, pausing, resuming and stopping a topic listener.
     */
    @Test
    public void startStopListening() throws InterruptedException {
        String topicDm = "dm0";
        // Start listening on the topic
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(2);
        // Message prefix, used to tell this run's messages apart from
        // leftover unconsumed messages of a previous run
        String msgPre = LocalTime.now().toString();
        log.info("msgPre=[{}]", msgPre);
        for (int i = 0; i < 2000; i++) {
            bizService.sendMsg(topicDm, "Msg_" + msgPre + "_" + i);
        }
        TimeUnit.SECONDS.sleep(5);
        log.info("pause listening begin");
        bizService.pauseListening(topicDm);
        log.info("pause listening success");
        // After a successful pause the consumer still drains up to
        // max.poll.records already-fetched records before it truly pauses,
        // so wait long enough to see the consumption logs go quiet.
        TimeUnit.SECONDS.sleep(20);
        log.info("resume listening");
        // Resume listening after the pause
        bizService.resumeListening(topicDm);
        TimeUnit.SECONDS.sleep(20);
        // Another round of pause and resume
        log.info("pause listening again");
        bizService.pauseListening(topicDm);
        TimeUnit.SECONDS.sleep(10);
        log.info("resume listening again");
        bizService.resumeListening(topicDm);
        // Keep consuming for a while
        TimeUnit.SECONDS.sleep(10);
        // Then stop listening altogether
        log.info("stop listening");
        bizService.stopListening(topicDm);
        TimeUnit.SECONDS.sleep(20);
        // Start listening on the topic again
        log.info("start listening again");
        bizService.startListening(topicDm);
        TimeUnit.SECONDS.sleep(120);
    }
}
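One detail worth noting: the AdminClient bean defined in KafkaConfig is not exercised by any of the snippets above. If the test topic does not exist yet (and auto-creation is disabled on the broker), that bean can create it before the test runs. A minimal sketch; the helper class, partition count, and replication factor are illustrative, not from the original post:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

public class TopicSetup {

    /**
     * Creates the test topic; fails if it already exists.
     */
    public static void createTestTopic(AdminClient adminClient)
            throws ExecutionException, InterruptedException {
        // 1 partition, replication factor 1: suitable for a single-broker dev setup
        NewTopic topic = new NewTopic("dm0", 1, (short) 1);
        adminClient.createTopics(Collections.singleton(topic)).all().get();
    }
}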
Run the test case directly and watch the logs; the effect of each operation can be observed there.
After stopping a topic listener, I noticed that the consumption logs would sometimes keep printing and sometimes stop only after a while. It turned out that once the pause (or stop) call returns successfully, the consumer still works through up to max.poll.records already-fetched records before it actually pauses or stops.
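If a pause needs to take effect more promptly, one lever is to fetch fewer records per poll, since the backlog to drain is bounded by max.poll.records. A sketch of the relevant line in the consumer properties from KafkaConfig; the value 10 is illustrative, not from the original post:

// Fewer records per poll means a smaller already-fetched batch to drain
// after pause()/stop(), at the cost of more frequent polling.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);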
Also, when acks were not committed manually, stopping the topic subscription (stop, not pause) and then starting it again could consume some messages twice; after switching to manual ack commits, the duplication no longer occurred.
Since max.poll.interval.ms (the maximum allowed interval between polls) defaults to 5 minutes, I also tried pausing for 5 minutes 30 seconds to check whether the consumer would be removed by a rebalance and fail to resume. The test showed no problem: listening resumed and messages were consumed normally. This matches the fact that a paused Spring Kafka container keeps calling poll() internally (it just returns no records), so the poll-interval deadline is never exceeded.
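Separately, if the business logic for a single batch could ever take longer than the 5-minute default (which would trigger a rebalance mid-processing), the cap can be raised via the standard consumer property. A sketch; the 10-minute value is illustrative, not from the original post:

// Allow up to 10 minutes between poll() calls before the consumer
// leaves the group and a rebalance is triggered (default is 300000 ms).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000);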
This code is already in use in production.