赞
踩
Springboot 1.5.6.RELEAS
Springcloud Dalston.SR2
交换机是用来发送消息的AMQP实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和被称作绑定(bindings)的规则所决定的。AMQP 0-9-1的代理提供了四种交换机
Name(交换机类型) Default pre-declared names(预声明的默认名称)
Direct exchange(直连交换机) (Empty string) and amq.direct
Fanout exchange(扇型交换机) amq.fanout
Topic exchange(主题交换机) amq.topic
Headers exchange(头交换机) amq.match (and amq.headers in RabbitMQ)
绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
打个比方:
队列(queue)是我们想要去的位于纽约的目的地
交换机(exchange)是JFK机场
绑定(binding)就是JFK机场到目的地的路线。能够到达目的地的路线可以是一条或者多条
拥有了交换机这个中间层,很多由发布者直接到队列难以实现的路由方案能够得以实现,并且避免了应用开发者的许多重复劳动。
如果AMQP的消息无法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。
RabbitMq默认是ack机制:no-ack的方式
执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。
实际项目需要手动ack机制 - 见下问实战代码
用户在停止查询时,会导致消费者进程被杀死,因此ACK状态码未反馈至MQ,从而消息一直存留在MQ中,当新的消费者启动时会重新消费;
接受消息后-消费消息前,db或者redis nosql检查消息消费状态 - 见下问实战代码
<!-- rabbbit.mq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
package com.ttd.trustProduct.mq.rabbit; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbit 配置 * * @author wolf */ @Configuration @EnableRabbit public class RabbitConfiguration { //===============以下是验证topic Exchange的队列========== @Bean public Queue productMessage() { return new Queue("ttd.trust.product"); } @Bean public Queue allMessages() { return new Queue("ttd.all"); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue productMessage, TopicExchange exchange) { return BindingBuilder.bind(productMessage).to(exchange).with("ttd.trust.product"); } /** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessages(Queue allMessages, TopicExchange exchange) { return BindingBuilder.bind(allMessages).to(exchange).with("ttd.#"); } //===============以上是验证topic Exchange的队列========== //===============以下是验证Fanout Exchange的队列========== @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } //===============以上是验证Fanout Exchange的队列========== @Bean public Queue helloQueue() { return new Queue("newhelloQueue"); } }
package com.ttd.sdk.common; import java.io.Serializable; import java.util.Date; import java.util.Map; import com.alibaba.fastjson.JSON; import com.ttd.sdk.util.DateUtil; import com.ttd.sdk.util.RandomUtils; /** * 消息体 * * @author wolf * */ public class MQMessage implements Serializable { private static final long serialVersionUID = 1L; private Integer productCode; // 生产者代码 private Integer consumerCode; // 消费者代码 private String messageId; // 消息唯一标识 private Integer event; // 消息监听事件 private Integer action; //操作:1加,2减 private Date created; // 消息发送时间 private Map<String, Object> bussinessBody; // 消息体,封装业务数据 private MQMessage() { super(); } private MQMessage(Integer productCode, Integer consumerCode, String messageId, Integer event, Date created, Map<String, Object> bussinessBody, Integer action) { super(); this.productCode = productCode; this.consumerCode = consumerCode; this.messageId = messageId; this.event = event; this.created = created; this.bussinessBody = bussinessBody; this.action = action; } private MQMessage(Integer productCode, Integer consumerCode, Integer event, Map<String, Object> bussinessBody, Integer action) { super(); this.productCode = productCode; this.consumerCode = consumerCode; this.event = event; this.bussinessBody = bussinessBody; this.action = action; } public static String productMQMessage(Integer productCode, Integer consumerCode, Integer event, Map<String, Object> bussinessBody, Integer action) { MQMessage mqObj = new MQMessage(productCode, consumerCode, event, bussinessBody, action); mqObj.setCreated(new Date()); mqObj.setMessageId(generatSeriaeNo()); return JSON.toJSONString(mqObj); } //生成消息唯一标识 private static String generatSeriaeNo() { return DateUtil.dateFormat("yyyyMMddHHmmss") + RandomUtils.randomCode(2); } public Integer getProductCode() { return productCode; } public void setProductCode(Integer productCode) { this.productCode = productCode; } public Integer getConsumerCode() { return consumerCode; } public void setConsumerCode(Integer consumerCode) { this.consumerCode = consumerCode; } public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public Integer getEvent() { return event; } public void setEvent(Integer event) { this.event = event; } public Date getCreated() { return created; } public void setCreated(Date created) { this.created = created; } public Map<String, Object> getBussinessBody() { return bussinessBody; } public void setBussinessBody(Map<String, Object> bussinessBody) { this.bussinessBody = bussinessBody; } public Integer getAction() { return action; } public void setAction(Integer action) { this.action = action; } @Override public String toString() { return "MQMessage [productCode=" + productCode + ", consumerCode=" + consumerCode + ", messageId=" + messageId + ", event=" + event + ", action=" + action + ", created=" + created + ", bussinessBody=" + bussinessBody + "]"; } }
生产者与消费者数据协议为json,定义统一数据传输实体。 package com.ttd.trustProduct.mq.rabbit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class ProductTopicSender { @Autowired private AmqpTemplate rabbitTemplate; private static final Logger logger = LoggerFactory.getLogger(ProductTopicSender.class); public void send(String msg) { System.out.println("ProductTopicSender : " + msg); this.rabbitTemplate.convertAndSend("exchange", "ttd.trust.product", msg); } }
package com.ttd.trustProduct.mq.rabbit; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import com.ttd.sdk.common.MQMessage; import com.ttd.sdk.common.enumerate.ActionCodeEnum; import com.ttd.sdk.common.enumerate.EventCodeEnum; import com.ttd.trustProduct.domain.LogMqMessage; import com.ttd.trustProduct.domain.SaleInfo; import com.ttd.trustProduct.service.LogMqMessageService; import com.ttd.trustProduct.service.SaleInfoService; import com.ttd.trustProduct.utils.IntegerUtils; @Component @RabbitListener(queues = "ttd.trust.product") public class ProductMessageReceiver { @Resource private SaleInfoService saleInfoService; @Resource private LogMqMessageService logMqMessageService; private final Logger logger = LoggerFactory.getLogger(ProductMessageReceiver.class); @Autowired private ConnectionFactory connectionff; /* * Map<String, Object> bussiness = Maps.newHashMap(); **公共必填:** bussiness.put("productId", 11); bussiness.put("companyId", 100); //B公司id bussiness.put("isSmallPerson", 1); //1 or 0 bussiness.put("assignType", 1) //'1指定派发,2抢购派发', **预约事件必填** bussiness.put("bookNum", 1); bussiness.put("bookAmount", 100); **报单事件必填** bussiness.put("formNum", 1); bussiness.put("formAmount", 100); **募集事件必填** bussiness.put("raiseNum", 1); bussiness.put("raiseAmount", 100); **签章事件必填** bussiness.put("signedNum", 1); //电子签章数 bussiness.put("paperSignedNum", 1);//纸质签章数 **双录事件必填** bussiness.put("signingNum", 100); */ @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionff); container.setQueueNames("ttd.trust.product"); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); String jsonString = new String(body); logger.info("|============ProductMessageReceiver : " + jsonString); MQMessage msg = JSON.parseObject(jsonString, MQMessage.class); boolean preRet = preEventHandler(msg); if (preRet == false) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); return ; } boolean postRet = postEventHandler(msg); if (postRet == false) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); return ; } afterEventHandler(msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 //channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } }); return container; } /* @RabbitHandler public void process(String msg) { System.out.println("|============ProductMessageReceiver : " +msg); MQMessage message = JSON.parseObject(msg, MQMessage.class); boolean preRet = preEventHandler(message); if (preRet == false) return ; boolean postRet = postEventHandler(message); if (postRet == false) return ; afterEventHandler(message); }*/ private void recordLogMQ(MQMessage message, Integer state) { LogMqMessage log = new LogMqMessage(); log.setMessageId(message.getMessageId()); log.setProductCode(message.getProductCode()); log.setConsumerCode(message.getConsumerCode()); log.setEvent(message.getEvent()); log.setBussinessBody(JSON.toJSONString(message)); log.setState(state); logMqMessageService.insertEntry(log); } /** * 消息体检查 * @param message * @return */ private boolean preEventHandler(MQMessage message) { //不能重复消费 LogMqMessage logMQ = new LogMqMessage(); logMQ.setMessageId(message.getMessageId()); int count = logMqMessageService.selectEntryListCount(logMQ); System.out.println(count); if (count > 0) { return false; } //消息体格式错误 if (message.getEvent() == null || message.getAction() == null || message.getBussinessBody() == null || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("productId")) || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("companyId")) || (Integer) message.getBussinessBody().get("isSmallPerson") == null ) { recordLogMQ(message, -1); return false; } //业务类型检查 //预约事件 if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.BOOK_EVENT.getValue())) { if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("bookNum")) || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("bookAmount"))) { recordLogMQ(message, -1); return false; } //报单事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.FORM_EVENT.getValue())) { if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("formNum")) || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("formAmount"))) { recordLogMQ(message, -1); return false; } //双录事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("signingNum"))) { recordLogMQ(message, -1); return false; } //签章事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("signedNum")) || !IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("paperSignedNum"))) { recordLogMQ(message, -1); return false; } //募集事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { if (!IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("raiseNum")) || IntegerUtils.greatThanZero((Integer) message.getBussinessBody().get("raiseAmount"))) { recordLogMQ(message, -1); return false; } } return true; } /** * 业务处理 * @param message * @return */ private boolean postEventHandler(MQMessage message) { Integer productId = (Integer) message.getBussinessBody().get("productId"); Integer companyId = (Integer) message.getBussinessBody().get("companyId"); Integer isSmallPerson = (Integer) message.getBussinessBody().get("isSmallPerson"); Integer assignType = (Integer) message.getBussinessBody().get("assignType"); //查询 SaleInfo saleInfo = new SaleInfo(); saleInfo.setCompanyId(IntegerUtils.parseLong(companyId)); saleInfo.setProductId(IntegerUtils.parseLong(productId)); saleInfo.setAssignType(assignType); SaleInfo cond = saleInfoService.selectEntryOne(saleInfo); //预约事件 if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.BOOK_EVENT.getValue())) { Integer bookNum = (Integer) message.getBussinessBody().get("bookNum"); Integer bookAmount = (Integer) message.getBussinessBody().get("bookAmount"); //insert and plus if (cond == null && IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { saleInfo.setBookTotalNum(bookNum); saleInfo.setBookTotalAmount(bookAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { saleInfo.setSmallPersonBookNum(bookNum); saleInfo.setSmallPersonBookAmount(bookAmount); } saleInfoService.insertEntry(saleInfo); //update } else { if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { cond.setBookTotalNum(cond.getBookTotalNum() + bookNum); cond.setBookTotalAmount(cond.getBookTotalAmount() + bookAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { cond.setSmallPersonBookNum(cond.getSmallPersonBookNum() + bookNum); cond.setSmallPersonBookAmount(cond.getSmallPersonBookAmount() + bookAmount); } } else { cond.setBookTotalNum(cond.getBookTotalNum() - bookNum); cond.setBookTotalAmount(cond.getBookTotalAmount() - bookAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { cond.setSmallPersonBookNum(cond.getSmallPersonBookNum() - bookNum); cond.setSmallPersonBookAmount(cond.getSmallPersonBookAmount() - bookAmount); } } saleInfoService.updateByKey(cond); } //报单事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.FORM_EVENT.getValue())) { Integer formNum = (Integer) message.getBussinessBody().get("formNum"); Integer formAmount = (Integer) message.getBussinessBody().get("formAmount"); //insert and plus if (cond == null && IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { saleInfo.setFormTotalNum(formNum); saleInfo.setFormTotalAmount(formAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { saleInfo.setSmallPersonFormNum(formNum); saleInfo.setSmallPersonFormAmount(formAmount); } saleInfoService.insertEntry(saleInfo); //update } else { if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { cond.setFormTotalNum(cond.getFormTotalNum() + formNum); cond.setFormTotalAmount(cond.getFormTotalAmount() + formAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { cond.setSmallPersonFormNum(cond.getSmallPersonFormNum() + formNum); cond.setSmallPersonFormAmount(saleInfo.getSmallPersonFormAmount() + formAmount); } } else { cond.setFormTotalNum(cond.getFormTotalNum() - formNum); cond.setFormTotalAmount(cond.getFormTotalAmount() - formAmount); if (IntegerUtils.greatThanZero(isSmallPerson)) { cond.setSmallPersonFormNum(cond.getSmallPersonFormNum() - formNum); cond.setSmallPersonFormAmount(cond.getSmallPersonFormAmount() - formAmount); } } saleInfoService.updateByKey(cond); } //双录事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { Integer signingNum = (Integer) message.getBussinessBody().get("signingNum"); //insert and plus if (cond == null && IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { saleInfo.setSigningContractNum(signingNum); saleInfoService.insertEntry(saleInfo); //update } else { if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { cond.setSigningContractNum(cond.getSigningContractNum() + signingNum); } else { cond.setSigningContractNum(cond.getSigningContractNum() - signingNum); } saleInfoService.updateByKey(cond); } //签章事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { Integer signedNum = (Integer) message.getBussinessBody().get("signedNum"); Integer paperSignedNum = (Integer) message.getBussinessBody().get("paperSignedNum"); //insert and plus if (cond == null && IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { saleInfo.setSignedContractNum(signedNum); saleInfo.setPaperSignedContractNum(paperSignedNum); saleInfoService.insertEntry(saleInfo); //update } else { if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { cond.setSignedContractNum(cond.getSignedContractNum() + signedNum); cond.setPaperSignedContractNum(cond.getPaperSignedContractNum() + paperSignedNum); } else { cond.setSignedContractNum(cond.getSignedContractNum() - signedNum); cond.setPaperSignedContractNum(cond.getPaperSignedContractNum() - paperSignedNum); } saleInfoService.updateByKey(cond); } //募集事件 } else if (IntegerUtils.equals(message.getEvent(), EventCodeEnum.RECORD_EVENT.getValue())) { Integer raiseNum = (Integer) message.getBussinessBody().get("raiseNum"); Integer raiseAmount = (Integer) message.getBussinessBody().get("raiseAmount"); //insert and plus if (cond == null && IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { saleInfo.setRaiseTotalNum(raiseNum); saleInfo.setRaiseTotalAmount(raiseAmount); saleInfoService.insertEntry(saleInfo); //update } else { if (IntegerUtils.equals(ActionCodeEnum.PLUS_ACTION.getValue(), message.getAction())) { cond.setRaiseTotalNum(cond.getRaiseTotalNum() + raiseNum); cond.setRaiseTotalAmount(cond.getRaiseTotalAmount() + raiseAmount); } else { cond.setRaiseTotalNum(cond.getRaiseTotalNum() - raiseNum); cond.setRaiseTotalAmount(cond.getRaiseTotalAmount() - raiseAmount); } saleInfoService.updateByKey(cond); } } return true; } /** * 消息日志处理 * @param message */ private void afterEventHandler(MQMessage message) { recordLogMQ(message, 1); } }
package com.ttd.test; import java.util.Map; import javax.annotation.Resource; import org.junit.Test; import com.google.common.collect.Maps; import com.ttd.sdk.common.MQMessage; import com.ttd.sdk.common.enumerate.ActionCodeEnum; import com.ttd.sdk.common.enumerate.EventCodeEnum; import com.ttd.sdk.common.enumerate.ServiceCodeEnum; import com.ttd.trustProduct.mq.kafka.MsgProducer; import com.ttd.trustProduct.mq.rabbit.ProductTopicSender; public class TestMsg extends BaseTestService{ @Resource private MsgProducer producer; @Resource private ProductTopicSender topicSender; // @Resource private FanoutSender fanoutSender; // @Resource private CallBackSender callBackSender; // @Resource private HelloSender1 hellSender1; //kafka // @Test public void testSendMsg() { String seriNo = System.currentTimeMillis()/1000 + ""; producer.send("edwintest", "{\"seriNo\":" + seriNo + ",\"code\":200, \"msg\":\"发送一条消息,1-topic-8partion-1-replication\"}"); } // topic exchange @Test public void testTopicRabbit() throws InterruptedException { Map<String, Object> bussiness = Maps.newHashMap(); bussiness.put("productId", 15); bussiness.put("companyId", 100); //B公司id bussiness.put("isSmallPerson", 1); //1 or 0 bussiness.put("assignType", 1); bussiness.put("bookNum", 1); bussiness.put("bookAmount", 100); /*bussiness.put("formNum", 1); bussiness.put("formAmount", 100); bussiness.put("raiseNum", 1); bussiness.put("raiseAmount", 100); bussiness.put("signedNum", 1); bussiness.put("signingNum", 100); */ String messageBody = MQMessage.productMQMessage( ServiceCodeEnum.TTD_TRUST_ORDER.getValue(), ServiceCodeEnum.TTD_TRUST_PRODUCT.getValue(), EventCodeEnum.BOOK_EVENT.getValue(), bussiness, ActionCodeEnum.PLUS_ACTION.getValue()); topicSender.send(messageBody); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。