赞
踩
RabbitMQ是一种开源的消息代理软件,它实现了高级消息队列协议(AMQP)标准,可用于在应用程序之间传递消息。RabbitMQ最初由LShift开发,现在由Pivotal Software维护。
RabbitMQ可以在多个平台上运行,包括Windows、Mac OS X和各种Linux发行版。它提供了多种编程语言的客户端库,如Java、Python、Ruby、.NET等等。RabbitMQ的主要特点包括:
RabbitMQ的工作原理主要包括生产者(Producer)、消息队列(Queue)和消费者(Consumer)三个部分。
安装步骤如下
需要注意的是,安装 RabbitMQ 之前需要先安装 Erlang,而且版本要匹配。另外,如果在安装过程中出现问题,可以参考 RabbitMQ 的官方文档或者社区论坛来解决。
安装完成后,在浏览器中访问 http://127.0.0.1:15672(需先启用 rabbitmq_management 管理插件),如果出现登录页面,则说明安装成功。
首先通过 IDEA 创建一个 Spring Boot 项目,并添加 RabbitMQ 相关依赖:
<!-- NOTE(review): no <version> is declared on any dependency; presumably versions are
     managed by a parent POM / dependencyManagement section not shown here - confirm. -->

<!-- Spring Boot web starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot AMQP starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- Spring Boot test starter; test scope keeps test-only libraries out of the runtime artifact -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

<!-- MySQL JDBC driver (only needed at runtime) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

<!-- MyBatis Spring Boot integration -->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>

<!-- Lombok (compile-time annotation processor only) -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
配置rabbitMq如下
spring:
  rabbitmq: # RabbitMQ connection and client settings
    host: ##### # broker IP (placeholder - fill in)
    port: 5672
    username: ###
    password: ####
    virtual-host: /
    connection-timeout: 15000
    publisher-confirm-type: correlated # enable the confirm callback (producer -> exchange)
    publisher-returns: true # enable the returned-message callback (exchange -> queue)
    template:
      mandatory: true # required for the asynchronous returned-message callback to fire when a message reaches no queue
    listener:
      simple:
        acknowledge-mode: manual # consumers must acknowledge (ack) each message explicitly after handling it; the default is auto
        concurrency: 5 # minimum number of listener threads
        max-concurrency: 10 # maximum number of listener threads
        prefetch: 10
        retry:
          enabled: true
          max-attempts: 5
          max-interval: 10000ms # upper bound for the retry interval: 10s
          initial-interval: 2000ms # first retry interval: 2s
          multiplier: 2 # next interval = previous interval * multiplier, capped at max-interval; the intervals are therefore 2s, 4s, 8s, 10s
