赞
踩
课程内容
MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。官方地址:http://www.rabbitmq.com/
开发中消息队列通常有如下应用场景: 消峰,解耦,提速,大数据处理
消除峰值:用于高并发场景消除峰值,让并发请求在mq中进行排队
大数据处理:由于数据量太大,程序一时处理不过来,可以通过把数据放入MQ,多开几个消费者去处理消息,比如:日志收集等
服务异步/解耦 :服务之间通过RPC进行通信的方式是同步方式,服务消费方需要等到服务提供方相应结果后才可以继续执行,使用MQ之后的服务通信是异步的,服务之间没有直接的调用关系,而是通过队列进行服务通信, 应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
排序保证 FIFO :遵循队列先进先出的特点,可以保证数据按顺序消费
除此之外使用MQ还可以达到:提高系统响应速度,提高系统稳定性的目的。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。 提高了应用程序的响应时间。另外如果系统挂了也没关系,数据放到消息队列.后续可以继续消费
但是需要注意的是:对数据的一致性要求较高的业务场景不适合使用MQ,因为MQ具有一定的数据延迟
AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题,RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 官方:http://www.amqp.org
JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消 息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
消息发布接收流程:
1.发送消息
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)
消息接收消息
1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
docker pull rabbitmq:3-management
需要开放端口:5672是程序连接的端口,15672是可视化界面接口
docker run -id --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
安装好之后,访问:15672界面如下,账号和密码都是 guest
第一步需要导入mq的基础依赖,SpringBoot使用的是2.6.13
<!--SpringBoot依赖--> <parent> <groupId> org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.13</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
第二步:配置mq,主要是配置连接信息
server:
port: 10200
spring:
application:
name: rabbitmq‐application
rabbitmq:
host: 60.204.187.34
port: 5672
username: guest
password: guest
virtualHost: /
编写启动类,省略…
rabbitMQ提供了7种消息模型。https://www.rabbitmq.com/tutorials
我们使用Hello World 案例来入门,这种模式比较简单,只需要一个生产者,一个队列,一个消费者即可
我们需要做如下事情
在SpringBoot中交换机和队列的创建都通过Bean的方式来进行,下面定义了一个队列,名字为hello:
@Configuration
public class RabbitmqConfig {
//定义消息队列的名字
public static final String NAME_HELLO = "queue_hello";
@Bean
public Queue queue() {
//创建一个队列队列,并指定队列的名字
return new Queue(NAME_HELLO,true);
}
}
rabbitmq通过@RabbitListener(queues = {队列名}) 来监听队列,从而消费消息
@Component
public class ReceiveHandler {
//监听NAME_HELLO队列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receiveHelloQueueMessage(String msg, Message message, Channel channel) {
System.out.println("消费者收到消息:"+msg);
}
}
通过注入:RabbitTemplate发送消息,以及消息内容。
@RestController public class SenderController { @Autowired private RabbitTemplate rabbitTemplate; @PostMapping("/sender/hello/{message}") public String senderHello(@PathVariable String message) { /** * 参数说明 * exchnage: 交换机,默认交换机指定为“”即可 * routingKey :发送消息的路由键,该模式下使用队列名即可 * message:消息的内容 */ rabbitTemplate.convertAndSend("", RabbitmqConfig.NAME_HELLO,message); return "success"; } }
注意:这个的交换机使用的是默认的交换机"" ,路由键直接指定为队列的名字。其实在MQ中是提供了几个默认的交换机,当我们把交换机指定为 “” , 就会使用默认的交换机来转发消息,而我们创建的队列会和默认的交换机进行绑定,如下:
下面是绑定关系图
启动程序访问controller进行测试,控制台可以看到消费者打印的日志,打开MQ的可视化界面可以看到创建的队列,之所以里面没有消息是因为消息被消费了。
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息
。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的
,即:同一个消息只会被一个消费者消费
WorkQueue和HelloWorld本身无区别,只是在HelloWorld的基础上多增加消费者而已,如下:
@Component
public class ReceiveHandler {
//监听NAME_HELLO队列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receive1(String msg, Message message, Channel channel) {
System.out.println("消费者1收到消息:"+msg);
}
//监听NAME_HELLO队列
@RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
public void receive2(String msg, Message message, Channel channel) {
System.out.println("消费者2收到消息:"+msg);
}
}
连续多次发送消息MQ会使用轮询方式把消息评价分配给多个消费者
这种消费模式有一个问题,当某个消费者消费能力偏弱会导致后续的消息阻塞,我们可以通过 prefetch
来指定消费者每次只能拉取一个消息,这样的话当某个消费者正在忙碌,那么MQ会把消息推送给别的消费者,防止消息在某个消费者身上发生阻塞。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
在上面的案例中,我们采用一个队列来发送消息,及时同一个队列监听了多个消费者,同一个消息也只会给到其中一个消费者,而发布订阅模型允许一个消息向多个消费者投递
。而对于:fanout , direct , topics都属于发布订阅模型。
RabbitMQ的exchnage正好有4中类型,就对应了上述的几种订阅模型,源码如下:
public abstract class ExchangeTypes {
public static final String DIRECT = "direct";
public static final String TOPIC = "topic";
public static final String FANOUT = "fanout";
public static final String HEADERS = "headers";
public static final String SYSTEM = "system";
}
Fanout被叫做广播模型,它的特点是当生产者把消息投递给交换机,交换机会把消息投递给和它绑定的所有队列,而相应的所有的消费者都能收到消息,如上图。要实现Fanout模型我们要做如下几个事情
下面配置了一个fanout类型的交换机和2个队列,并把队列绑定到了交换机
@Configuration public class RabbitmqConfigFanout { //定义消息队列的名字 public static final String QUEUE_1 = "queue1"; public static final String QUEUE_2 = "queue2"; public static final String EXCHANGE_FANOUT = "exchnage-fanout"; @Bean public Exchange exchange(){ //定义一个fanout类型的交换机,并指定持久化 return ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT).durable(true).build(); } @Bean public Queue queue1() { //创建一个队列队列,并指定队列的名字和持久化 return new Queue(QUEUE_1,true); } @Bean public Queue queue2() { //创建一个队列队列,并指定队列的名字 return new Queue(QUEUE_2,true); } @Bean public Binding bindingQueue1() { //fanout模式不指定routingkey return BindingBuilder .bind(queue1()).to(exchange()).with("").noargs(); } @Bean public Binding bindingQueue2() { return BindingBuilder .bind(queue2()).to(exchange()).with("").noargs(); } }
消费者只需要监听不同的队列即可
@RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_1})
public void receiveFanout1(String msg, Message message, Channel channel) {
System.out.println("fanout消费者1收到消息:"+msg);
}
@RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_2})
public void receiveFanout2(String msg, Message message, Channel channel) {
System.out.println("fanout消费者2收到消息:"+msg);
}
生产者发送消息的时候需要指定exchange的名字,注意:routingkey不需要指定
@PostMapping("/sender/fanout/{message}")
public String senderFanout(@PathVariable String message) {
/**
* 参数说明
* exchnage: 交换机,使用自定义的交换机
* routingKey :发送消息的路由键,fanout模式指定为“”
* message:消息的内容
*/
rabbitTemplate.convertAndSend(RabbitmqConfigFanout.EXCHANGE_FANOUT, "",message);
return "success";
}
启动测试,发送一个消息2个消费者都能收到
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费,我们就要用的routing路由模式,这种模式是通过一个routingkey来收发消息。交换机的类型使用direct
如上图:不同的队列在绑定到交换机时指定的routingkey是不一样的,这样一来我们发送消息的时候,就可以通过不同的routingkey来把消息发送到不同的队列中,从而使不同的消费者去消费,该模型我们需要做如下几个步骤
这里定义了一个direct类型的交换机,以及2个队列,队列在绑定到交换机时采用了不同的routingkey.
@Configuration public class RabbitmqConfigDirect { //定义消息队列的名字 public static final String QUEUE_DIRECT_1 = "direct_queue1"; public static final String QUEUE_DIRECT_2 = "direct_queue2"; public static final String EXCHANGE_DIRECT = "exchnage-direct"; @Bean public Exchange exchange(){ //定义一个direct类型的交换机,并指定持久化 return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build(); } @Bean public Queue queue1() { //创建一个队列队列,并指定队列的名字 return new Queue(QUEUE_DIRECT_1,true); } @Bean public Queue queue2() { //创建一个队列队列,并指定队列的名字 return new Queue(QUEUE_DIRECT_2,true); } @Bean public Binding bindingQueue1() { return BindingBuilder .bind(queue1()).to(exchange()).with("pay").noargs(); } @Bean public Binding bindingQueue2() { return BindingBuilder .bind(queue2()).to(exchange()).with("order").noargs(); } }
消费者消费不同队列中的消息即可
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_1})
public void receiveDirect1(String msg, Message message, Channel channel) {
System.out.println("receiveDirect1消费者1收到消息:"+msg);
}
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) {
System.out.println("receiveDirect2消费者2收到消息:"+msg);
}
生产者需要指定自己的交换机,以及routingkey,指定不同的routingkey决定了消息会发送到不同的队列中
@PostMapping("/sender/direct/{message}")
public String senderDirect(@PathVariable String message) {
/**
* 参数说明
* exchnage: 交换机,使用自定义的交换机
* routingKey :发送消息的路由键,fanout模式指定为“”
* message:消息的内容
*/
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
return "success";
}
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
topic 和 direct 没有本质的区别,只是在绑定队列时可以使用通配符
需要定义topics类型的交换机和2个队列,绑定队列的时候指定routingkey,可以使用通配符来指定
@Bean
public Binding bindingQueue1() {
return BindingBuilder
.bind(queue1()).to(exchange()).with("#.pay").noargs();
}
@Bean
public Binding bindingQueue2() {
return BindingBuilder
.bind(queue2()).to(exchange()).with("#.order").noargs();
}
发送消息的时候指定的routingkey如果能命中绑定时的routingkey消息就可以发送到相应的队列中,比如:
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "account.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.order",message);
前2个消息的routingkey可以命中 #.pay, 第3条消息可以命中 #.order.
在RabbitMQ中包含手动签收和自动签收2钟模式,上述案例都采用的是自动签收,也就是当MQ吧消息投递给消费者后,消息默认被签收,MQ就会直接把消息删除掉。这种模式可能会导致消息丢失分享:比如消费者拿到消息并未成功消费,但是MQ已经把消息删除,从而造成了消息的丢失,所以在司机开发中尽量使用手动签收
手动签收模式意味着MQ不会自动签收消息,而是把消息推送给消费者后,等到消费者自己去签收消息后,再删除队列中的消息,这种模式可以防止消息丢失。我们可以通过下面配置来指定签收模式
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual #默认是 auto 自动签署
然后在消费者成功消费完消息后,触发手动签收,代码如下
@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) throws IOException {
System.out.println("receiveDirect2消费者2收到消息:"+msg);
//拿到消息的tag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收消息:指定消息的tags ,以及不做批量签收
channel.basicAck(deliveryTag,false);
}
channel.basicAck : 签收消息
除此之外我们还可以不签收消息,或者拒绝消息.不签收的消息会一直重复消费,而被拒绝的消息会丢弃掉
//不签收
channel.basicNack(deliveryTag,false,false);
//拒绝消息
channel.basicReject(deliveryTag,false);
mq消息在内存中进行读写,如果MQ宕机那么消息有丢失的风险,我们需要通过持久化来防止消息丢失
创建交换机的时候,指定durable属性为true
@Bean
public Exchange exchange(){
//定义一个direct类型的交换机,并指定持久化
return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
创建队列时指定durable属性为true
@Bean
public Queue queue1() {
//创建一个队列队列,并指定队列的名字
return new Queue(QUEUE_DIRECT_1,true);
}
当我们发送一个消息内容的时候,SpringBoot会自动帮我们持久化
rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
底层会自动构建Message对象,Messge对象中有一个MessageProperties属性,它包含了MessageDeliveryMode.PERSISTENT持久化和NON_PERSISTENT不持久化2中方式。
在RabbitTemplate中提供了2个接口
要开启上面2中回调需要在yaml中做如下配置
spring:
rabbitmq:
publisher-returns: true #开启returnCallback回调
template:
mandatory: true #消息会返回给发送者的回调,而不是丢弃
publisher-confirm-type: correlated #开启ConfirmCallback 回调
然后需要编写回调接口,通过实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback
@Component @Slf4j public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void returnedMessage(ReturnedMessage returnedMessage) { //ReturnedMessage 消息对象中包括:交换机,路由key,消息内容等 log.info(returnedMessage.getExchange() +","+returnedMessage.getRoutingKey() +","+new String(returnedMessage.getMessage().getBody())); //把失败的消息再次发送 rabbitTemplate.convertAndSend(returnedMessage.getExchange(), returnedMessage.getRoutingKey(),returnedMessage.getMessage()); } /** * @param correlationData :消息的唯一标识 * @param ack :消息确认结果 * @param cause :错误原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info(correlationData.getId() +","+ack +","+cause); } }
然后自定义template,把2个回调设置给template
//以下配置RabbitMQ消息服务 @Autowired public ConnectionFactory connectionFactory; @Autowired private RabbitMQCallback rabbitMQCallback; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory); // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body template.setMandatory(true); template.setReturnsCallback(rabbitMQCallback); template.setConfirmCallback(rabbitMQCallback); return template; }
根据上面2个回调接口的特性,我们可以做一个可靠消息投递方案,方案如下:
在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?
你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。
那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。
说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。
除此之外还有那些消息会成为死信消息?
成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下:
整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。
注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下:
第一步,定义交换机和队列
import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; //rabbitMQ的配置 @Configuration public class MQConfig { //交换机 public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY"; //订单队列,该队列中的消息设置过期时间 public static final String QUEUE_ORDER = "QUEUE_ORDER"; //该队列用来接收死信交换机转发过来的消息 public static final String QUEUE_DELAY = "QUEUE_DELAY"; //队列的路由键,该路由键用来接收订单消息传出到订单队列 public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER"; //该路由键用来接收死信交换机转发过来的消息 public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY"; //定义交换机 @Bean public Exchange exchangeDelay(){ return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build(); } //该队列中的消息需要设置ttl @Bean public Queue queueOrder(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange", EXCHNAGE_DELAY); //过期的消息给哪个交换机的名字 map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY); //死信交换机把消息个哪个个routingkey map.put("x-message-ttl", 10000); //队列过期时间10s return new Queue(QUEUE_ORDER,true,false,false,map); } //该队列接收死信交换机转发过来的消息 @Bean public Queue queueDelay(){ return new Queue(QUEUE_DELAY,true); } @Bean public Binding queueOrderBinding(){ return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs(); } @Bean public Binding queueDelayBinding(){ return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs(); } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
第二步,写一个消息发送者
System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
rabbitTemplate.convertAndSend(
MQConfig.EXCHNAGE_DELAY,
MQConfig.ROUTINGKEY_QUEUE_ORDER,
"我是一个延迟消息"
);
第三步,写一个消费者
@Component
public class Consumer {
@RabbitListener(queues = MQConfig.QUEUE_DELAY)
public void handler(String message){
System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
}
}
第六步,测试效果
Producer: 发送消息:我是一个延迟消息,开始时间:1606295976347
Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418
发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间
因为消息本身是有重试机制或者我们为了保证消息一定能投递成功可能会导致消息多次投递,那么对于消费者而言消息的重复消费处理就变得非常重要。通常我们可以使用消息的唯一标识来避免重复消费,大概思路如下
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。