赞
踩
yum update
yum install epel-release -y
yum clean all
yum list
yum install docker-io -y
FROM rabbitmq:management
MAINTAINER LCJ
# 添加插件到指定目录 可按照此方式自行扩展其他插件
# ADD ./rabbitmq_delayed_message_exchange-3.11.1.ez /plugins
# 开启管理界面插件
RUN rabbitmq-plugins enable rabbitmq_management
# 开启延迟队列插件
#RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange ENTRYPOINT ["rabbitmq-server"]
version: '3.5'
services:
rabbitmq:
image: rabbitmq:3.11-alpine #镜像版本
hostname: rabbit_1
container_name: rabbitmq
restart: always
build:
context: .
ports:
- "15672:15672"
- "5672:5672"
volumes:
- ./data:/var/lib/rabbitmq
- ./log:/var/log/rabbitmq
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=123456
network_mode: "bridge"
chmod +x /root/rabbitmq/data
chmod +x /root/rabbitmq/log
<!--引入AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server:
port: 8080
#消息队列
spring:
rabbitmq:
host: xx.xx.xx
virtual-host: /dev
password: 123456
username: admin
#开启消息二次确认,生产者到broker的交换机
publisher-confirm-type: correlated
#开启消息二次确认,交换机到队列的可靠性投递
publisher-returns: true
#为true,则交换机处理消息到路由失败,则会返回给生产者
template:
mandatory: true
#消息手工确认ACK
listener:
simple:
acknowledge-mode: manual
RabbitM@Config文件
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "order_exchange";
public static final String QUEUE_NAME = "order_queue";
/**
* 交换机
* @return
*/
@Bean
public Exchange orderExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
//return new TopicExchange(EXCHANGE_NAME, true, false);
}
/**
* 队列
* @return
*/
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
//return new Queue(QUEUE_NAME, true, false, false, null);
}
/**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
//return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
}
}
生产者;
@SpringBootTest
class DemoApplicationTests {
@Autowired
private RabbitTemplate template;
@Test
void send() {
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
}
}
消费者
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
/**
* RabbitHandler 会自动匹配 消息类型(消息自动确认)
* @param msg
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("监听到消息:消息内容:"+message.getBody());
}
}
生产者-> 交换机 -> 队列 -> 消费者
通过两个节点控制消息可靠性投递
@Autowired
private RabbitTemplate template;
@Test
void testConfirmCallback() {
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack="+ack);
System.out.println("confirm==== cause="+cause);
//根据ACK状态做对应的消息更新操作 TODO
}
});
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+,"order.new","新订单来啦1");
}
@Test
void testReturnCallback() {
//为true,则交换机处理消息到路由失败,则会返回给生产者
//开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
int code = returned.getReplyCode();
System.out.println("code="+code);
System.out.println("returned="+returned.toString());
}
});
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
}
spring:
rabbitmq:
#开启手动确认消息,如果消息重新入队,进行重试
listener:
simple:
acknowledge-mode: manual
@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message.toString());
System.out.println("body="+body);
//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
//channel.basicAck(msgTag,false);
//channel.basicNack(msgTag,false,true);
}
需求:商家新建账户检查是否在规定时间内商家商品,否则对该账户做出处理
RabbitMQConfig
package net.xdclass.xdclasssp.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 小滴课堂,愿景:让技术不再难学
*
* 新商家审核通过->new_merchant_queue -> 死信消息交换机 -> 死信队列
*
* @Description
* @Author 二当家小D
* @Remark 有问题直接联系我,源码-笔记-技术交流群
* @Version 1.0
**/
@Configuration
public class RabbitMQConfig {
/**
* 死信队列
*/
public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
/**
* 死信交换机
*/
public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
/**
* 进入死信队列的路由key
*/
public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
/**
* 创建死信队列
* @return
*/
@Bean
public Queue lockMerchantDeadQueue(){
return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
}
/**
* 绑定死信交换机和死信队列
* @return
*/
@Bean
public Binding lockMerchantBinding(){
return new Binding(LOCK_MERCHANT_DEAD_QUEUE,Binding.DestinationType.QUEUE,
LOCK_MERCHANT_DEAD_EXCHANGE,LOCK_MERCHANT_ROUTING_KEY,null);
}
/**
* 普通队列,绑定的个死信交换机
*/
public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
/**
* 普通的topic交换机
*/
public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
/**
* 路由key
*/
public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
/**
* 创建普通交换机
* @return
*/
@Bean
public Exchange newMerchantExchange(){
return new TopicExchange(NEW_MERCHANT_EXCHANGE,true,false);
}
/**
* 创建普通队列
* @return
*/
@Bean
public Queue newMerchantQueue(){
Map<String,Object> args = new HashMap<>(3);
//消息过期后,进入到死信交换机
args.put("x-dead-letter-exchange",LOCK_MERCHANT_DEAD_EXCHANGE);
//消息过期后,进入到死信交换机的路由key
args.put("x-dead-letter-routing-key",LOCK_MERCHANT_ROUTING_KEY);
//过期时间,单位毫秒
args.put("x-message-ttl",10000);
return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
}
/**
* 绑定交换机和队列
* @return
*/
@Bean
public Binding newMerchantBinding(){
return new Binding(NEW_MERCHANT_QUEUE,Binding.DestinationType.QUEUE,
NEW_MERCHANT_EXCHANGE,NEW_MERCHANT_ROUTIING_KEY,null);
}
生产者
@GetMapping("check")
public Object check(){
//修改数据库的商家账号状态 TODO
rabbitTemplate.convertAndSend(RabbitMQConfig.NEW_MERCHANT_EXCHANGE,RabbitMQConfig.NEW_MERCHANT_ROUTIING_KEY,"商家账号通过审核");
Map<String,Object> map = new HashMap<>();
map.put("code",0);
map.put("msg","账号审核通过,请10秒内上传1个商品");
return map;
}
消费者
@Component
@RabbitListener(queues = "lock_merchant_dead_queue")
public class MerchantMQListener {
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("body="+body);
//做复杂业务逻辑 TODO 商家业务
//告诉broker,消息已经被确认
channel.basicAck(msgTag,false);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。