赞
踩
面对日益复杂的信息平台,消息队列使用是解决什么问题呢?初步总结一下可以解决如下场景问题:
业务解耦:A系统
需要耦合B、C、D系统
,在消息队列之前可以通过共享数据、接口调用等方式来实现业务,现在可以通过消息中间件进行解耦。
削峰填谷:在互联网经常会出现流量突然飙升的情况,以前很多时候就是通过性能优化、加服务器等方式,可以通过消息中间件缓存相关任务,然后按计划的进行处理。
异步:可以通过消息推送及短信发送进行说明,业务平台并不关注具体消息的发送细则,完全可以通过消息队列的方式,直接下发任务,由任务消费者进行处理。
本文将通过Rabbit MQ
、Spring Boot
集成使用来进行分享。
如何安装rabbitMq以及解决在安装的过程中出现的问题,可以参考我的这篇博客:一文详解Windows安装RabbitMQ教程
RabbitMq
,如下图所示:guest
)和初始密码(guest
),登录rabbitmq
,如下图所示:现在队列是空的,因为没有往队列发送数据,
maven
引用Spring Boot AMQP
插件<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties
文件配置如下信息spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=study
如果你是application.yml
配置文件,可以如下配置
spring:
rabbitmq:
host: 127.0.0.1
password: guest
port: 5672
username: guest
virtual-host: study
RabbitFanoutExchangeConfig
类该类是用来配置rabbitmq
的交换机和消息队列(queue
),如下代码所示:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 念兮为美
* @datetime 2022/9/1 15:41
* @desc rabbitmq 配置类
*/
@Configuration
public class RabbitFanoutExchangeConfig {
/**
* 交换机初始化
*
* @author 念兮为美
* @datetime 2022/9/1:15:43
* @return
*/
public static final String DEMO_EXCHANGE = "demo.exchange";
@Bean(name = DEMO_EXCHANGE)
public FanoutExchange demoExchange() {
return new FanoutExchange(DEMO_EXCHANGE, true, false);
}
/**
* 队列初始化
*
* @author 念兮为美
* @datetime 2022/9/1:15:43
* @return
*/
public static final String DEMO_QUEUE = "demo.queue";
@Bean(name = DEMO_QUEUE)
public Queue demoQueue() {
return new Queue(DEMO_QUEUE, true, false, false);
}
/**
* 交换机队列绑定
*
* @author 念兮为美
* @datetime 2022/9/1:15:45
* @param demoQueue 消息队列
* @param fanoutExchange 扇形交换机
* @return 返回绑定的对象
*/
@Bean
public Binding bindingSimpleQueue1(
@Qualifier(DEMO_QUEUE) Queue demoQueue,
@Qualifier(DEMO_EXCHANGE) FanoutExchange fanoutExchange) {
return BindingBuilder.bind(demoQueue).to(fanoutExchange);
}
}
RabbitMqSenderService
类import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author 念兮为美
* @datetime 2022/9/1 15:59
* @desc rabbitmq发送者服务
*/
@Component
public class RabbitMqSenderService {
@Autowired private RabbitTemplate rabbitTemplate;
/**
* 消息发送者
*
* @author 念兮为美
* @datetime 2022/9/1:16:06
* @param exchange 交换机
* @param routingKey 路由键值
* @param message 消息信息
* @return
*/
public void send(String exchange, String routingKey, Message message) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息 : " + message);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationId);
System.out.println("结束发送消息 : " + message);
}
}
RabbitMqReceiver
类import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* @author 念兮为美
* @datetime 2022/9/1 16:08
* @desc 消息接收者
*/
public class RabbitMqReceiver {
/**
* 消息接收者
*
* @author 念兮为美
* @datetime 2022/9/1:16:11
* @param msg 消息体
* @return
*/
@RabbitListener(queues = RabbitFanoutExchangeConfig.DEMO_QUEUE)
public void receiverLogAll(String msg) {
System.out.println("log.all:" + msg);
}
}
DemoApplicationTests
编写测试代码import com.alibaba.fastjson.JSONObject;
import com.slowcode.config.rabbitmq.RabbitFanoutExchangeConfig;
import com.slowcode.config.rabbitmq.RabbitMqSenderService;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
@ActiveProfiles("application.properties")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class DemoApplicationTests {
@Autowired
private RabbitMqSenderService rabbitMqSenderService;
@Test
public void testRabbitMq() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("name", "念兮为美");
jsonObject.put("age", 18);
jsonObject.put("address", "江苏省无锡市新吴区");
Message message = new Message(jsonObject.toJSONString().getBytes());
rabbitMqSenderService.send(RabbitFanoutExchangeConfig.DEMO_EXCHANGE, "demo", message);
}
}
启动DemoApplicationTests
类,输出日志如下:
开始发送消息 : (Body:'[B@233f52f8(byte[72])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2022-09-02 17:00:37.763 INFO 39020 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [127.0.0.1:5672]
2022-09-02 17:00:37.809 INFO 39020 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#10c47c79:0/SimpleConnection@75882261 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 55639]
结束发送消息 : (Body:'[B@233f52f8(byte[72])' MessageProperties [headers={spring_listener_return_correlation=5542ab7b-282e-4344-b86f-b90891ecf44c}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
rabbitmq
后台消息是否写入到队列中rabbitmq
后台登录地址为:http://localhost:15672
由图可知,消息写入成功。
我们在 RabbitMqReceiver
类上加上@Component
注解,启动spring boot
项目,如下图所示:
由图可知,RabbitMqSenderService
发送的消息成功被RabbitMqReceiver
消费。
rabbitmq
后台管理为空,如下图所示:
Simple Work Queue
简单工作队列该模式在Spring boot
中是很少用到的一个场景,一般都会通过Exchange
进行消息分配到队列从而为以后扩展预留一个入口。
Publish/Subscribe
发布订阅模式该模式性能最好,拿到消息直接放入队列
Routing
路由模式该模式通过routing key
进行全字匹配,匹配上将相关消息放入相关队列。
Topics
主题模式该模式通过routng key
进行模糊匹配,匹配上将相关信息放入相关队列,具体匹配规则如下:
字符:匹配单个单词,比如log.
可以匹配log.all
、log.error
等
字符#:可以匹配0
个或多个标识符,如log.#
可以匹配log.business.all
,http://log.service.info
等
Header
模式通过message header
头部信息进行比对,可以根据定义全匹配、部分匹配等规则。
消息持久化需要对交换机持久化、队列持久化、消息持久化,代码如下:
//初始化·Exchange,name是交换机名,durable是否持久化,autoDelete当前会话结束删除
public AbstractExchange(String name, boolean durable, boolean autoDelete)
//初始化Queue, name是队列名,durable是否持久化,exclusive排他性,autodelete当前会话结束删除
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
//发送消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
针对消息延迟发送可通过死信队列进行处理:
弊端第一条消息超时设置一分钟,第二条消息超时设置2秒
,那么第二条消息消费也要等第一条消息消费玩才能生效,时间不累加。
MessageProperties messageProperties=new MessageProperties();
messageProperties.setExpiration("2000"); //设置超时时间为2S
Message message=new Message(message_01.getBytes(),messageProperties);
队列超时就是任何一个消息放入都会在指定时间过期,如果消息自身设置了超时时间,那就看消息超时和队列超时那个时间短,以短时间为准。
@Bean(name = DELAY_QUEUE)
public Queue delayQueue() {
Map<String, Object> queueProperties = new HashMap<>();
queueProperties.put("x-dead-letter-exchange", DEAD_EXCHANGE);
queueProperties.put("x-dead-letter-routing-key", "lsf.dead.letter");
queueProperties.put("x-message-ttl",10000);#设置超时时间为10s
return new Queue(DELAY_QUEUE, true, false, false, queueProperties);
}
rabbitmq
官网插件下载页面:https://www.rabbitmq.com/community-plugins.html
安装步骤如下:
选择rabbitmq_delayed_message_exchange
进入具体插件下载页,选择相关版本号:rabbitmq_delayed_message_exchange-3.8.0.ez
copy
文件到路径/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.7/plugins
启用插件:rabbitmq-plugins enable rabbitmq_delay_message_exchange
重启Rabbitmq
服务
在~SpringBoot·中实现,交换机声明为·x-delayed-message·,然后消息发送设置·delay·属性,代码如下:
//声明交换机为x-delayed-message
public static final String CUSTOM_DELAY_EXCHANGE = "lsf.custom.delay.exhange";
@Bean(name = CUSTOM_DELAY_EXCHANGE)
public Exchange customDelayExchange() {
Map<String, Object> exchangeArgs = new HashMap<>();
exchangeArgs.put("x-delayed-type","direct");
return new CustomExchange(CUSTOM_DELAY_EXCHANGE,
"x-delayed-message",
true,
false,
exchangeArgs);
}
//消息发送
MessageProperties messageProperties=new MessageProperties();
messageProperties.setDelay(150000);
String message_01="第一条消息延迟:15s "+sdf.format(date);
Message message=new Message(message_01.getBytes(),messageProperties);
rabbitmqService.send(RabbitCustomDeleyConfig.CUSTOM_DELAY_EXCHANGE,
RabbitCustomDeleyConfig.CUSTOM_DELAY_MESSAGE,
message);
消息会在交换机中等待,设置时间到了就会被消费,比消息过期机制好,并不会出现第一条消息过期时间长,第二条消息过期时间短还需要等第一条消费后才消费第二条。
Exchange
相关处理方法Exchange
的回调,开启回调,如下代码所示://连接开启回调
connectionFactory.setPublisherConfirms(true);
//绑定回调触发方法
rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
RabbitTemplate.ConfirmCallback
封装实现方法,如下代码所示:package lsf.rabbit.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + cause);
}
}
}
//开启消息没有传递到队列回调
connectionFactory.setPublisherReturns(true);
//开启回调和绑定回调方法
rabbitTemplate.setMandatory(true);//开启强制委托模式
rabbitTemplate.setReturnCallback(new RabbitReturnsCallback());
继承接口RabbitTemplate.ReturnCallback
实现消息未到队列业务处理
package lsf.rabbit.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class RabbitReturnsCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
String result = String.format("消息发送ReturnCallback-未到达消息队列:{%s}--{%s}--{%s}--{%s}--{%s}",
message, replyCode, replyText, exchange, routingKey);
System.out.println(result);
}
}
//配置确认模式为手动的
@RabbitListener(queues = RabbitRetryConfig.RETRY_NORMAL_QUEUE, ackMode = "MANUAL")
#手动拒绝消息,其中requeue为否再次入队
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
#手动否定确认,其中requeue为否再次入队
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
请参考我的这篇博文解决该问题:https://blog.csdn.net/lvoelife/article/details/126657171
请参考我的这篇博文解决该问题:https://blog.csdn.net/lvoelife/article/details/126659721
请参考我的这篇博客解决该问题:https://blog.csdn.net/lvoelife/article/details/126658695
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。