# Topic message queue, exchange and routing key
com.acrdpm.smart_topic_queue=smart.queue
com.acrdpm.smart_topic_exchange=smart.exchange
com.acrdpm.smart_topic_routingKey=smart.routing.key
# Delayed queue, exchange and routing key
com.acrdpm.delayed_queue=delayed.queue
com.acrdpm.delayed_exchange=delayed.exchange
com.acrdpm.delayed_routingKey=delayed.routing.key
编写 RabbitMQ 的 Java 配置类(声明队列、交换机、绑定关系,并设置消息确认回调):
@Configuration @Slf4j //启用rabbitmQ @EnableRabbit @Getter public class RabbitConfig { private final RabbitTemplate rabbitTemplate; // 将配置文件封装成工具类 private final RabbitPropertiesConfig rabbitPropertiesConfig; // 消息备份类 private final MsgLogService msgLogService; public RabbitConfig(RabbitTemplate rabbitTemplate, RabbitPropertiesConfig rabbitPropertiesConfig, MsgLogService msgLogService) { this.rabbitTemplate = rabbitTemplate; this.rabbitPropertiesConfig = rabbitPropertiesConfig; this.msgLogService = msgLogService; } /** * 定义硬件需要的topic * @return */ @Bean public Queue smartQueue() { return new Queue(rabbitPropertiesConfig.getSmart_topic_queue(), true); } @Bean public TopicExchange smartExchange() { return new TopicExchange(rabbitPropertiesConfig.getSmart_topic_exchange(), true, false); } @Bean public Binding smartBinding() { return BindingBuilder.bind( smartQueue()).to(smartExchange()).with(rabbitPropertiesConfig.getSmart_topic_routingKey()); } /** * 定义延迟队列 */ @Bean public Queue delayedQueue(){ return new Queue(rabbitPropertiesConfig.getDelayed_queue()); } @Bean public CustomExchange delayedExchange(){ Map<String, Object> args = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(rabbitPropertiesConfig.getDelayed_exchange(), "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(){ return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(rabbitPropertiesConfig.getDelayed_routingKey()).noargs(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * <p> * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) * MyRabbitConfig对象创建完成以后,执行这个方法 */ @PostConstruct public void initRabbitTemplate() { /** * 1、只要消息抵达Broker就ack=true * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) * ack:消息是否成功收到 * cause:失败的原因 */ 
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { log.info("confirm...correlationData[,{}",correlationData); log.info("ack是:{}",ack); log.info("case是:{}",cause); System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]"); if (ack) { log.info("消息成功发送到Exchange"); String msgId = correlationData.getId(); msgLogService.updateStatus(msgId, MsgLogStatus.DELIVER_SUCCESS); } else { log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } }); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * message:投递失败的消息详细信息 * replyCode:回复的状态码 * replyText:回复的文本内容 * exchange:当时这个消息发给哪个交换机 * routingKey:当时这个消息用哪个路邮键 * 修改数据库状态 */ rabbitTemplate.setReturnsCallback((returnCallback) -> { Message message = returnCallback.getMessage(); String exchange = returnCallback.getExchange(); int replyCode = returnCallback.getReplyCode(); String routingKey = returnCallback.getRoutingKey(); String replyText = returnCallback.getReplyText(); if(rabbitPropertiesConfig.getDelayed_exchange().equals(exchange)){ /** * 使用了x-delayed-message 延迟插件,结果每次都强制触发returnedMessage回调方法 * 因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。 * 并非是BUG,而是有原因的,所以使用利用if去拦截这个异常,判断延迟队列交换机名称,然后break; */ log.info("如果是延迟队列那么break"); return; } log.info("Fail Message[" + message + "]==>replyCode[" + replyCode + "]" + "==>replyText[" + replyText + "]==>exchange[" + exchange + "]==>routingKey[" + routingKey + "]"); log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey,replyCode,replyText,message); //todo 没有发送到指定的队列 数据暂存到数据库认定消费失败 再次重新上传 }); } }
注意:RabbitMQ 的延迟队列依赖 rabbitmq_delayed_message_exchange 插件(对应上面的 x-delayed-message 交换机类型),需要先在服务端安装并启用该插件,具体步骤可参考官网。
配置日志消息类
/**
 * Service layer for message-log records; every operation delegates to {@link MsgLogMapper}.
 */
@Service
@Slf4j
public class MsgLogService {

    private final MsgLogMapper msgLogMapper;

    public MsgLogService(MsgLogMapper msgLogMapper) {
        this.msgLogMapper = msgLogMapper;
    }

    /**
     * Inserts a new message-log record.
     *
     * @param msgLog the record to insert
     */
    public void saveMsg(MsgLog msgLog) {
        msgLogMapper.insert(msgLog);
    }

    /**
     * Sets the status of the record identified by {@code msgId}.
     *
     * @param msgId  message id
     * @param status new status value
     */
    public void updateStatus(String msgId, Integer status) {
        log.info("执行");
        msgLogMapper.updateStatus(msgId, status);
    }

    /**
     * Looks up a record by message id.
     *
     * @param msgId message id
     * @return the mapper's result, or {@code null} when {@code msgId} is empty
     */
    public MsgLog selectByMsgId(String msgId) {
        // Guard clause: an empty id is never sent to the mapper.
        if (ObjectUtils.isEmpty(msgId)) {
            return null;
        }
        return msgLogMapper.seletMsgFormsgId(msgId);
    }

    /**
     * Returns the records selected by the mapper's timeout query.
     *
     * @return the timed-out message records
     */
    public List<MsgLog> selectTimeoutMsg() {
        return msgLogMapper.selectTimeOutMsg();
    }

    /**
     * Updates the try count of the record identified by {@code msgId}.
     *
     * @param msgId    message id
     * @param tryCount new try count
     */
    public void updateTryCount(String msgId, Integer tryCount) {
        // Only the id and the try count are populated on the object handed to the mapper.
        MsgLog patch = new MsgLog();
        patch.setTryCount(tryCount);
        patch.setMsgId(msgId);
        msgLogMapper.updateByMsgId(patch);
    }
}
/**
 * Message-log entity: one record per message handed to RabbitMQ.
 *
 * <p>Implements {@link java.io.Serializable} so that the declared {@code serialVersionUID}
 * actually takes effect; without the interface the constant is ignored.
 * NOTE(review): assumes the {@code JSONObject} type used for {@code msg} is itself
 * serializable - confirm for the JSON library in use.
 */
@Data
@NoArgsConstructor
public class MsgLog implements java.io.Serializable {

    private static final long serialVersionUID = 4990197789742500403L;

