赞
踩
生产者发送消息时采用雪花算法给消息设置唯一的消息id,消费者接收消息处理失败时,根据消息的唯一id统计失败次数,若没有达到失败次数限制,则让消息重回队列(在开启手动签收的前提),此时队列会再次给消费者发送消息;若达到失败次数限制,则让消息不重回队列,进入死信队列,等待人工处理
/** * 定义正常队列 * @return */ @Bean public Queue confirmQueue(){ return QueueBuilder.durable("confirm-queue") //绑定死信交换机 .deadLetterExchange("dlx-exchange") //设置路由键 .deadLetterRoutingKey("error") .build(); } /** * 定义正常交换机 * @return */ @Bean public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange("confirm-exchange").build(); } @Bean public Binding confirmBinding(){ return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with("info"); } /** * 死信交换机 * @return */ @Bean public DirectExchange dlxExchange(){ return ExchangeBuilder.directExchange("dlx-exchange").build(); } /** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return QueueBuilder.durable("dlx-queue").build(); } /** * 死信队列与死信交换机绑定 * @return */ @Bean public Binding dlxBind1(){ return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("error"); } @Bean public RabbitAdmin confirmRabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin=new RabbitAdmin(connectionFactory); rabbitAdmin.declareExchange(confirmExchange()); rabbitAdmin.declareExchange(dlxExchange()); rabbitAdmin.declareQueue(confirmQueue()); rabbitAdmin.declareQueue(dlxQueue()); return rabbitAdmin; }
private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 组装消息 * @param msg * @return */ private static Map<String, Object> createMsg(Object msg) { String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32); Map<String,Object> message= Maps.newHashMap(); message.put("sendTime",sdf.format(new Date())); message.put("msg", msg); message.put("msgId",msgId); return message; } @GetMapping("confirm") @ApiOperation("消息确认") public String confirm(@RequestParam String msg){ Map<String, Object> map = createMsg(msg); rabbitTemplate.convertAndSend("confirm-exchange","info",map,message -> { MessageProperties properties = message.getMessageProperties(); //设置唯一的消息id properties.setMessageId(IdWorker.getNextId()); return message; }); return "ok"; }
private static final Map<String,Integer> MAP=new HashMap(); /** * ack * @param message 消息 * @param c 通道 * @param msg 消息内容 * @throws IOException */ //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致 @RabbitListener(queuesToDeclare = @Queue(value = "confirm-queue",declare = "false")) public void ack(Message message,Channel c,Map msg) throws IOException { MessageProperties properties = message.getMessageProperties(); String routingKey = properties.getReceivedRoutingKey(); log.info("ack收到:{},路由键:{}",msg,routingKey); //手动回执,不批量签收,回执后才能处理下一批消息 long tag = properties.getDeliveryTag(); try { //模拟业务报错 Thread.sleep(1000); int i=1/0; c.basicAck(tag,false); }catch (Exception e){ log.error(e.getMessage(), e); //消息id String messageId = properties.getMessageId(); if (!MAP.containsKey(messageId)) { MAP.put(messageId,1); }else { MAP.put(messageId,MAP.get(messageId)+1); } Integer count = MAP.get(messageId); if (count<5) { log.error("错误:{}次",count); //拒绝签收并重回队列 c.basicNack(tag,false,true); }else { log.error("达到错误次数限制,放到死信队列"); MAP.remove(messageId); //拒绝签收并不重回队列,进入死信队列 c.basicNack(tag,false,false); } } } /** * 死信队列 * @param message 消息 * @param c 通道 * @param msg 消息内容 * @throws IOException */ //使用queuesToDeclare属性,如果不存在则会创建队列,注:此处声明的队列要和生产者属性保持一致 @RabbitListener(queuesToDeclare = @Queue(value = "dlx-queue")) public void dlxQueue(Message message,Channel c,Map msg) throws IOException { MessageProperties properties = message.getMessageProperties(); String routingKey = properties.getReceivedRoutingKey(); log.info("死信队列收到:{},路由键:{}",msg,routingKey); //手动回执,不批量签收,回执后才能处理下一批消息 long tag = properties.getDeliveryTag(); c.basicAck(tag,false); }
附录:
雪花算法
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。