赞
踩
作者使用的 RabbitMQ 环境版本:
erlang 版本:21.3
RabbitMQ 版本:3.7.14
如果要使用延迟消费,需要先下载 rabbitmq_delayed_message_exchange 插件
下载地址:Community Plugins — RabbitMQ
rabbitmq-delayed-message-exchange v3.8版本适用于RabbitMQ3.7.X版本,插件要与RabbitMQ版本对应,不然会出现莫名其妙的错误
下载好后,将插件文件放到 RabbitMQ 安装目录下的 plugins 目录中,然后执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 启用插件
接下来直接上代码:
pom 中引入需要的依赖(jar 包):
<!--RabbitMq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中新增RabbitMq连接信息
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
配置RabbitMq
- @Data
- @Configuration
- @ConfigurationProperties(prefix = "spring.rabbitmq")
- public class RabbitMqConfig {
- private String host;
- private int port;
- private String userName;
- private String password;
-
- @Bean
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
- cachingConnectionFactory.setUsername(userName);
- cachingConnectionFactory.setPassword(password);
- cachingConnectionFactory.setVirtualHost("/");
- cachingConnectionFactory.setPublisherConfirms(true);
- return cachingConnectionFactory;
- }
-
- @Bean
- public RabbitTemplate rabbitTemplate() {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
- return rabbitTemplate;
- }
- }
- @Configuration
- public class QueueConfig {
- /**
- * 分派普通消息交换机
- */
- @Bean
- public TopicExchange topicExchange(){
- return new TopicExchange("ordinary_exchange",true,false);
- }
-
- @Bean
- public Queue queue() {
- Queue queue = new Queue("ordinary_queue", true);
- return queue;
- }
-
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(topicExchange()).with("ordinary_queue");
- }
-
- /**
- * 分派延迟消息交换机
- */
- @Bean
- public CustomExchange delayExchange(){
- Map<String, Object> paramMap = new HashMap<String, Object>();
- paramMap.put("x-delayed-type","direct");
- return new CustomExchange("delay_exchange","x-delayed-message",true,false,paramMap);
- }
-
- @Bean
- public Queue delayQueue(){
- Queue delayQueue = new Queue("delay_queue", true);
- return delayQueue;
- }
-
- @Bean
- public Binding delayMessagebinding(){
- return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_queue").noargs();
- }
- }
- @Service
- @Slf4j
- public class MessageServiceImpl implements MessageService {
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Override
- public void sendMsg(String queueName,String msg) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- System.out.println("消息发送时间:"+sdf.format(new Date()));
- rabbitTemplate.convertAndSend("ordinary_exchange", queueName, msg);
- }
-
- /**
- * 实现延迟消息
- * @param queueName
- * @param message
- */
- @Override
- public void sendDelayMessage(String queueName, String message) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- log.debug("消息发送时间:{}",sdf.format(new Date()));
- rabbitTemplate.convertAndSend("delay_exchange", queueName, message, new MessagePostProcessor() {
- @Override
- public Message postProcessMessage(Message message) throws AmqpException {
- message.getMessageProperties().setHeader("x-delay",3000);
- return message;
- }
- });
- }
- }
- @Component
- @Slf4j
- public class MessageReceiver {
- @RabbitListener(queues = "ordinary_queue")
- public void receive(String msg) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),msg);
- }
-
- @RabbitListener(queues = "delay_queue")
- public void receiveDelayMessage(String message) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- log.debug("消息接收时间:{},接收到的消息:{}",sdf.format(new Date()),message);
- }
- }
- @RestController
- public class MessageController extends BaseController {
-
- @Autowired
- private MessageService messageService;
-
- @GetMapping("send")
- public ApiRest sendMessage(String queueName, String msg){
- messageService.sendMsg(queueName,msg);
- return this.success();
- }
-
- @GetMapping("delaySend")
- public ApiRest sendDelayMessage(String queueName,String message){
- messageService.sendDelayMessage(queueName, message);
- return this.success();
- }
- }
接下来测试就搞定延迟消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。