赞
踩
指定direct后, 消息会根据你设置的routeing key(路由键), 发送到对应的队列中
1,新建direct交换机
2,添加队列, 并且绑定路由键
3,测试发送消息, 只有emp队列可以收到
fanout, 广播, 不管你指定不指定路由键, 都会向他绑定的所有队列发送消息
1, 新建fannout交换机
2, 绑定路由键, 因为他是fanout的, 其实这里绑定路由键或者不绑定, 都一样
3, 测试发送消息
4, 可以看到虽然发送指定了路由键是emps, 但是所有的队列都收到了
1, 新建topic交换机, 绑定路由键
2, 发送消息
3, 查看接收情况, 发现全部可以接收到
5, 测试发送消息
6, 查看接收结果, 只有路由键是*.news对应的交换机可以收到
1, 引依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2, 开启
@EnableRabbit
3, 配置
spring:
rabbitmq:
host: ${version3Ip}
port: 5672
virtual-host: /
测试创建MQ组件
@Slf4j @SpringBootTest class GulimallOrderApplicationTests { @Autowired private AmqpAdmin amqpAdmin; //创建基础信息 @Test void createExchange() { //AmqpAdmin创建 /* name: 交换机名字, durable:是否持久化, autoDelete:是否自动删除, args:其他参数 DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { */ DirectExchange directExchange = new DirectExchange("hello-exchange",true,false ); amqpAdmin.declareExchange(directExchange); log.info("交换机{},创建成功","hello-exchange"); } /** * 创建队列 */ @Test public void createuqeue(){ Queue queue = new Queue("hello-queue"); amqpAdmin.declareQueue(queue); log.info("队列创建成功!"); } /** * 将交换机和队列绑定, 指定路由键 */ @Test public void createBinding(){ /* Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) { destination: 目的地 destinationType:目的地类型 exchange:交换机 routingKey: 路由键 args: 自定义参数 将exchange指定的交换机和destination目的地进行绑定, 使用routingKey作为指定的路由键 */ Binding binding = new Binding( "hello-queue" , Binding.DestinationType.QUEUE , "hello-exchange" , "hello.java" , null ); amqpAdmin.declareBinding(binding); log.info("绑定成功!"); } //发送消息 @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 */ @Test public void sendMsg(){ OrderEntity orderEntity = new OrderEntity(); orderEntity.setBillContent("Java发送的消息内容!"); /** * 如果发送的消息是个对象, 我们会使用序列化机制, 将对象写出去, * 所以对象必须实现 Serializable, * 但是注意这样序列化之后, 在rabbitMQ管理界面,只能看到序列化后的数据, 不能直接查看数据 * 如下: rO0ABXNyAC1jb20uYXRndWlndS5ndWxpbWFsbC5vcmRlci5lbnRpdHkuT3JkZXJFbnRpdHkAAAAAAAAAAQIAKkwADmF1dG9Db25maXJtRGF5dAATTGphdmEv * 因此为了让界面直接可以看到, 可以新增一个配置, 改变rabbitMQ的默认转化方式 * Jackson2JsonMessageConverter * */ rabbitTemplate.convertAndSend("hello-exchange","hello.java",orderEntity); log.info("消息发送成功!"); } }
测试接收消息
/** * 接收消息: * 1, @RabbitListener * 标在类 + 方法上 * 2, @RabbitHandler * 标在方法上 * * @RabbitListener 这个注解必须在容器中才能生效, 比如@Service, @Component * queues:声明需要监听的所有队列 * * 接收到的消息到时候就会在形参里 * 可以写Object msg 也可以是 Message msg * 1, (Object msg) * 2, (Message msg) 这种方式是可以得到原生消息的详细信息, 消息头+ 消息体 * byte[] body = msg.getBody(); * //消息头属性信息 * MessageProperties messageProperties = msg.getMessageProperties(); * 3, (Message msg, OrderEntity content) * T<发送的消息类型 OrderEntity entity * 4,(Message msg, OrderEntity content, Channel channel) * channel:当前传输数据的通道, 而且一个客户端连接rabbitMQ, 只会创建一个通道, * * Queue: 可以很多人监听, 但是消息队列中的消息只能有一个接收到, 只要有一个人接收到, 消息就会被删除, 因此只能有一个人接收到 * 比如: 订单服务启动了多个(三个A,B,C),但是同一个消息只能让一个服务消费, 比如A接收到了, BC就接收不到 * */ @RabbitListener(queues = "hello-queue") public void recieveMessage(Message msg, OrderEntity content, Channel channel){ byte[] body = msg.getBody(); //消息头属性信息 MessageProperties messageProperties = msg.getMessageProperties(); log.info("接收到的消息是:{},类型是:{},内容是: {}",msg,msg.getClass(),content); }
1), @RabbitListener
标在类 + 方法上
2), @RabbitHandler
标在方法上
先看下雷丰阳老师讲的
根据图可以看到, 可靠投递分为两个角度,
1, comfirmCallback: 消息推送到整个broker的过程, 进行确认回调, 如果是单机的, 只要broker接收到消息就会执行回调, `如果是集群模式, 需要所有的broker接收到才会回调`
但是注意这一步, 只是说broker接收到了消息, 但是不保证可以正确的投递, 因此引入出了第2步
2,returnCallback: 在broker内部, exchange交换机根据routingKey路由键, 找到Queue队列的时候, 也是可能存在消息丢失, 比如路由键找不到, 这个时候就会回调, 因此也是需要加入确认回调
注意:
这个确认回调就类似于,ajax的提交, 成功回调success, 失败回调error, 而且这个是将生产者生产消息, 推送, 交换机寻找推送队列, 等都作为一个事务去处理的, 因此很消耗效率
保证每个消息被正确消费掉后, broker才可以删除这个消息
1, 自动签收, 改成手动签收
, 或者拒绝签收, 拒绝签收有设置可以让他, 重新入队, 也可以直接丢弃
, 就这一个好玩的, 其他没发现
生产端需要确认的地方有两个点, 一个是消息抵达broker的过程中
, 另一个是broker内部通过exchange交换机根据routingKey找到队列的过程中
, 因此生产端需要确认的点也有两个
, 而代码实现上, 第一个对应的是ConfirmCallback回调方法, 第二个对应的是ReturnCallback回调方法
需要注意的是
1, 发送消息的时候, 可以指定消息的id(CorrelationData),
这样后续consumer接收到消息,可以记录在库, 后续对账, 找出没接收到消息的id
/**
* 发送指定id, 这样就可以将生产者确认接收到的消息id都记录下来,
* 后续就可以遍历就可以找到没收到的消息是那些了
*/
@Test
public void sendMsg2(){
OrderEntity orderEntity = new OrderEntity();
orderEntity.setBillContent("Java发送的消息内容!");
rabbitTemplate.convertAndSend("hello-exchange","hello11.java111",orderEntity, new CorrelationData(UUID.randomUUID().toString()));
log.info("消息发送成功!");
}
2, 修改rabbitTemplate的配置
@Autowired RabbitTemplate rabitTemplate; /** * 生产者的可靠消息传递 * 1, 开启配置 * publisher-confirms: true #开启发送端确认 * 2, 修改rabbitTemplate的ConfirmCallback回调方法 --> 这个是消息到broker的过程 * 3, 开启配置 * publisher-returns: true # 开启发送端消息抵达队列确认 第二步, broker内交换机到队列的过程 * template: * mandatory: true #只要抵达队列, 以异步方式, 优先回调returnConfirm的方法 * 3, 修改rabbitTemplate的ReturnCallback回调方法 --> 这个是broker内, 交换机到队列的过程 * * */ @PostConstruct //当前类对象创建完成后, 执行这个方法 public void initTemplate(){ //注意这个方法是, 只要消息抵达到了broker, ack = true, 服务器收到消息就回调 rabitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 当前消息的唯一关联数据, CorrelationData.id , 可以直接理解为这个消息的整体的唯一id * @param ack 消息是否成功收到 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息生产确认!!!!correlationData:{},ack:{},cause:{} ",correlationData,ack,cause); } }); //设置消息抵达队列的确认回调, 就是broker内, 交换机到队列的过程 rabitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { //只要消息没有投递给指定的队列, 就会触发, 这个类似于一个失败回调 /** * @param message 投递失败的消息, 的详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发送给那个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("交换机内部投递失败, message:{},replayCode:{},replayText:{}, routingKey:{}",message,replyCode,replyText,exchange,routingKey); } }); }
没啥好说的
实现方式一, 给交换机exchange设置过期参数, 本交换机都是30分钟过期
生产者, 推送路由键给exchange1, exchange1接收到以后, 在里边保存30分钟后, exchange1将过期30分钟的消息推送给exchange2, 然后由消费者监听exchange2, 这样, exchange2里边就都是过期了30分钟的数据了;
简单说就是:
推送正常推送, 只不过以前是推送给A, 然后消费者就监听A,但是这里是为了做延时, 就有点不一样; 推送正常推, 推送给A, 但是A需要设置一些交换机参数, 让A将消息保存三十分钟后丢弃, 丢弃的时候, 就丢弃给B, 然后B接收到的消息, 就是过了30分钟后的消息, 然后消费者, 去监听B的消息;
也就是: 生产者推送消息给A, A将消息攥在手里30分钟后, 丢给B, 用户去监听消费B的消息;
实现方式二: 给推送的消息设置30分钟过期时间, 交换机设置将过期消息(死信)推送给B交换机
简单说: 类似于1, 只是说1是给交换机设置全部的过期时间, 这里是给推送的消息设置过期时间
总结:
其实两种方式, 目的都是, 先将推送的消息, 都变成死信
,然后将死信推送给新的交换机
, 然后监听这个新的交换机就好了;
主要参考雷神讲解, 链接:
https://www.bilibili.com/video/BV1np4y1C7Yf?p=292&vd_source=2f702df7504ba1aeb00e78f5a3c23047
主要是创建了两个队列, 一个交换机
然后进行了两次绑定
package com.atguigu.gulimall.order.config; import com.atguigu.gulimall.order.entity.OrderEntity; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * 如果rabbitMQ里没有交换机和队列啥的, 系统运行就会自动创建, 如果MQ里已经有了, 就不会创建, 使用MQ里边的 * * 流程就是: * 1, * 生产者生产 路由键为: order.create.order 的消息, 根据这个消息, 找到对应的交换机 order-event-exchange * 参考orderCreateOrderBinging()方法, 这个交换机内部, 将order.create.order 和 order.delay.queue 绑定 * 相当于又转发到了, order.delay.queue 这个路由键, * 通过order.delay.queue 这个路由键, 找到对应的交换机, 还是order-event-exchange 参考orderCreateOrderBinging() * * 2, * 参考orderDelayQueue()方法, 上述过程已经到了order.delay.queue 这个路由键 * 可以看到这个方法给这个路由键设置了, ttl等参数, 而且对应的交换机是order-event-exchange * 重要的是, 指定了另外一个路由的路由键 order.release.order * * 3, * 等到1分钟一过, order.release.order 就会接收到新的消息, 也就是过了一分钟的死信消息, 参考orderReleaseOrderBinging()方法 * 这个方法会给order.release.order.queue队列推送一个路由键为 order.release.order的消息, * * 4, * 然后通过监听order.release.order.queue的队列, 接收到, 已经超时一分钟的死信消息了 * * 5, * 也就是: * Producer * 生产--> 路由键为order.create.order 的消息 * --> 到order-event-exchange交换机 * --> 将这个消息变成时效性为1分钟的消息 * -->推送给 路由键order.delay.queue的消息, delay的消息是具有时效性的, 这里相当于把没有时效的消息, 转换成了有时效的消息, 就是create到delay的过度 * --> delay消息时效一过, 还是经过order-event-exchange交换机, 推送一个路由键为order.release.order的消息 * --> 把order.release.order的消息推送给order.release.order.queue队列 * --> 监听order.release.order.queue队列的消费者进行消费处理 * 总结说就是, * 生产者生create的普通消息, exchange将其变成具有时效性的delay消息 * delay时效一过, 把时效性的消息变成普通消息, release消息,然后推送给order.release.order.queue队列 * 消费者监听release队列, 进行消费 * */ @Configuration public class MyMQConfig { /* @Bean 容器中的Binding, Queue, Exchange都会自动创建, 前提是RabbitMQ没有的情况下 如果RabbitMQ里已经存在了, @Bean新设置的属性, 不会覆盖, 还是使用rabbitMQ原来有的 */ /** * 创建死信队列 * 创建延时队列, * 其实就是配置消息什么时候变成死信, 死信之后, 交给那个交换机, 交给那个路由键 * @return */ @Bean public Queue orderDelayQueue(){ /** * 设置消息, 1分钟过期, */ Map<String,Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "order-event-exchange"); //指定死信路由 args.put("x-dead-letter-routing-key", "order.release.order"); //指定死信路由键 args.put("x-message-ttl", 60000); // 消息过期时间 1分钟 Queue queue = new Queue("order.delay.queue", true, false, false,args); return queue; } @Bean public Queue orderReleaseQueue(){ Map<String,Object> args = new HashMap<>(); Queue queue = new Queue("order.release.order.queue", true, false, false,args); return queue; } @Bean public Exchange orderEventExchange(){ return new TopicExchange("order-event-exchange",true,false); } /** * 设置队列和交换机的绑定关系 */ @Bean public Binding orderCreateOrderBinging(){ return new Binding( "order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange" ,"order.create.order" ,null ); } @Bean public Binding orderReleaseOrderBinging(){ return new Binding( "order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange" ,"order.release.order" ,null ); } @RabbitListener(queues = "order.release.order.queue") public void listener(OrderEntity order, Channel channel, Message msg) throws IOException { System.out.println("收到过期的订单信息:"+order.getOrderSn()+"时间是: "+order.getModifyTime()); channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); //手动签收 } }
@Autowired RabbitTemplate rabbitTemplate; @ResponseBody @GetMapping("/test/createOrder") public String createOrderTest(){ //模拟订单下单成功 OrderEntity order = new OrderEntity(); order.setOrderSn(UUID.randomUUID().toString()); order.setModifyTime(new Date()); //给MQ发送消息 rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order); return "ok"; }
主要就是第一步的最后一个方法
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity order, Channel channel, Message msg) throws IOException {
System.out.println("收到过期的订单信息:"+order.getOrderSn()+"时间是: "+order.getModifyTime());
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false); //手动签收
}
如果rabbitMQ里没有交换机和队列啥的, 系统运行就会自动创建, 如果MQ里已经有了, 就不会创建, 使用MQ里边的
流程就是:
1,生产者生产 路由键为: order.create.order 的消息, 根据这个消息, 找到对应的交换机 order-event-exchange
参考orderCreateOrderBinging()方法, 这个交换机内部, 将order.create.order 和 order.delay.queue 绑定
相当于又转发到了, order.delay.queue 这个路由键,
通过order.delay.queue 这个路由键, 找到对应的交换机, 还是order-event-exchange 参考orderCreateOrderBinging()
2,参考orderDelayQueue()方法, 上述过程已经到了order.delay.queue 这个路由键
可以看到这个方法给这个路由键设置了, ttl等参数, 而且对应的交换机是order-event-exchange
重要的是, 指定了另外一个路由的路由键 order.release.order
3, 等到1分钟一过, order.release.order 就会接收到新的消息, 也就是过了一分钟的死信消息, 参考orderReleaseOrderBinging()方法
这个方法会给order.release.order.queue队列推送一个路由键为 order.release.order的消息,
4,然后通过监听order.release.order.queue的队列, 接收到, 已经超时一分钟的死信消息了
5,
也就是:
生产--> 路由键为order.create.order 的消息
--> 到order-event-exchange交换机
--> 将这个消息变成时效性为1分钟的消息
-->推送给 路由键order.delay.queue的消息, delay的消息是具有时效性的, 这里相当于把没有时效的消息, 转换成了有时效的消息, 就是create到delay的过度
--> delay消息时效一过, 还是经过order-event-exchange交换机, 推送一个路由键为order.release.order的消息
--> 把order.release.order的消息推送给order.release.order.queue队列
--> 监听order.release.order.queue队列的消费者进行消费处理
总结说就是,
1,生产者生create的普通消息, exchange将其变成具有时效性的delay消息
2,delay时效一过, 把时效性的消息变成普通消息, release消息,然后推送给order.release.order.queue队列
3,消费者监听release队列, 进行消费
想要保证消息的可靠投递, 以及延时投递, 都得结合任务场景去做, 而且不能单方面的使用延时投递, 就保证万无一失了; 因为有时候由于网络, 宕机, 导致你延时投递的消息, 和解锁的消息, 并不是顺序执行, 比如解锁的消息先执行了, 这条消息已经消费了, 结果你延时投递的消息才刚到, 这样子, 这笔消息就再也没办法解锁了
// try-catch保证消息必然发送出去
try{
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
}catch (Exception e){
//TODO 设法将消息成功发出去给MQ
/*
可以将失败的消息记录在日志, 然后定时扫描, 重新发送消息给MQ
*/
}
重复原因:
解决方式:
//redelivered 可以判断出, 当前消息是否被第二次及以后重新派发过来的
Boolean redelivered = message.getMessageProperties().getRedelivered();
解决方式:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。