赞
踩
订阅发布机制,生产者发布消息到队列,消费者订阅该队列,内部的监听机制监听到订阅的队列有消息,就会调用相关的方法进行处理;
生产者、消费者、服务器Server(vhost虚拟主机)、exchange交换机、queue队列、routing key路由键、binding key绑定键
生产者发送消息带有routing key的消息头,交换机和队列通过bandging key进行绑定,消息的消息头和banding key相匹配决定消息路由到哪个队列中;
RabbitMQ 的交换机有四种类型:fanout、direct、topic、headers。
direct:交换机的binding key 和消息的routing key相同;
topic:交换机的binding key 和消息的rounting key匹配 如:.orange. Routing key为三个单词,且中间的单词为 orange ;
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * RabbitMQ配置 */ @Slf4j @Configuration public class RabbitMQConfig { /** * 用户服务统一使用的交换机 */ public static final String EXCHANGE_ADMIN = "exchange.admin"; /** * 接收用户访问页面的消息,由admin-service服务统一处理 */ public static final String QUEUE_ADMIN_VISIT = "admin.visit"; public static final String ROUTING_KEY_ADMIN_VISIT = "admin.visit"; @Bean public Exchange topicExchangeAdmin() { log.info("【业务交换机 " + EXCHANGE_ADMIN + " 创建成功】"); return ExchangeBuilder.topicExchange(EXCHANGE_ADMIN).durable(true).build(); } /** * 创建队列 QUEUE_ADMIN_VISIT * * @return */ @Bean public Queue queueAdminVisit() { log.info("【业务队列 " + QUEUE_ADMIN_VISIT + " 创建成功】"); return QueueBuilder.durable(QUEUE_ADMIN_VISIT).build(); } /** * 声明交换机绑定队列 EXCHANGE_ADMIN ->【ROUTING_KEY_ADMIN_VISIT】-> QUEUE_ADMIN_VISIT * * @return */ @Bean public Binding bindingExchangeAdminVisit() { log.info("【业务交换机 " + EXCHANGE_ADMIN + " 与业务队列 " + QUEUE_ADMIN_VISIT + " 绑定成功】"); return BindingBuilder.bind(queueAdminVisit()).to(topicExchangeAdmin()).with(ROUTING_KEY_ADMIN_VISIT).noargs(); } //----------------------------我的统计数据发生变更后的消息队列,用于 advisory服务--------------------------------- /** * 我的统计信息 交换机 */ public static final String EXCHANGE_MY_STATISTICS = "exchange.myStatistics"; /** * 我的统计信息 --消息的路由key */ public static final String ROUTING_KEY_STATISTICS_ADVISORY_MODIFY = "myStatistics.advisory.modify"; /** * 我的统计信息 --消息队列 */ public static final String QUEUE_STATISTICS_ADVISORY_MODIFY = "myStatistics.advisory.modify"; /** * 实例化交换机 * * @return */ @Bean public Exchange topicExchangeMyStatistics() { log.info("【业务交换机 " + EXCHANGE_MY_STATISTICS + " 创建成功】"); return ExchangeBuilder.topicExchange(EXCHANGE_MY_STATISTICS).durable(true).build(); } /** * 创建队列 QUEUE_PRODUCT_MODIFY * * @return */ @Bean public Queue queueStatisticsAdvisoryModify() { log.info("【业务队列 " + QUEUE_STATISTICS_ADVISORY_MODIFY + " 创建成功】"); return QueueBuilder.durable(QUEUE_STATISTICS_ADVISORY_MODIFY).build(); } /** * 声明交换机绑定队列 * * @return */ @Bean public Binding bindingExchangeStatisticsAdvisoryModify() { log.info("【业务交换机 " + EXCHANGE_MY_STATISTICS + " 与业务队列 " + QUEUE_STATISTICS_ADVISORY_MODIFY + " 绑定成功】"); return BindingBuilder.bind(queueStatisticsAdvisoryModify()).to(topicExchangeMyStatistics()).with(ROUTING_KEY_STATISTICS_ADVISORY_MODIFY).noargs(); } }
import java.util.Date; @Service @Slf4j public class VisitResRecordService { @Autowired private IDGenerator idGenerator; @Autowired private RabbitTemplate rabbitTemplate; /** * 发送访问记录消息到MQ中 * * @param visitResourceVo */ public void sendVisitResRecordMessage(VisitResourceVo visitResourceVo) { if (null == visitResourceVo) { return; } User user = UserContextHolder.currentUser(); //由于某些页面特别长,如访问cms的视频时,会导致MQ接收消息的字段不够长,造成报错,所以这里进行拦截 if (visitResourceVo.getRequestUrl() != null && visitResourceVo.getRequestUrl().length() > 200) { visitResourceVo.setRequestUrl(visitResourceVo.getRequestUrl().substring(0, 200)); } if (!StringUtils.isEmpty(visitResourceVo.getInvitationCode())) { //前端传入的邀请码是加密后的,所以这里需要进行解码 try { visitResourceVo.setInvitationCode(DesHelper.decrypt(visitResourceVo.getInvitationCode())); } catch (Exception e) { e.printStackTrace(); } } VisitRecordRecordMessage visitRecordRecordMessage = null; //构造消息对象,根据当前请求中是否有客户信息进行判断 if (user != null) { visitRecordRecordMessage = VisitRecordRecordMessage.builder() .userIdentifyId(visitResourceVo.getUserIdentifyId()) .userToken(visitResourceVo.getUserToken()) .userType(visitResourceVo.getUserType()) .userId(user.getUserId()) .userName(user.getUserName()) .userLoginType(user.getUserLoginType()) .visitDate(new Date()) .resourceType(visitResourceVo.getResourceType()) .requestUrl(visitResourceVo.getRequestUrl()) .requestParams(visitResourceVo.getRequestParams()) .previousPageUrl(visitResourceVo.getPreviousPageUrl()) .weixinOpenId(visitResourceVo.getWeixinOpenId()) .invitationCode(visitResourceVo.getInvitationCode()) .iamSource(visitResourceVo.getIamSource()) .build(); } else { visitRecordRecordMessage = VisitRecordRecordMessage.builder() .userIdentifyId(visitResourceVo.getUserIdentifyId()) .userToken(visitResourceVo.getUserToken()) .userType(visitResourceVo.getUserType()) .visitDate(new Date()) .resourceType(visitResourceVo.getResourceType()) .requestUrl(visitResourceVo.getRequestUrl()) .requestParams(visitResourceVo.getRequestParams()) .previousPageUrl(visitResourceVo.getPreviousPageUrl()) .weixinOpenId(visitResourceVo.getWeixinOpenId()) .invitationCode(visitResourceVo.getInvitationCode()) .iamSource(visitResourceVo.getIamSource()) .build(); } long id = idGenerator.nextId(); MessageObject messageObject = MessageObject.builder().id(id) .messageTypeEnum(MessageTypeEnum.VisitResourceRecord) .messageBase(visitRecordRecordMessage) .user(UserContextHolder.currentUser()) .company(CompanyContextHolder.currentCompany()) .dept(DeptContextHolder.currentDept()).build(); //为消息设置消息ID,以便在回调时通过该ID来判断是对哪个消息的回调 CorrelationData correlationData = new CorrelationData(String.valueOf(id)); log.info("访问资源记录,消息发送到交换机:{} 消息id:{}, msg:{}", RabbitMQConfig.EXCHANGE_ADMIN, correlationData.getId(), messageObject); //发送消息 this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_ADMIN, RabbitMQConfig.ROUTING_KEY_ADMIN_VISIT, JSON.toJSONString(messageObject, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteClassName, SerializerFeature.DisableCircularReferenceDetect), correlationData); } /** * 接收MQ中的访问记录消息,往将数据发给ES,后续客户访问的数据全部从ES中获取 */ public void receiveVisitResRecordMessage() { } }
/** * 接收商城订单服务的请求,根据健康服务商品的数量生成工单 */ @Slf4j @Component @RabbitListener(queues = {RabbitMQConfig.QUEUE_ADMIN_VISIT, RabbitMQConfig.QUEUE_PRODUCT_APPROVE, RabbitMQConfig.QUEUE_PRODUCT_UNPUBLISH, RabbitMQConfig.QUEUE_PRODUCT_PUBLISH, RabbitMQConfig.QUEUE_PRODUCT_RECEIVE, RabbitMQConfig.QUEUE_CUSTOMER_REWARD, RabbitMQConfig.QUEUE_CUSTOMER_CPR_REGIST_EXTERNAL, RabbitMQConfig.QUEUE_CUSTOMER_GIVE_VOTE_CHANCE }) public class ReceiveMessageListener { @Autowired private MessageQueueProcessMainService messageQueueProcessMainService; @Autowired private MessageQueueProcessService messageQueueProcessService; @Autowired private MessageQueueAbnormalService messageQueueAbnormalService; @Autowired private IDGenerator idGenerator; /** * 处理异常消息的线程池 */ private static ThreadPoolExecutor threadPoolExecutorForAbnormalMessage; static { threadPoolExecutorForAbnormalMessage = new ThreadPoolExecutor( //线程池维护线程的最少数量 2, //线程池维护线程的最大数量 10, //线程池维护线程所允许的空闲时间 120, TimeUnit.SECONDS, //线程池所使用的缓冲队列 new ArrayBlockingQueue<Runnable>(100), //加入失败,则在调用的主线程上执行 new ThreadPoolExecutor.CallerRunsPolicy()); } /** * 监听MQ,消息推送至APP端(采用第三方友盟的服务推送) * * @param messageObj * @param deliveryTag * @param channel * @throws IOException */ @RabbitHandler public void process(@Payload Object messageObj, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { try { if (messageObj instanceof Message) { //获取MQ中的完整消息 String messageStr = new String(((Message) messageObj).getBody(), "utf-8"); MessageObject myMessageObject = JSONObject.parseObject(messageStr, new TypeReference<MessageObject>() { }); //获取MQ中完整消息的 业务消息 JSONObject messageJsonObject = JSONObject.parseObject(messageStr); String messageBaseStr = messageJsonObject.get("messageBase").toString(); log.info("消息id: " + myMessageObject.getId()); log.info("用户: " + myMessageObject.getUser()); log.info("收到消息: " + messageBaseStr); log.info("完整的消息: " + myMessageObject); long id = idGenerator.nextId(); //对于可能存在大量用户触发且不重要的消息,不用通过DB进行记录和控制 if (MessageQueueUtils.isNeedRecordInDB(myMessageObject)) { //插入一笔状态为未处理的记录 int inserted = messageQueueProcessService.insert(id , myMessageObject.getId(), myMessageObject.getMessageTypeEnum() , messageBaseStr); //判断这个消息之前是否已经处理过 if (inserted <= 0) { //由于消息已经存在,需要通过状态确认消息是否处理完成 MessageQueueProcess messageQueueProcess = messageQueueProcessService.findByMqId(myMessageObject.getId()); //如果是消息已处理,则直接返回 if (MessageQueueProcessConstants.Processed.equals(messageQueueProcess.getStatus())) { log.info("----重复消息,不用处理----" + id); channel.basicAck(deliveryTag, false); return; } id = messageQueueProcess.getId(); } } //设置用户上下文,解决服务间RPC调用的权限问题 UserContextHolder.set(myMessageObject.getUser()); CompanyContextHolder.set(myMessageObject.getCompany()); DeptContextHolder.set(myMessageObject.getDept()); //由于需要使用DB自带的行锁,需要在之前把要锁的数据插入到DB中,否则会引起表锁 boolean process = messageQueueProcessMainService.process(id, myMessageObject, messageBaseStr); channel.basicAck(deliveryTag, false); if (process) { log.info("消息处理完成 " + id); } } } catch (Exception e) { log.error("消息处理出错 " + e.getMessage()); try { //异常消息放入队列 if (messageObj instanceof Message) { //获取MQ中的完整消息 String messageStr = new String(((Message) messageObj).getBody(), "utf-8"); //获取消息对象 MessageObject myMessageObject = JSONObject.parseObject(messageStr, new TypeReference<MessageObject>() { }); //对于可能存在大量用户触发且不重要的消息,不用通过DB进行记录和控制 if (!MessageQueueUtils.isNeedRecordInDB(myMessageObject)) { return; } //获取消息id Long mqId = myMessageObject.getId(); //判断异常消息是否已经在DB中,如果不存在则insert boolean isExists = messageQueueAbnormalService.checkIsExists(mqId); if (!isExists) { //异常消息存入DB messageQueueAbnormalService.insert(idGenerator.nextId(), mqId, myMessageObject.getMessageTypeEnum(), messageStr); } } } catch (Exception ex) { ex.printStackTrace(); log.error("异常消息记录失败 " + e.getMessage()); } //发现异常后,通过新开辟的线程,将消息重新入队 //通过线程睡眠的方式,避免造成消息不断重复投递的死循环,造成CPU资源浪费,同时可以避免不断打印日志,防止日志文件撑爆服务器 threadPoolExecutorForAbnormalMessage.execute(() -> { try { //10秒后重新入MQ,防止不断的循环 Thread.sleep(10 * 1000); //消息重新放入队列 channel.basicNack(deliveryTag, false, true); } catch (Exception ex) { ex.printStackTrace(); } }); } } }
/** * 队列消息处理主服务 */ @Service @Slf4j public class MessageQueueProcessMainService { @Autowired private MessageQueueProcessService messageQueueProcessService; @Autowired private MessageQueueAbnormalService messageQueueAbnormalService; @Autowired private VisitRecordForESService visitRecordForESService; @Autowired private OrganizationService organizationService; @Autowired private RewordService rewordService; @Autowired private CprService cprService; @Autowired private MarketingCompetitionService marketingCompetitionService; @Transactional(rollbackFor = Exception.class) public boolean process(Long id, MessageObject myMessageObject, String messageBase) { //对于可能存在大量用户触发且不重要的消息,不用通过DB进行记录和控制 if (MessageQueueUtils.isNeedRecordInDB(myMessageObject)) { log.info("锁定记录ID:" + id); //通过DB的行锁,进行加锁处理(事物完成或回滚后会自动释放锁) messageQueueProcessService.lockById(id); //再次判断是否处理成功,使用双重判断加锁机制 MessageQueueProcess messageQueueProcess = messageQueueProcessService.findById(id); if (MessageQueueProcessConstants.Processed.equals(messageQueueProcess.getStatus())) { log.info("----重复消息,不用处理----" + id); return false; } } SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("MessageQueueProcessMainService.process 方法中,内嵌业务逻辑 开始处理时间 " + simpleDateFormat.format(new Date())); //处理页面访问资源的消息 if (MessageTypeEnum.VisitResourceRecord.equals(myMessageObject.getMessageTypeEnum())) { VisitRecordRecordMessage visitRecordRecordMessage = JSONObject.parseObject(messageBase, new TypeReference<VisitRecordRecordMessage>() { }); log.info("调用visitRecordForESService.record方法,记录前端访问的页面"); visitRecordForESService.record(visitRecordRecordMessage); } else { throw new BaseException("MessageQueueProcessMainService.process方法传入的队列参数值有误"); } log.info("MessageQueueProcessMainService.process 方法中,内嵌业务逻辑 结束处理时间 " + simpleDateFormat.format(new Date())); if (MessageQueueUtils.isNeedRecordInDB(myMessageObject)) { //更改状态 messageQueueProcessService.updateStatus2Finish(id); //删除消息异常表中对应的记录 messageQueueAbnormalService.delete(myMessageObject.getId()); } log.info("----完成业务处理----" + id); return true; } }
/** * 消息队列辅助类相关的服务 */ public interface MessageQueueMapper { //--------------------------消息处理相关--------------------------------- /** * 消息处理表中插入记录 * * @param messageQueueProcess * @return */ @Insert("insert into message_queue_process(id, create_by, create_time, mq_id, mq_type, message_part, status) " + " values(#{id}, #{createBy}, #{createTime}, #{mqId}, #{messageTypeEnum}, #{messagePart}, #{status})") public int insertMessageQueueProcess(MessageQueueProcess messageQueueProcess); /** * 通过消息mqId获取记录 * * @param mqId * @return */ @Select("select id, status from message_queue_process where mq_id=#{mqId}") public MessageQueueProcess findMessageQueueProcessByMqId(Long mqId); /** * 锁定id所在记录 * * @param id * @return */ @Select("select 1 from message_queue_process where id=#{id} for update") public MessageQueueProcess lockMessageQueueProcessById(Long id); /** * 通过id获取消息处理的状态 * * @param id * @return */ @Select("select status from message_queue_process where id=#{id}") public MessageQueueProcess findMessageQueueProcessById(Long id); /** * 通过id更新消息处理的状态 * * @param id * @param status * @return */ @Update("update message_queue_process set status=#{status} where id=#{id}") public int updateMessageQueueProcessStatus(Long id, String status); //--------------------------异常消息相关--------------------------------- /** * 通过消息mqId获取异常记录 * * @param mqId * @return */ @Select("select id from message_queue_abnormal where mq_id=#{mqId}") public MessageQueueAbnormal findMessageQueueAbnormalByMqId(Long mqId); /** * 消息异常表,插入数据 * * @param messageQueueAbnormal * @return */ @Insert("insert into message_queue_abnormal(id, create_time, mq_id, mq_type, message_all) " + " values(#{id}, now(), #{mqId}, #{messageTypeEnum}, #{messageAll})") public int insertMessageQueueAbnormal(MessageQueueAbnormal messageQueueAbnormal); /** * 根据消息mqId删除消息异常表的记录 * * @param mqId * @return */ @Delete("delete from message_queue_abnormal where mq_id = #{mqId}") public int deleteMessageQueueAbnormal(Long mqId); }
其实这个例子里面少了binding key,只是这里将binding key和routing key合为一个;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。