当前位置:   article > 正文

RabbitMQ学习之路 - 消费者并发配置及消息手动确认机制_rabbitmq配置文件设置并发

rabbitmq配置文件设置并发

最近在系统学习RabbitMQ的实战课程,感觉很受用,总结一下以供参考及分享
搭建spring boot项目,引入rabbitmq依赖。这里不再赘述
一、application.properties配置

server.port=8730
server.servlet.context-path=/mq

#rabbitmq配置
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5

mq.env=local

simple.mq.queue.name=${mq.env}.simple.mq.queue
simple.mq.exchange.name=${mq.env}.simple.mq.exchange
simple.mq.routing.key.name=${mq.env}.simple.mq.routing.key
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

二、创建RabbitMQConfig配置类

package com.jvli.project.config;

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import com.jvli.project.rabbitmqlistener.SimpleListener;

@Configuration
public class RabbitMqConfig {

	private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

	@Autowired
	private Environment env;

	@Autowired
	private CachingConnectionFactory connectionFactory;

	@Autowired
	private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

	@Autowired
	private SimpleListener simpleListener;

	@Bean
	public RabbitTemplate rabbitTemplate() {
		connectionFactory.setPublisherConfirms(true);
		connectionFactory.setPublisherReturns(true);
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {
				logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);

			}
		});
		rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

			@Override
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
					String routingKey) {
				logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
						replyCode, replyText, message);
			}
		});
		return rabbitTemplate;
	}

	// TODO: 并发配置-消息手动确认机制
	// 1. 创建消息模型
	@Bean(name = "simpleQueue")
	public Queue simpleQueue() {
		return new Queue(env.getProperty("simple.mq.queue.name"), true); // true表示持久化
	}

	@Bean
	public TopicExchange simpleExchange() {
		return new TopicExchange(env.getProperty("simple.mq.exchange.name"), true, false); // true表示持久化,false表示不自动删除
	}

	@Bean
	public Binding simpleBinding() {
		return BindingBuilder.bind(simpleQueue()).to(simpleExchange())
				.with(env.getProperty("simple.mq.routing.key.name"));
	}
	
	// 2.创建并发、手动确认监听容器
	@Bean(name = "simpleContainer")
	public SimpleMessageListenerContainer simpleContainer(@Qualifier("simpleQueue") Queue simpleQueue) {
		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		//container.setMessageConverter(new Jackson2JsonMessageConverter());
		// TODO:并发配置
		container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class));
		container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class));
		container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class));

		// TODO:消息确认-确认机制种类
		container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		container.setQueues(simpleQueue);
		container.setMessageListener(simpleListener);
		return container;
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107

三、创建消费监听

package com.jvli.project.rabbitmqlistener;

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;

@Component("simpleListener")
public class SimpleListener implements ChannelAwareMessageListener, MessageListener{

	private static final Logger logger = LoggerFactory.getLogger(SimpleListener.class);
	
	@Autowired
	private ObjectMapper objectMapper;
	
	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		long tag = message.getMessageProperties().getDeliveryTag();
		
		try {
			byte[] body = message.getBody();
	        //User user=objectMapper.readValue(body,User.class);
			String msg = new String(body,"UTF-8");
			logger.info("当前线程:{},简单消息监听确认机制监听到消息:{}",Thread.currentThread().getName(),msg);
			
			//int i=1/0;  测试抛出异常
			
			//手动确认
			channel.basicAck(tag, true);
		} catch (Exception e) {
			logger.error("简单消息监听确认机制发生异常:{}",e.fillInStackTrace());
			channel.basicReject(tag, false);
		}
		
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

四、创建Controller,触发事件

package com.jvli.project.controller;

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jvli.project.dto.User;
import com.jvli.project.response.BaseResponse;
import com.jvli.project.response.StatusCode;


@RestController
@RequestMapping("/ack")
public class AcknowledgeController {

	private static final Logger logger = LoggerFactory.getLogger(AcknowledgeController.class);
	
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Autowired
	private ObjectMapper objectMapper;
	
	@Autowired
	private Environment env;
	
	@RequestMapping(value = "/user/info",method = RequestMethod.GET)
	public BaseResponse ackUser() {
	
		try {
			rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
			rabbitTemplate.setExchange(env.getProperty("simple.mq.exchange.name"));
			rabbitTemplate.setRoutingKey(env.getProperty("simple.mq.routing.key.name"));
			
			for (int i = 1; i <= 50; i++) {
				
				User user = new User(i, "army", "acknowledge");
				Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
				rabbitTemplate.convertAndSend(message);
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		return new BaseResponse<>(StatusCode.SUCCESS);
	}
	
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

测试效果:

  1. 启动项目之后,登录RabbitMQ 可视化客户端,可以看到我们创建的队列确实对应了10个消费者
    在这里插入图片描述
    在这里插入图片描述
    2.debug请求接口测试
    在SimpleListener消费端打断点
    在这里插入图片描述
    此时,我们再来观察RabbitMQ的客户端,待确认的消息有50条
    在这里插入图片描述
    然后,我们放行,观察RabbitMQ客户端和开发工具的控制台。确实有10个消费者在消费,并且每次拉取5条消息(container.setPrefetchCount()属性)
    在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/386668
推荐阅读
相关标签
  

闽ICP备14008679号