    /** Unique message id. */
    private String msgId;

    /** Message payload. */
    private JSONObject msg;

    /** Exchange the message is published to. */
    private String exchange;

    /** Routing key the message is published with. */
    private String routingKey;

    /** Status code; values come from MsgLogStatus (not visible here). */
    private Integer status;

    /** Try count. */
    private Integer tryCount;

    /** Next try time, held as a string. */
    private String nextTryTime;

    /** Creation time, held as a string. */
    private String createTime;

    /** Last update time, held as a string. */
    private String updateTime;

    /** NOTE(review): presumably the failure cause of the message - confirm. */
    private String msgCase;
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements for the msg_log table, bound to com.cvdmp.dao.MsgLogMapper. -->
<mapper namespace="com.cvdmp.dao.MsgLogMapper">

    <!-- Inserts a record, writing only the columns whose properties are non-null. -->
    <insert id="insert" parameterType="com.cvdmp.domain.entity.MsgLog">
        insert into msg_log
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="msgId != null">
                msg_id,
            </if>
            <if test="exchange != null">
                exchange,
            </if>
            <if test="routingKey != null">
                routing_key,
            </if>
            <if test="status != null">
                status,
            </if>
            <if test="tryCount != null">
                try_count,
            </if>
            <if test="nextTryTime != null">
                next_try_time,
            </if>
            <if test="createTime != null">
                create_time,
            </if>
            <if test="updateTime != null">
                update_time,
            </if>
            <if test="msg != null">
                msg,
            </if>
            <if test="msgCase != null">
                msg_case,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="msgId != null">
                #{msgId,jdbcType=VARCHAR},
            </if>
            <if test="exchange != null">
                #{exchange,jdbcType=VARCHAR},
            </if>
            <if test="routingKey != null">
                #{routingKey,jdbcType=VARCHAR},
            </if>
            <if test="status != null">
                #{status,jdbcType=INTEGER},
            </if>
            <if test="tryCount != null">
                #{tryCount,jdbcType=INTEGER},
            </if>
            <if test="nextTryTime != null">
                #{nextTryTime},
            </if>
            <if test="createTime != null">
                #{createTime},
            </if>
            <if test="updateTime != null">
                #{updateTime},
            </if>
            <if test="msg != null">
                #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
            </if>
            <if test="msgCase != null">
                #{msgCase},
            </if>
        </trim>
    </insert>

    <!-- Sets the status of one message and stamps update_time with the database clock. -->
    <update id="updateStatus" parameterType="map">
        update msg_log
        set status = #{status},
            update_time = now()
        where msg_id = #{msgId}
    </update>

    <!-- Selects messages with status 0 whose next try time has been reached.
         The comparison operator is written as an entity because a raw less-than sign
         is not allowed in XML text and would make this mapper fail to parse. -->
    <select id="selectTimeOutMsg" resultType="com.cvdmp.domain.entity.MsgLog">
        select *
        from msg_log
        where status = 0
          and next_try_time &lt;= now()
    </select>

    <!-- Selects one message by its id. -->
    <select id="seletMsgFormsgId" parameterType="string" resultType="com.cvdmp.domain.entity.MsgLog">
        select *
        from msg_log
        where msg_id = #{msgId}
    </select>

