赞
踩
1.控制台创建队列simple.queue
2.利用SpringAMQP直接像simple.queue发送消息
3.利用SpringAMQP编写消费者,监听simple.queue队列
多个消费者绑定同一个队列,可以加快消息处理速度
同一条消息只会被一个消费者处理、
通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
fanoutExchange会将消息路由到每个绑定的队列
发送消息到交换机的API
// 发送消息到交换机,参数分别是:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName,"",message)
//发送消息到队列,参数为:队列名称。消息
rabbitTemplate.convertAndSend(queue,message)
Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此被称为定向路由
每一个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
TopicExchange也是基于RoutingKey做消息路由,但是routingKey通常是多个单词的组合,并且以“.”分割。
Queue与Exchange指定BindingKey时可以使用通配符
“#”:代指0个或多个单词
“*”:代指一个单词
Queue:用于声明队列,可以用工厂类QueueBulider构建
Exchange:用于声明交换机,可以用工厂类ExchangeBulider构建
Binding:用于声明队列和交换机的关系,可以用工厂类BindingBulider构建
代码演示:
@Configuration public class FanoutConfiguration { //声明交换机 @Bean public FanoutExchange fanoutExchange() { //return new FanoutExchange("hmall.fanout"); return ExchangeBuilder.fanoutExchange("hmall.fanout").build(); } //声明第一个队列 @Bean public Queue fanoutQueue1() { //return new FanoutExchange("fanout.queue1"); return QueueBuilder.durable("fanout.queue1").build(); } //绑定第一个队列到交换机 @Bean public Binding fanoutQueue1Binding(Queue fanoutQueue1, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } //声明第二个队列 @Bean public Queue fanoutQueue2() { //return new FanoutExchange("fanout.queue2"); return QueueBuilder.durable("fanout.queue2").build(); } //绑定第二个队列到交换机 @Bean public Binding fanoutQueue2Binding(Queue fanoutQueue2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } }
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2", durable = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
log.info("消费者2监听到direct2.queue消息:【{}】", msg);
}
使用JSON序列化代替默认的JDK序列化
1.引入jackson依赖
<!--Jackson-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
2.配置消息转换器
@Bean
public MessageConverter jacksonMessageConvertor() {
return new Jackson2JsonMessageConverter();
}
1.导入mq依赖
<!-- AMQP-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /hmall
username: hmall
password: 123
3.配置消息转换器
@Configuration
public class MqConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
4.在spring.factories配置扫描路径
com.hmall.common.config.MqConfig,\
5.编写消费者,声明队列、交换机以及key
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "trade.pay.success.queue", durable = "true"),
exchange = @Exchange(name = "pay.direct"),
key = "pay.success"
))
public void listenerPaySuccess(Long orderId) {
orderService.markOrderPaySuccess(orderId);
}
}
6.发送消息
1.引入RabbltTemplate
private final RabbitTemplate rabbitTemplate; //构造方法注入
@Autowired
private RabbitTemplate rabbitTemplate; //注解注入
2.发送消息
try {
rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getPayOrderNo());
} catch (Exception e) {
log.error("发送支付状态通知失败,订单id:【{}】",po.getPayOrderNo(),e);
}
spring:
rabbitmq:
host: 127.0.0.1 #IP地址
port: 5672 #端口
virtual-host: /hmall
username: hmall
password: 123
connection-timeout: 1s # 连接超时时间
template:
retry:
multiplier: 2 #失败后下次等待时长倍数
max-attempts: 3 # 最大重试次数
initial-interval: 1s # 第一次重试时间
enabled: true # 是否开启重试
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试(同步),也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能
如果对业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息的代码
SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有一下几种情况
操作步骤
1.添加配置
spring:
rabbitmq:
publisher-confirm-type:correlated #开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher-return机制
配置说明:
2.添加ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
@Configuration @Slf4j @AllArgsConstructor public class MqConfig { private final RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { log.error("触发return Callback"); log.debug("exchange:{}", returned.getExchange()); log.debug("routingKey:{}", returned.getRoutingKey()); log.debug("replyCode:{}", returned.getReplyCode()); log.debug("replyText:{}", returned.getReplyText()); log.debug("message:{}", returned.getMessage()); } }); } }
3.发送消息,指定消息ID、消息ConfirmCallback
@Test void testPublisherConfirm() throws InterruptedException{ //1.创建CorrelationData CorrelationData cd = new CorrelationData(); //2.给Future添加ConfirmCallback cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() { @Override public void onFailure(Throwable ex) { //2.1 Future发生异常时的处理逻辑,基本不会触发 log.error("handle message ack fail",ex); } @Override public void onSuccess(CorrelationData.Confirm result) { //2.2 Future接收到回执的处理逻辑,参数中的result就是回执内容 if(result.isAck()){ log.debug("发送消息成功,收到ack"); }else{ log.error("发送消息失败,收到nack,reason:{}",result.getReason()); } } }); //3.发送消息 rabbitTemplate.convertAndSend("hmall.direct","red1","hello",cd); }
在默认情况下,RabbitMQ将接收到的消息保存在内存中以降低收发消息的延迟。这样会导致两个问题
RabbitMQ实现数据持久化包括的三个方面
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列
惰性队列有一下特征:
在3.12版本后,所有队列都是Lazy Queue模式,无法更改
要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可
控制台添加
点击 Lazy mode 后,会在Arguments添加一条数据,这时表示当前队列为Layz Queue队列
2.Java代码添加
声明Bean
@Bean
public Queue layzQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() //开启Lazy模式
.build();
}
注解
//注解声明LazyQueue
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
//队列类型
arguments = @Argument(name = "x-queue-mode",value = "lazy")
))
public void ListenLazyQueue(String msg){
log.info("接收到lazy.queue的消息:{}",msg);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。