赞
踩
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
为什么使用MQ
开发中消息队列通常有如下应用场景:
AMQP高级消息队列协议,是一个进程间传递异步消息的网络协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。跨平台
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
JMS规定了两种消息模式;而AMQP的消息模式更加丰富
JMS
①订阅模式
②点对点消息模式
Kafka 大数据领域使用较多
Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统。
**RocketMQ ** 原阿里产品后贡献给Apache
RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ。RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 :
RabbitMQ 吞吐量不是特别高,网络延迟要比其他的好非常的低,使用Erlang编写使用可能比较麻烦些
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
安装后页面使用介绍:
用户角色:
RabbitMQ在安装好后,可以访问http://localhost:15672 ;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后
角色说明**:
1、 超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、 监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、 策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、 普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、 其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
(1)创建Virtual Hosts
(2)设置Virtual Hosts权限
参数说明:
user:用户名
configure :一个正则表达式,用户对符合该正则表达式的所有资源拥有 configure 操作的权限
write:一个正则表达式,用户对符合该正则表达式的所有资源拥有 write 操作的权限
read:一个正则表达式,用户对符合该正则表达式的所有资源拥有 read 操作的权限
生产者
public class Producer { /*** * 消息生产者 * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { //创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ服务主机地址,默认localhost connectionFactory.setHost("localhost"); //设置RabbitMQ服务端口,默认5672 connectionFactory.setPort(5672); //设置虚拟主机名字,默认/ connectionFactory.setVirtualHost("/rabbit"); //设置用户连接名,默认guest connectionFactory.setUsername("admin"); //设置链接密码,默认guest connectionFactory.setPassword("admin"); //创建链接 Connection connection = connectionFactory.newConnection(); //创建频道 Channel channel = connection.createChannel(); /** * 声明队列 * 参数1:队列名称 * 参数2:是否定义持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * **/ channel.queueDeclare("simple_queue",true,false,false,null); //创建消息 String message = "hello!welcome hello word!"; /** * 消息发送 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 */ channel.basicPublish("","simple_queue",null,message.getBytes()); //关闭资源 channel.close(); connection.close(); } }
消费者
public class Consumer { /*** * 消息消费者 * @param args * @throws IOException * @throws TimeoutException */ public static void main(String[] args) throws IOException, TimeoutException { //创建链接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置RabbitMQ服务主机地址,默认localhost connectionFactory.setHost("localhost"); //设置RabbitMQ服务端口,默认5672 connectionFactory.setPort(5672); //设置虚拟主机名字,默认/ connectionFactory.setVirtualHost("/rabbit"); //设置用户连接名,默认guest connectionFactory.setUsername("admin"); //设置链接密码,默认guest connectionFactory.setPassword("admin"); //创建链接 Connection connection = connectionFactory.newConnection(); //创建频道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("simple_queue",true,false,false,null); //创建消费者,并设置消息处理 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ /*** * @param consumerTag 消息者标签,在channel.basicConsume时候可以指定 * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) * @param properties 属性信息 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //路由的key String routingKey = envelope.getRoutingKey(); //获取交换机信息 String exchange = envelope.getExchange(); //获取消息ID long deliveryTag = envelope.getDeliveryTag(); //获取消息信息 String message = new String(body,"UTF-8"); System.out.println("routingKey:"+routingKey+",exchange:"+exchange+",deliveryTag:"+deliveryTag+",message:"+message); } }; /** * 消息监听 * 参数1:队列名称 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 * 参数3:消息接收到后回调 */ channel.basicConsume("simple_queue",true,defaultConsumer); //关闭资源(不建议关闭,建议一直监听消息) //channel.close(); //connection.close(); } }
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
发布订阅模式:
1.每个消费者监听自己的队列。
2.生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
生产者需要注意如下3点
1.声明交换机
2.声明队列
3.队列需要绑定指定的交换机
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与work队列模式的区别
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
路由模式特点:
绑定Routing key的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: `item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#`:能够匹配`item.insert.abc` 或者 `item.insert
item.*`:只能匹配`item.insert
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
远程过程调用
请求—回复
发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,代理将异步确认客户端发布的消息,这意味着它们已在服务器端处理
与发布者进行可靠的发布确认
RabbitMQ工作模式:
1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
生产者:
1.application.yml中配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /rabbitmq
username: admin
password: admin
2.绑定交换机和队列
@SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class,args); } /*** * 声明交换机 */ @Bean(name = "itemTopicExchange") public Exchange topicExchange(){ return ExchangeBuilder.topicExchange("item_topic_exchange").durable(true).build(); } /*** * 声明队列 */ @Bean(name = "itemQueue") public Queue itemQueue(){ return QueueBuilder.durable("item_queue").build(); } /*** * 队列绑定到交换机上 */ @Bean public Binding itemQueueExchange(@Qualifier("itemQueue")Queue queue, @Qualifier("itemTopicExchange")Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs(); } }
消费者:
1.yml中配置RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /rabbit
username: admin
password: admin
2.创建消息监听处理类
@Component //交给spring容器管理 public class MessageListener { //用于发送MQ消息 @Autowired private RabbitTemplate rabbitTemplate; /** * 监听某个队列的消息 * @param message 接收到的消息 */ @RabbitListener(queues = "item_queue") public void myListener1(String message){ System.out.println("消费者接收到的消息为:" + message); } }
测试,可注入RabbitTemplate发送消息
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMQTest { //用于发送MQ消息 @Autowired private RabbitTemplate rabbitTemplate; /*** * 消息生产测试 */ @Test public void testCreateMessage(){ rabbitTemplate.convertAndSend("item_topic_exchange", "item.insert", "商品新增,routing key 为item.insert"); rabbitTemplate.convertAndSend("item_topic_exchange", "item.update", "商品修改,routing key 为item.update"); rabbitTemplate.convertAndSend("item_topic_exchange", "item.delete", "商品删除,routing key 为item.delete"); } }
MQ投递消息的流程:
生产者将消息发送至队列会出现的问题
生产者发送到交换机时出错
交换机转发消息给队列时出错
returnCallback模式
1.创建配置交换机队列和绑定
@SpringBootApplication public class RabbitmqDemo01Application { public static void main(String[] args) { SpringApplication.run(RabbitmqDemo01Application.class,args); } //创建队列 @Bean public Queue createqueue(){ return new Queue("queue_demo01"); } //创建交换机 @Bean public DirectExchange createExchange(){ return new DirectExchange("exchange_direct_demo01"); } //创建绑定 @Bean public Binding createBinding(){ return BindingBuilder.bind(createqueue()).to(createExchange()).with("item.insert"); } }
2.application.yml中开启confirm机制
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启confirm机制
publisher-confirms: true
server:
port: 8080
3.创建回调函数,创建一个类实现ConfirmCallback接口,重写方法
@Component public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback { /** * * @param correlationData 消息信息 * @param ack 确认标识:true,MQ服务器exchange表示已经确认收到消息 false 表示没有收到消息 * @param cause 如果没有收到消息,则指定为MQ服务器exchange消息没有收到的原因,如果已经收到则指定为null */ @Override public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) { if(ack){ System.out.println("发送消息到交换机成功,"+cause); }else{ System.out.println("发送消息到交换s机失败,原因是:"+cause); } } }
4.使用回调函数
@RestController @RequestMapping("/test") public class TestController { @Autowired private RabbitTemplate rabbitTemplate; //注入回调方法 @Autowired private RabbitTemplate.ConfirmCallback myConfirmCallback; /** * 发送消息 * * @return */ @RequestMapping("/send1") public String send1() { //设置回调函数 rabbitTemplate.setConfirmCallback(myConfirmCallback); //发送消息 rabbitTemplate.convertAndSend("exchange_direct_demo01", "item.insert", "hello insert"); return "ok"; } }
1.配置yml开启returncallback
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启confirm机制
publisher-confirms: true
# 开启return机制
publisher-returns: true
server:
port: 8080
2.创建一个类实现ReturnCallback方法
@Component public class MyReturnCallBack implements RabbitTemplate.ReturnCallback { /** * * @param message 消息信息 * @param replyCode 退回的状态码 * @param replyText 退回的信息 * @param exchange 交换机 * @param routingKey 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("退回的消息是:"+new String(message.getBody())); System.out.println("退回的replyCode是:"+replyCode); System.out.println("退回的replyText是:"+replyText); System.out.println("退回的exchange是:"+exchange); System.out.println("退回的routingKey是:"+routingKey); } }
3.注入调用回调函数
@Autowired
private RabbitTemplate.ReturnCallback myReturnCallback;
@RequestMapping("/send2")
public String send2() {
//设置return模式
rabbitTemplate.setReturnCallback(myReturnCallback);
//发送消息
rabbitTemplate.convertAndSend("exchange_direct_demo01", "item.insert1234", "hello insert");
return "ok";
}
confirm模式用于在消息发送到交换机时机使用
return模式用于在消息被交换机路由到队列中发送错误时使用
还有一种事务机制可以保证消息传递的可靠性,但是性能很差,是同步阻塞的,confirm模式是异步的方式
可两种模式结合使用:
@Autowired
private RabbitTemplate.ReturnCallback myReturnCallback;
@RequestMapping("/send2")
public String send2() {
//设置return模式
rabbitTemplate.setReturnCallback(myReturnCallback);
//设置confirm模式
rabbitTemplate.setConfirmCallback(myConfirmCallback);
//发送消息
rabbitTemplate.convertAndSend("exchange_direct_demo01", "item.insert1234", "hello insert");
return "ok";
}
@RabbitListener 和 @RabbitHandler 搭配使用
ACK代码实现:
1.创建普通消息监听器
@Component
@RabbitListener(queues = "queue_demo01")
public class MyRabbitListener {
@RabbitHandler
public void msg(String message) {
System.out.println("消费Duang接收消息:" + message);
}
}
2.设置yml设置为手动确认模式
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # 开启confirm机制 publisher-confirms: true # 开启return机制 publisher-returns: true #手动确认方式 listener: simple: acknowledge-mode: manual #设置监听端消息ACK确认模式为手动模式 server: port: 8080
ack确认方式有几种:
以上可以根据不同的业务进行不同的选择。需要注意的是,如果拒绝签收,下一次启动又会自动的进行消费。形成死循环。
不要让他重回队列会造成死循环,可以进入死信队列
第一种:手动签收
channel.basicAck()
第二种:拒绝签收 批量处理
channel.basicNack()
第三种:拒绝签收 不批量处理
channel.basicReject()
@RabbitHandler public void msg(Message message, Channel channel, String msg) { //接收消息 System.out.println("消费Duang接收消息:" + msg); try { //处理本地业务 System.out.println("处理本地业务开始======start======"); Thread.sleep(2000); int i=1/0; System.out.println("处理本地业务结束======end======"); //手动签收消息 参数一:指定消息的序号 参数二:是否批量处理 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); //如果出现异常,则拒绝消息 可以重回队列 也可以丢弃 可以根据业务场景来 try { //不要让他重回队列会造成死循环,可以进入死信队列 //拒绝签收 批量处理 参数一:指定消息的序号 参数二:是否批量处理 参数三:是否需要重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); //拒绝签收 不批量处理 参数一:指定消息的序号 参数二:是否需要重回队列 //channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e1) { e1.printStackTrace(); } } }
流程:
如果并发量大的情况下,生产方不停的发送消息,可能处理不了那么多消息,此时消息在队列中堆积很多,当消费端启动,瞬间就会涌入很多消息,消费端有可能瞬间垮掉,这时我们可以在消费端进行限流操作,每秒钟放行多少个消息。这样就可以进行并发量的控制,减轻系统的负载,提供系统的可用性,这种效果往往可以在秒杀和抢购中进行使用。
配置:
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest # 开启confirm机制 publisher-confirms: true # 开启return机制 publisher-returns: true #手动确认方式 listener: simple: acknowledge-mode: manual #设置监听端消息ACK确认模式为手动模式 prefetch: 1 #设置每个消费端可以处理未确认的消息最大数量 默认250个 server: port: 8080
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ设置过期时间有两种:
针对某一个队列设置过期时间 ;队列中的所有消息在过期时间到之后,如果没有被消费则被全部清除
@Configuration public class TtlConfig { //创建过期队列 @Bean public Queue createqueuettl1(){ //设置队列过期时间为10000 10S钟 return QueueBuilder.durable("queue_demo02").withArgument("x-message-ttl",10000).build(); } //创建交换机 @Bean public DirectExchange createExchangettl(){ return new DirectExchange("exchange_direct_demo02"); } //创建绑定 @Bean public Binding createBindingttl(){ return BindingBuilder.bind(createqueuettl1()).to(createExchangettl()).with("item.ttl"); } } //测试 /** * 发送 ttl测试相关的消息 * @return */ @RequestMapping("/send4") public String send4() { //设置回调函数 //发送消息 rabbitTemplate.convertAndSend("exchange_direct_demo02", "item.ttl", "hello ttl哈哈"); return "ok"; } } //过10S钟之后,该数据就都被清0
针对某一个特定的消息设置过期时间;队列中的消息设置过期时间之后,如果这个消息没有被消费则被清除。
//通过MessagePostProcessor方法设置该消息的过期时间 对单独的一条信息设置过期时间 过期后一般让他进入死信队列
rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "哈哈我要检查你是否有支付", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");//设置该消息的过期时间
return message;
}
});
需要注意一点的是:针对某一个特定的消息设置过期时间时,一定是消息在队列中在队头的时候进行计算,如果某一个消息A 设置过期时间5秒,消息B在队头,消息B没有设置过期时间,B此时过了已经5秒钟了还没被消费。注意,此时A消息并不会被删除,因为它并没有再队头。
一般在工作当中,单独使用TTL的情况较少。一般配合延时队列使用。
死信队列:当消息成为Dead Letter后,可以被重新发送到另一个交换机,这个交换机就是Dead Letter Exchange(死信交换机 简写:DLX)。
成为死信的三种条件:
死信队列处理过程:
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
可以监听这个队列中的消息做相应的处理。
死信队列的设置
1.创建queue1 正常队列 用于接收死信队列过期之后转发过来的消息
2.创建queue2 可以针对他进行参数设置 死信队列
3.创建交换机 死信交换机
4.绑定正常队列到交换机
(1)创建配置类用于配置死信队列 死信交换机 死信路由 和正常队列
package com.rabbit.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DlxConfig { //正常的队列 接收死信队列转移过来的消息 @Bean public Queue createqueuetdlq(){ return QueueBuilder.durable("queue_demo03").build(); } //死信队列 --->生产者将消息发送到这里 @Bean public Queue createqueuetdelq2(){ return QueueBuilder .durable("queue_demo03_deq") .withArgument("x-max-length",1) //设置队列最大消息长度 消息条数 超出进入死信队列 .withArgument("x-message-ttl",10000)//设置队列的消息过期时间 超时进入死信队列 拒绝签收也进入死信队列 .withArgument("x-dead-letter-exchange","exchange_direct_demo03_dlx")//设置死信交换机 .withArgument("x-dead-letter-routing-key","item.dlx")//设置死信路由key .build(); } //创建交换机 @Bean public DirectExchange createExchangedel(){ return new DirectExchange("exchange_direct_demo03_dlx"); } //创建绑定 将正常队列绑定到死信交换机上 @Bean public Binding createBindingdel(){ return BindingBuilder.bind(createqueuetdlq()).to(createExchangedel()).with("item.dlx"); } }
(2)添加controller的方法用于测试
/**
* 测试发送死信队列
* @return
*/
@RequestMapping("/send5")
public String send5() {
//发送消息到死信队列 可以使用默认的交换机 指定ourtingkey为死信队列名即可
rabbitTemplate.convertAndSend("queue_demo03_deq", "hello dlx哈哈");
return "ok";
}
业务模拟场景:用户下单后,有十分钟的支付时间,会将消息发送到延迟队列,延迟十分钟后让消费者进行消费,判断用户是否支付。
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。在rabbitmq中,并没有延迟队列概念,但是我们可以使用ttl 和死信队列的方式进行达到延迟的效果。这种需求往往在某些应用场景中出现。当然还可以使用插件。
//通过MessagePostProcessor方法设置该消息的过期时间 对单独的一条信息设置过期时间
rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "哈哈我要检查你是否有支付", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("10000");//设置该消息的过期时间
return message;
}
});
1.创建配置类
@Configuration public class DelayConfig { //正常的队列 接收死信队列转移过来的消息 @Bean public Queue createQueue2(){ return QueueBuilder.durable("queue_order_queue2").build(); } //死信队列 --->将来消息发送到这里 这里不设置过期时间,我们应该在发送消息时设置某一个消息(某一个用户下单的)的过期时间 @Bean public Queue createQueue1(){ return QueueBuilder .durable("queue_order_queue1") .withArgument("x-dead-letter-exchange","exchange_order_delay")//设置死信交换机 .withArgument("x-dead-letter-routing-key","item.order")//设置死信路由key .build(); } //创建交换机 @Bean public DirectExchange createOrderExchangeDelay(){ return new DirectExchange("exchange_order_delay"); } //创建绑定 将正常队列绑定到死信交换机上 @Bean public Binding createBindingDelay(){ return BindingBuilder.bind(createQueue2()).to(createOrderExchangeDelay()).with("item.order"); } }
2.发送信息
/** * 发送消息要发送到queue1,监听消息要监听queue2 * 发送下单 * * @return */ @RequestMapping("/send6") public String send6() { //发送消息到死信队列 可以使用默认的交换机 指定ourtingkey为死信队列名即可 System.out.println("用户下单成功,10秒钟之后如果没有支付,则过期,回滚订单"); System.out.println("时间:"+new Date()); rabbitTemplate.convertAndSend("queue_order_queue1", (Object) "哈哈我要检查你是否有支付", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000");//设置该消息的过期时间 return message; } }); return "用户下单成功,10秒钟之后如果没有支付,则过期,回滚订单"; }
3.设置监听类
@Component @RabbitListener(queues = "queue_order_queue2") public class OrderListener { @RabbitHandler public void orderhandler(Message message, Channel channel, String msg) { System.out.println("获取到消息:" + msg + ":时间为:" + new Date()); try { System.out.println("模拟检查开始=====start"); Thread.sleep(1000); System.out.println("模拟检查结束=====end"); System.out.println("用户没付款,检查没通过,进入回滚库存处理"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }
幂等性指一次和多次请求某一个资源,对于资源本身应该具有同样的结果。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
解决方案:
对数据的设计进行改造
使用乐观锁,在表中加入版本号version
以转账为例:
1.发送消息
2.消息内容包含了id 和 版本和 金额
3.消费者接收到消息,则根据ID 和版本执行sql语句,
update account set money=money-?,version=version+1 where id=? and version=?
4.如果消费第二次,那么同一个消息内容是修改不成功的。
消息堆积的影响:
1.可能导致新消息无法进入队列。
2.消息等待消费的时间过长,超出业务时间范围。
产生堆积的情况
1.生产者突然大量发布消息
2.消费者消费失败
3.消费者宕机
3.消费者性能出现瓶颈
解决方法
1.增加消费者的多线程处理,
2.部署多个消费者
3.排查消费者的性能瓶颈,设置每秒的处理请求多一些
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的,这里只需要保证erlang_cookie的参数一致集群即可通信。
rabbimtq集群包括两种:普通集群和镜像集群。
普通集群有缺点也有优点,镜像集群有缺点也有优点。
大致上,
如果是普通集群:那么每一个节点的数据,存储了另外一个节点的元数据,当需要使用消息时候,从另外一台节点 拉取数据,这样性能很高,但是性能瓶颈发生在单台服务器上。而且宕机有可能出现消息丢失。
如果镜像集群,那么在使用时候,每个节点都相互通信互为备份,数据共享。那么这样一来使用消息时候,就直接获取,不再零时获取,但是缺点就是消耗很大的性能和带宽。
rabbitmq集群搭建,这里采用docker的方式来进行搭建
准备一个虚拟机 里面安装docker引擎。这里为了测试我们采用2台rabbitmq的实例,也就是两个docker容器来模拟2个rabbitmq服务器器。
准备一个虚拟机 里面安装docker引擎。这里为了测试我们采用2台rabbitmq的实例,也就是两个docker容器来模拟2个rabbitmq服务器器。
执行命令:
docker pull rabbitmq:3.6.15-management
docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.6.15-management
docker run -d --hostname rabbit2 --name myrabbit2 -p 15673:15672 -p 5673:5672 --link=myrabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.6.15-management
解释:
--link <name or id>:alias
其中,name和id是源容器的name和id,alias是源容器在link下的别名。
--link 用于在容器中进行通信的时候需要使用到的。
-e RABBITMQ_ERLANG_COOKIE='rabbitcookie'
其中 -e 设置环境变量 变量名为:RABBITMQ_ERLANG_COOKIE 值为:rabbitcookie 该值可以任意。 但是一定要注意,两个容器的cookie值一定要一样才行。他的作用用于发现不同的节点,并通过该cookie进行自动校验和通信使用。
--hostname rabbit2
其中:--hostname 用于设置容器内部的hostname名称,如果不设置,那就会自动随机生成一个hostname字,如下图。
这里一定要设置。因为rabbitmq的节点数据进行通信加入集群的时候需要用hostname作为集群名称。
这里我们使用 集群名 rabbit@rabbit1 ,将节点2 加入到节点1号中。
docker exec -it myrabbit1 bash
rabbitmqctl stop_app --- 表示关闭节点
rabbitmqctl reset --- 重新设置节点配置
rabbitmqctl start_app --- 重新启动 (此处不需要设置 ,将该节点作为集群master,其他节点加入到该节点中)
exit ---退出容器
docker exec -it myrabbit2 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbit1
rabbitmqctl start_app
exit
解释:
rabbitmqctl join_cluster --ram rabbit@rabbit1
-- 用于将该节点加入到集群中
-- ram 设置为内存存储,默认为 disc 磁盘存储,如果为磁盘存储可以不用配置ram
-- rabbit@rabbit1 该 配置 为节点集群名称:集群名称为:rabbit@server 而server指定就是hostname的名称。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。