赞
踩
消息队列产品有很多,比如说常见的有Kafka、RocketMQ、RabbitMQ和ActiveMQ:
RabbitMQ在系统中的作用(削峰填谷)
安装,通过docker即可快速安装
拉取镜像:
docker pull rabbitmq:3.7.7-management
启动:
根据下载的镜像创建和启动容器
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
说明:
查看,启动成功:
五种队列模式
具体例子:RabbitMQ 详解 五种队列-SpiritMark_liu - 云+社区 - 腾讯云 (tencent.com)
在此次消息模块的功能使用(topic)主题模式实现
简单模式
一个生产者(发送方)对应一个消费者(接收方)
Work模式
一个生产者对应多个消费者,但是只能有一个消费者获得消息(排他)
发布/订阅模式
一个消费者将消息首先发送到fanout交换器,交换器绑定到多个队列,然后与之对应的所有消费者都能接收到消息(不排他)
路由模式
生产者将消息发送到direct交换器,交换器按照关键字(Key),把消息路由到某个队列
主题模式(√)
生产者将消息发送到Topic交换器,交换器按照复杂的规则,把消息路由到某个队列
消息持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外,甚至交换器和队列都能持久化。
消息过期时间
默认情况下,消息是无限期存储在RabbitMQ上面的,但是我们可以设置消息过期时间,到期之后无论该消息是否已经被接收,都会被RabbitMQ删除。
Ack应答
消费者接收消息之后,必须返回一个Ack应答,那么RabbitMQ才会认为这条消息接收成功。如果想要删除这条消息,消费者发送Ack应答的时候,附带一个deliveryTag
标志位就可以了。
在pom.xml
文件中添加RabbitMQ的依赖库
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
使用我们采用【 异步线程同步收发消息】实现消息推送功能:
异步收发配置yaml文件,同步收发则需配置ConnectionFactory对象:
@Configuration
public class RabbitMQConfig {
@Bean
public ConnectionFactory getFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("***"); //Linux主机的IP地址
factory.setPort(5672); //RabbitMQ端口号
return factory;
}
}
创建线程任务类:
@Slf4j
@Component
public class MessageTask {
@Autowired
private ConnectionFactory factory;
@Autowired
private MessageService messageService;
/**
* 同步发送消息
*
* @param topic
* @param entity
*/
public void send(String topic, MessageEntity entity) {
//向MongoDb保存消息数据,返回消息主键
String id = messageService.insertMessage(entity);
//向rabbitMQ发送消息
try {
//类比jdbc:创建连接,创建statement,执行sql
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//连接到某个Topic
channel.queueDeclare(topic, true, false, false, null);
//存放属性数据:
HashMap header = new HashMap();
header.put("messageId", id);
//创建AMQP协议参数对象,添加附加属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
channel.basicPublish("", topic, properties, entity.getMsg().getBytes());
log.debug("消息发送成功");
} catch (Exception e) {
log.error("执行异常", e);
throw new EmosException("向MQ发送消息失败");
}
}
/**
* 异步发送消息
*
* @param topic
* @param entity
*/
@Async
//messageTask.sendAsync(userid + "", entity);
public void sendAsync(String topic, MessageEntity entity) {
send(topic, entity);
}
/**
* 同步接收数据
*
* @param topic
* @return
*/
public int receive(String topic) {
int i = 0;
try {
//创建mq连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//从队列中获取消息,不自动确认
channel.queueDeclare(topic, true, false, false, null);
//Topic中有多少条数据位置,所以使用死循环接收数据,直到接收不到数据,退出死循环
while (true) {
//创建响应接收数据,禁止自动发送ack应答
GetResponse response = channel.basicGet(topic, false);
if (response != null) {
AMQP.BasicProperties properties = response.getProps();
Map<String, Object> headers = properties.getHeaders();
String messageId = (String) headers.get("messageId");
byte[] body = response.getBody();
String message = new String(body);
log.debug("从RabbitMQ接收的消息" + message);
MessageRefEntity entity = new MessageRefEntity();
entity.setMessageId(messageId);
entity.setReceiverId(Integer.parseInt(topic));
entity.setReadFlag(false);
entity.setLastFlag(true);
//把消息存储在MongoDB中
messageService.insertRef(entity);
//数据保存到MongoDB后,才发送给ack应答 让topic删除这条消息
long deliveryTag = response.getEnvelope().getDeliveryTag();
channel.basicAck(deliveryTag, false);
i++;
} else {
break;//接收不到消息则退出循环
}
}
} catch (Exception e) {
log.error("执行异常", e);
throw new EmosException("获取消息出现异常");
}
return i;
}
/**
* 异步接收数据
*
* @param topic
* @return
*/
@Async
//异步接收消息
//messageTask.receiveAysnc(userId + "");
public int receiveAysnc(String topic) {
return receive(topic);
}
/**
* 同步删除消息队列
*
* @param topic 主题
*/
public void deleteQueue(String topic) {
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDelete(topic);
log.debug("消息队列成功删除");
} catch (Exception e) {
log.error("删除队列失败", e);
throw new EmosException("删除队列失败");
}
}
/**
* 异步删除消息队列
*
* @param topic 主题
*/
@Async
public void deleteQueueAsync(String topic) {
deleteQueue(topic);
}
}
throw new EmosException("删除队列失败");
}
}
/**
* 异步删除消息队列
*
* @param topic 主题
*/
@Async
public void deleteQueueAsync(String topic) {
deleteQueue(topic);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。