    <!-- Updates the non-null properties of the record identified by msg_id. -->
    <update id="updateByMsgId" parameterType="com.cvdmp.domain.entity.MsgLog">
        update msg_log
        <set>
            <if test="exchange != null">
                exchange = #{exchange,jdbcType=VARCHAR},
            </if>
            <if test="routingKey != null">
                routing_key = #{routingKey,jdbcType=VARCHAR},
            </if>
            <if test="status != null">
                status = #{status,jdbcType=INTEGER},
            </if>
            <if test="tryCount != null">
                try_count = #{tryCount,jdbcType=INTEGER},
            </if>
            <if test="nextTryTime != null">
                next_try_time = #{nextTryTime},
            </if>
            <if test="createTime != null">
                create_time = #{createTime},
            </if>
            <if test="updateTime != null">
                update_time = #{updateTime},
            </if>
            <if test="msg != null">
                msg = #{msg,typeHandler=com.cvdmp.service.handler.JsonObjectTypeHandler},
            </if>
        </set>
        where msg_id = #{msgId,jdbcType=VARCHAR}
    </update>
</mapper>
最后我们定义生产者和消费者
/** * mq消息推送策略 * 1、通过rabbitmq完成消息的推送保证消息推送成功 * @author daizhihua * @time 2023/4/25 */ @Component(value = "mqStrategy") public class MqStrategyService { private final RabbitConfig rabbitConfig; private final MsgLogService msgLogService; public MqStrategyService(RabbitConfig rabbitConfig, MsgLogService msgLogService) { this.rabbitConfig = rabbitConfig; this.msgLogService = msgLogService; } public void sendMessage(JSONObject map, HttpServletRequest request) { RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig(); String msgId = RandomUtil.getRandomNumber(32); //设置消息id map.put("msgId",msgId); MsgLog msgLog = new MsgLog(); msgLog.setMsgId(msgId); msgLog.setMsg(map); msgLog.setExchange(rabbitPropertiesConfig.getSmart_topic_exchange()); msgLog.setRoutingKey(rabbitPropertiesConfig.getSmart_topic_routingKey()); msgLog.setNextTryTime(DateUtil.getNow()); msgLogService.saveMsg(msgLog); //生成消息的唯一id CorrelationData correlationData = new CorrelationData(msgId); RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate(); // 发送消息 rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getSmart_topic_exchange(), rabbitPropertiesConfig.getSmart_topic_routingKey(), map, correlationData); } /** * 发送延迟队列消息 * @param map * @param delayTime */ public void sendMessageDelay(JSONObject map,int delayTime){ RabbitPropertiesConfig rabbitPropertiesConfig = rabbitConfig.getRabbitPropertiesConfig(); String msgId = RandomUtil.getRandomNumber(32); //设置消息id map.put("msgId",msgId); MsgLog msgLog = new MsgLog(); msgLog.setMsgId(msgId); msgLog.setMsg(map); msgLog.setExchange(rabbitPropertiesConfig.getDelayed_exchange()); msgLog.setRoutingKey(rabbitPropertiesConfig.getDelayed_routingKey()); msgLog.setNextTryTime(DateUtil.getNow()); //生成消息的唯一id CorrelationData correlationData = new CorrelationData(msgId); RabbitTemplate rabbitTemplate = rabbitConfig.getRabbitTemplate(); rabbitTemplate.convertAndSend(rabbitPropertiesConfig.getDelayed_exchange(), 
rabbitPropertiesConfig.getDelayed_routingKey(), map, message -> { message.getMessageProperties().setDelay(delayTime); return message;},correlationData); } }
延迟队列的消费
/**
 * Listener for the delayed queue: prints and logs each delivery, then acknowledges it.
 */
@Slf4j
@Component
@RabbitListener(queues = "${com.acrdpm.delayed_queue}")
public class MessageConsumer {

    private final MqStrategyService mqStrategyService;

    public MessageConsumer(MqStrategyService mqStrategyService) {
        this.mqStrategyService = mqStrategyService;
    }

    /**
     * Handles one delivery from the delayed queue.
     *
     * @param message raw AMQP message
     * @param map     converted message payload
     * @param channel channel the message arrived on, used for the manual ack
     * @throws IOException if the acknowledgement fails
     */
    @RabbitHandler
    public void consume(Message message, JSONObject map, Channel channel) throws IOException {
        System.out.println("First Queue received msg : " );
        log.info("数据是:{}", map);
        System.out.println(message);
        System.out.println(channel);

        // Manual ack of this single delivery (multiple = false).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
订阅消息的消费者
@Slf4j @Component @RabbitListener(queues = {"${com.acrdpm.smart_topic_queue}"}) public class SmartConsumer { private MsgLogService msgLogService; @RabbitHandler public void consume(Message message, JSONObject mail, Channel channel) throws IOException { log.info("接收到消息了"); log.info("消息 {}",message); log.info("收到的消息是:{}",mail); long tag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(tag, false); String msgId = mail.getMsgId(); MsgLog msgLog = msgLogService.selectByMsgId(msgId); if (null == msgLog || msgLog.getStatus().equals(MsgLogStatus.CONSUMED_SUCCESS)) { // 消费幂等性:确定不是重复的消息:及消费完成的消息 log.info("重复消费, msgId: {}", msgId); return; } //获取投送标签 long tag = message.getMessageProperties().getDeliveryTag(); // boolean success = false; // if (success) { // log.info("成功发送消息"); // msgLogService.updateStatus(msgId, MsgLogStatus.CONSUMED_SUCCESS); // // 消费确认手动ack // channel.basicAck(tag, false); // } else { // channel.basicAck(tag, false); // } // try { boolean success = EmailUtil.sendEmail(mail); // // } catch (EmailException e) { // log.error("email 发送异常" , e); // } catch (IOException e) { // log.error("消息处理异常" , e); // } } }
在发送消息的过程中,难免会出现网络异常等情况,所以我们在发送前先将消息持久化到数据库;为了保证消息的一致性,可参考如下时序图。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。