赞
踩
RabbitMQ简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
MQ能干嘛
应用解耦、异步、流量削锋、数据分发、错峰流控、日志收集等等…
RabbitMQ执行流程
生产者与消息代理Boker直接建立一连长连接的channal,然后发送消息给我们的交换机Exchange,交换机接收到生产者发送的消息,由消息中的routeKey入有间发送给指定队列,队列存储消息等待消费者获取
JAVA集成RabbitMQ
导入pom依赖
<!--消息队列依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
YAML配置
spring:
rabbitmq:
host: 127.0.0.1 #rabbitmq地址
port: 5672 #端口
virtual-host: /
username: guest
password: guest
配置完成后在我们主启动类上添加注解@EnableRabbit 开启对rabbltMQ的支持
@EnableRabbit
然后注入amqpAdmin来进行创建我们的交换机、队列、绑定等。
@Autowired
AmqpAdmin amqpAdmin;
Exchange交换机
交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
交换机类型:
@Test void createExchange(){ /** * 创建 direct类型交换机 * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { * super(name, durable, autoDelete, arguments); * } * name:交换机名 * durable:是否持久化 * autoDelete:是否自动删除 * arguments:参数列表 */ DirectExchange directExchange = new DirectExchange("oa-cloud-document-exchange",true,false); amqpAdmin.declareExchange(directExchange); System.out.println("交换机创建完成"); }
创建成功后可以查看我们的UI管理页面
QUEUE队列
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走
@Test
void createQueue(){
/**
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) {
* super(arguments);
* name:队列名
* durable:是否持久化
* exclusive:是否排他(只能让一条声明的链接使用)
* autoDelete:是否自动删除
*/
Queue queue = new Queue("oa-cloud-document-notice-read-queue",true,false,false);
amqpAdmin.declareQueue(queue);
System.out.println("队列创建成功");
}
Binding绑定
用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表
@Test void createBinding(){ /** * public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) { * super(arguments); * destination:目的地 * destinationType:目的地类型 * exchange:交换机 * routingKey:路由键 * 将交换机exchange与指定的目的地进行绑定,使用routingKey指定路由键 */ Binding binding = new Binding( "oa-cloud-document-notice-read-queue", Binding.DestinationType.QUEUE, "oa-cloud-document-exchange", "notice-read", null); amqpAdmin.declareBinding(binding); System.out.println("绑定创建成功"); }
发送消息测试
接下来我们就可以给rabbitmq发送消息进行测试
使用rabbitTemplate来发送消息
@Autowired RabbitTemplate rabbitTemplate; @Test void sendMessage(){ NoticeParam msg = new NoticeParam(UUID.randomUUID().toString(),UUID.randomUUID().toString()); /** * exchage:交换机,给这个交换机发送消息 * routingKey:使用指定的路由键 路由到绑定的队列上 * msg:Object 发送的消息 * * 如果发送的消息是对象,则该对象必须实现序列化接口 * 发送消息会将该对象进行序列化后发送 * * 消息转换默认使用java的序列换,我们可以转换成我们自己的消息转换器 * 使用 */ rabbitTemplate.convertAndSend("oa-cloud-document-exchange","notice-read",msg); }
**查看我们发送的消息
如果发送的消息是对象,则必须让该对象实现Serializable接口
可以看到对象已经被序列化了,但是并不是我们想要的JSON格式
查看源码消息转换默认进行序列化,我们可以转换成我们自己的消息转换器
查看源码可以看到如果我们不设置消息转换器将默认使用他自己的
我们可以使用JSON格式的消息转换器
再次发送消息测试,可以看到我们发送的对象已经转换成对象了
监听消息队列
使用@RabbitListener注解
/**
* @Description: 监听消息 queues 就是我们需要监听的队列
* @Author: Huang
* @Date: 2021/1/10 21:24
* @Param: [message]
* @return: void
*/
@RabbitListener(queues = {"oa-cloud-document-notice-read-queue"})
public void linserQueue(Object message){
System.out.println(message);
}
查看控制台可以看到我们发送的消息
其中包含消息头和消息体
这样拿到的消息就是个Message类型的数据,我们也可以直接使用对象来进行接收
/**
* @Description: 监听消息 queues 就是我们需要监听的队列
* @Author: Huang
* @Date: 2021/1/10 21:24
* @Param: [message]
* @return: void
*/
@RabbitListener(queues = {"oa-cloud-document-notice-read-queue"})
public void linserQueue(Message message, NoticeParam content){
System.out.println(content);
}
这样我们就直接取到对象
还可以使用@RabbitHandler + @RabbitListener来实现多类型的方法重载
@RefreshScope @Slf4j @RabbitListener(queues = {"oa-cloud-document-notice-read-queue"}) @Service public class QyWechatDocumentInfoServiceImpl{ /** * @Description: 监听消息 queues 就是我们需要监听的队列 * @Author: Huang * @Date: 2021/1/10 21:24 * @Param: [message] * @return: void */ @RabbitHandler public void linserQueue(Message message, NoticeParam content){ System.out.println(content); } @RabbitHandler public void linserQueue2(Message message, QueryIdParam content){ System.out.println(content); } }
现在有一个业务场景,当我们阅读文章时,要去添加这个文章的访问量,按照我们传统的做法是在查询时再去调用修改数据库增加这篇文章的访问量,当并发量一上来,这这修改的操作就会浪费很多资源
使用消息队列来优化传统的解决方法,当我们查询这个文章时,给我们的消息队列发送一个消息,让我们直接返回查询到的文章内容,异步的添加文章的访问量
发送消息
/** * @Description: 获取通知详情 * @Author: Huang * @Date: 2021/1/9 16:25 * @Param: [httpServletRequest, queryIdParam] * @return: com.lgzyy.oacloudcommon.model.response.base.ResponseData */ @Override public ResponseData getNoticeInfo(HttpServletRequest request, QueryIdParam queryIdParam) { if (StringUtils.isBlank(queryIdParam.getId())){ return new ResponseData(ExceptionEnum.SYS_PARAM_INVALID.getCode(),ExceptionEnum.SYS_PARAM_INVALID.getMessage()); } Long readCount = 0L; boolean agree = false; Long agreeCount = 0L; //获取用户信息 String adminToken = request.getHeader(AuthenticationConstant.QYWECHAT_AUTHORIZATION_TOKEN); OaCloudSysUser oaCloudSysUser = GsonUtil.fromJson(redisUtil.get(AuthenticationConstant.QYWECHAT_AUTHORIZATION_TOKEN_PREFIX + adminToken).toString(), OaCloudSysUser.class); OaCloudDocumentInfo oaCloudDocumentInfo = oaCloudDocumentInfoMapper.selectById(queryIdParam.getId()); if (Objects.isNull(oaCloudDocumentInfo)){ return new ResponseData(ExceptionEnum.SELECT_FAIL.getCode(),ExceptionEnum.SELECT_FAIL.getMessage()); } //获取阅读数 readCount = redisUtil.getHashSize("notice-read-" + queryIdParam.getId()); //获取通知点赞数 agreeCount = redisUtil.getHashSize("notice-agree-" + queryIdParam.getId()); if (StringUtils.isNotBlank(oaCloudSysUser.getId())){ /** * 给消息队列发送消息 * RabbitEnum.NOTICE_READ.getExchange() //指定交换机 * RabbitEnum.NOTICE_READ.getRoutingKey() //指定路由键,可以找到与交换机绑定的队列 * new NoticeParam(queryIdParam.getId(),oaCloudSysUser.getId()) */ rabbitTemplate.convertAndSend(RabbitEnum.NOTICE_READ.getExchange(),RabbitEnum.NOTICE_READ.getRoutingKey(),new NoticeParam(queryIdParam.getId(),oaCloudSysUser.getId())); //获取当前用户是否点赞 agree = redisUtil.checkHashKey("notice-agree-" + queryIdParam.getId(), oaCloudSysUser.getId()); } JSONObject jsonObject = new JSONObject(); jsonObject.put("noticeInfo",oaCloudDocumentInfo); jsonObject.put("readCount",readCount); jsonObject.put("agree",agree); jsonObject.put("agreeCount",agreeCount); return new ResponseData(ExceptionEnum.SELECT_SUCCESS.getCode(),ExceptionEnum.SELECT_SUCCESS.getMessage(),jsonObject); }
接收消息
接收消息然后执行访问量添加
@RabbitHandler
public void rabbitInsertNoticeRead(Message message,NoticeParam content){
log.info("=============================MQ接收消息=============================");
log.info("使用的交换机:{}",message.getMessageProperties().getReceivedExchange());
log.info("使用的路由键:{}",message.getMessageProperties().getReceivedRoutingKey());
log.info("接收的队列:{}",message.getMessageProperties().getConsumerQueue());
log.info("消息内容:{}",content);
log.info("===================================================================");
//添加当前用户阅读记录
redisUtil.setHash("notice-read-"+content.getDocumentId(),content.getUserId());
}
运行效果
查看我们数据是否插入成功
可以看到我们的消息已成功接收并执行了业务
消息确认机制为防止我们的消息发送失败,保障消息可靠抵达。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
username: admin
password: admin
#开启发送端确认
publisher-confirm-type: correlated
publisher-confirms: true
#开启发送端消息抵达队列确认
publisher-returns: true
#只要消息抵达队列,以异步发送优先回调publisher-returns
template:
mandatory: true
修改我们的RabbitConfig,给rabbitTemplate设置确认回调
/** * @ClassName: RabbitConfig * @Author: Huang * @Date: 2021/1/10 21:16 */ @Slf4j @Configuration public class RabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 设置发送端确认回调 */ @PostConstruct //RabbitConfig对象创建完成后调用该方法 public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达broker就触发 * @param correlationData 消息的唯一关联数据(消息的ID) * @param ack 消息是否成功收到 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("=================================================================="); log.info("消息关联数据:{}",correlationData); log.info("消息是否成功:{}",ack); log.info("消息失败原因:{}",cause); log.info("=================================================================="); } }); /** * 设置消息抵达队列的确认回调 */ rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有抵达队列就触发,类似于失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 消息发送给的交换机 * @param routingKey 消息发送使用的路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("=================================================================="); log.info("-------------------------消息发送到队列失败--------------------------"); log.info("投递失败的消息:{}",message); log.info("回复的状态码:{}",replyCode); log.info("回复的内容:{}",replyText); log.info("使用的交换机:{}",exchange); log.info("使用的路由键:{}",routingKey); log.info("=================================================================="); } }); } }
在发送一条消息测试
可以看到我们的发送端的确认消息,已成功发给消息代理。只要发送了消息,都会进行我们的确认回调。
-消费端确认消息
默认是自动确认,只要消息接收到,消费端就会自动确认,服务端都会把这个消息删除
注意:如果发送多条消息,但消费端还没处理完所有消息,如果消费端宕机,那将会造成 消息丢失 也是就消息没有全部消费,但是默认确认,这样服务端就会移除所有消息
可以开启消费端手动确认默认
spring:
rabbitmq:
#开启手动消息签收
listener:
simple:
acknowledge-mode: manual
手动消息确认,只要我们没有确认消息。消息将一直处于unAcked状态,及时我们的消费端宕机了,消息也会一直保存,状态重新变为ready状态
可以看到虽然我们的消息发送成功,并成功被消费,但是我们没有确认,这个消息就是一直处于unacked状态,这时候我们再来关闭消费端,可以看到消息变成了ready状态
手动确认代码实现
@RabbitHandler public void rabbitInsertNoticeRead(Message message, NoticeParam content, Channel channel){ log.info("=============================MQ接收消息============================="); log.info("使用的交换机:{}",message.getMessageProperties().getReceivedExchange()); log.info("使用的路由键:{}",message.getMessageProperties().getReceivedRoutingKey()); log.info("接收的队列:{}",message.getMessageProperties().getConsumerQueue()); log.info("消息内容:{}",content); log.info("==================================================================="); //添加当前用户阅读记录 redisUtil.setHash("notice-read-"+content.getDocumentId(),content.getUserId()); try{ /** * 消息确认: * deliveryTag channel内按顺序自增的一个消息编号 * b:是否批量确认 */ long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,false); }catch (Exception e){ e.printStackTrace(); } }
修改完成之后,当我们重启消费端,可以看到刚才的消息已被获取,而且mQ控制页面的消息也被成功确认并移除
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。