赞
踩
spring: rabbitmq: host: ip port: 5672 username: guest password: guest ##消息发送确认回调 publisher-confirms: true #采用confirm以及return机制 发送返回监听回调 publisher-confirm-type: correlated publisher-returns: true listener: type: simple simple: #手动接收消息方式 acknowledge-mode: manual
@Configuration @Slf4j @AllArgsConstructor public class RabbitmqConfig { private final ConnectionFactory connectionFactory; private final RabbitLogsMapper rabbitLogsMapper; @Bean public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //confirm确认 rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> { String msgId = correlationData.getId(); if (ack) { //发送成功 log.info("消息成功发送 , msgId: {}," ,msgId); //状态更新 消息发送成功 BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs(); biddingRabbitLogs.setStatus(SendStatus.SEND_SUCCESS.getValue()); rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4")); } else { //发送失败 log.error("消息发送失败, {}, cause: {}, msgId: {}", correlationData, cause, msgId); //状态更新 消息发送失败 BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs(); biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue()); rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4")); } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { //触发回调 只有交换机找不到队列时才会触发 log.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); //状态更新 消息发送失败 String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); BiddingRabbitLogs biddingRabbitLogs = new BiddingRabbitLogs(); biddingRabbitLogs.setStatus(SendStatus.SEND_FAILD.getValue()); rabbitLogsMapper.update(biddingRabbitLogs, Wrappers.lambdaUpdate(BiddingRabbitLogs.class).eq(BiddingRabbitLogs::getId,msgId).notIn(BiddingRabbitLogs::getStatus,"4")); }); return rabbitTemplate; } @Bean public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate){ RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitTemplate); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
说明:
public String createQueue(String queueName) {
BiddingQueueConfig biddingQueueConfig = queueMapper.selectOne(Wrappers.lambdaQuery(BiddingQueueConfig.class).eq(BiddingQueueConfig::getQueue, queueName));
if (biddingQueueConfig == null) {
biddingQueueConfig = new BiddingQueueConfig();
biddingQueueConfig.setCreatetime(new Date());
biddingQueueConfig.setQueue(queueName);
biddingQueueConfig.setStatus("1");
int insert = queueMapper.insert(biddingQueueConfig);
//将队列持久化
rabbitAdmin.declareQueue(new Queue(queueName,true));
return queueName + "队列创建成功";
}
return queueName + "队列创建失败";
}
发送消息前首先将发送的数据插入数据库,状态变为发送中
// 启动自动创建队列 @RabbitListener(queuesToDeclare = { @Queue("queue_work_dontask") }) @RabbitHandler @SneakyThrows public void receiveDonTask(String data, Message message, Channel channel){ //消息id String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"); //根据消息id查询BiddingRabbitLogs日志表 BiddingRabbitLogs biddingRabbitLogs = remoteLogsService.get(msgId, SecurityConstants.FROM_IN).getData(); if (biddingRabbitLogs == null) { log.error("消息ID查询 biddingRabbitLogs:null"); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); return; } //状态:1.消息发送中 2.消息发送成功 3.消息发送失败 4.消费成功 5.消费失败 if (SendStatus.CONSUME_SUCCESS.getValue().equals(biddingRabbitLogs.getStatus()) || SendStatus.SEND_FAILD.getValue() == String.valueOf(biddingRabbitLogs.getTryTimes())) { //重复消费 log.info("消息ID:{},重复消费",msgId); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); return; } try { //处理业务逻辑 Map map = JSON.parseObject(data, Map.class); String dataString = (String) map.get("data"); String username = (String) map.get("username"); Integer tenantId = (Integer) map.get("tenantId"); ApproveParam approveParam = JSON.parseObject(dataString, ApproveParam.class); R<String> stringR = doneTask(approveParam,username,tenantId); //处理成功 更新状态 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); biddingRabbitLogs.setStatus(SendStatus.CONSUME_SUCCESS.getValue()); biddingRabbitLogs.setSuccesstime(new Date()); remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN); log.info("消费成功,消息ID:{}",msgId); } catch (Exception e) { e.printStackTrace(); if (biddingRabbitLogs.getTryTimes() >= Integer.parseInt(SendStatus.TRY_TIMES.getValue())) { //多次消费不成功 自动接收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.error("多次消费失败,消息ID:{}",msgId); } else { //消费失败 拒收 重回队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); log.error("消费失败,消息ID:{}",msgId); } biddingRabbitLogs.setStatus(SendStatus.CONSUME_FAILD.getValue()); biddingRabbitLogs.setTryTimes(biddingRabbitLogs.getTryTimes()+1); remoteLogsService.updateById(biddingRabbitLogs,SecurityConstants.FROM_IN); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。