当前位置:   article > 正文

大数据最新学成在线day13 支付通知

大数据最新学成在线day13 支付通知
publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: false #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback
template:
  mandatory: false #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
listener:
  simple:
    prefetch: 1  #每次只能获取一条消息,处理完成才能获取下一个消息
    acknowledge-mode: none #auto:出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq
    retry:
      enabled: true #开启消费者失败重试
      initial-interval: 1000ms #初识的失败等待时长为1秒
      multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval
      max-attempts: 3 #最大重试次数
      stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

**D表示交换机&消息队列被持久化**


![](https://img-blog.csdnimg.cn/direct/a4e54ae217e441548a05471affa311c1.png)


![](https://img-blog.csdnimg.cn/direct/888e444610b14177b9695231404d0fbe.png)


**生产方配置ReturnCallback**



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

@Slf4j
@Configuration
public class PayNotifyConfig implements ApplicationContextAware {

//交换机
public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
//支付结果通知消息类型
public static final String MESSAGE_TYPE = "payresult_notify";
//支付通知队列
public static final String PAYNOTIFY_QUEUE = "paynotify_queue";

//声明交换机,且持久化
@Bean(PAYNOTIFY_EXCHANGE_FANOUT)
public FanoutExchange paynotify_exchange_fanout() {
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
}
//支付通知队列,且持久化
@Bean(PAYNOTIFY_QUEUE)
public Queue course_publish_queue() {
    return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
}

//交换机和支付通知队列绑定
@Bean
public Binding binding_course_publish_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    // 获取RabbitTemplate
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    //消息处理service
    MqMessageService mqMessageService = applicationContext.getBean(MqMessageService.class);
    // 设置ReturnCallback
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        // 投递失败,记录日志
        log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                replyCode, replyText, exchange, routingKey, message.toString());
        MqMessage mqMessage = JSON.parseObject(message.toString(), MqMessage.class);
        //将消息再添加到消息表
        mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3());

    });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

}


**生产方配置confirmCallBack**



  • 1
  • 2
  • 3
  • 4
  • 5

/**
* 发送通知结果
*
* @param message
*/
@Override
public void notifyPayResult(MqMessage message) {
//1、消息体,转json
String msg = JSON.toJSONString(message);
//设置消息持久化
Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(message.getId().toString());
// 3.添加callback 回调函数
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug(“通知支付结果消息发送成功, ID:{}”, correlationData.getId());
//删除消息表中的记录
mqMessageService.completed(message.getId());
}else{
// 3.2.nack,消息失败
log.error(“通知支付结果消息发送失败, ID:{}, 原因{}”,correlationData.getId(), result.getReason());
}
},
ex -> log.error(“消息发送异常, ID:{}, 原因{}”,correlationData.getId(),ex.getMessage())
);
// 发送消息 因为是广播模式所以路由键为空
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, “”, msgObj,correlationData);
}



* **confirm callback(确认回调)**:当消息成功到达 Exchange 时,Broker 会发送一个确认给生产者。如果消息发送失败,则会触发 confirm callback 中的相应逻辑。确认回调通常用于确保消息被成功发送到 Exchange,以便生产者可以知道是否需要重新发送消息。
* **return callback(返回回调)**:当消息被路由到 Exchange 但无法路由到 Queue 时,Broker 会发送一个返回消息给生产者。返回回调通常用于处理无法路由到 Queue 的消息,生产者可以在返回回调中执行相应的逻辑,比如记录日志或者采取其他措施。


在本项目中,生产者消息投递成功到交换机时,执行confirmCallBack删除mq\_message表中的记录。如果消息从交换机投递到队列失败时,则把消息再次添加回mq\_message表,并且进行重试(重试次数在配置文件中配置)。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback
template:
mandatory: true #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
listener:
simple:
prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
acknowledge-mode: manual #auto:出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初识的失败等待时长为1秒
multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false
direct:
acknowledge-mode: manual



> 
> Q1:如果消费者未收到消息或者出现系统错误时,是否会触发回调函数?
> 
> 
> 


在 RabbitMQ 中,当消费者未收到消息或者出现系统错误时,不会触发回调函数。回调函数是在消息生产者发送消息到 RabbitMQ 服务器后触发的,用于确认消息是否成功发送到 Exchange 或者是否被正确路由到 Queue。


如果消费者未收到消息,可能有以下几种原因:


* **消息丢失**:在消息生产者发送消息到 RabbitMQ 服务器后,由于网络问题或者其他原因导致消息丢失,这种情况下消费者是不会收到消息的。解决这个问题需要确保消息生产者发送消息时能够成功连接到 RabbitMQ 服务器,并且消息能够正确发送。
* **消费者无法处理消息**:如果消费者无法正确处理消息,可能是因为消费者代码出现了错误或者处理消息的逻辑不完善,这种情况下 RabbitMQ 不会触发回调函数。解决这个问题需要检查消费者代码逻辑是否正确,并确保能够处理各种异常情况。
* **队列配置问题**:如果队列配置不正确,比如消费者没有正确绑定到队列,或者队列设置了错误的参数,可能会导致消费者无法接收到消息。解决这个问题需要检查队列的配置,并确保消费者能够正确订阅到消息。


#### 2、接口定义


生产者业务层:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
/**
 * 请求支付宝查询支付结果
 *
 * @param payNo 支付记录id
 * @return 支付记录信息
 */
@Override
public PayRecordDto queryPayResult(String payNo) {
    //获得初始化的AlipayClient
    AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL,
            APP_ID,APP_PRIVATE_KEY,
            "json",
            AlipayConfig.CHARSET,
            ALIPAY_PUBLIC_KEY,
            AlipayConfig.SIGNTYPE);
    AlipayTradeQueryRequest request = new AlipayTradeQueryRequest();
    JSONObject bizContent = new JSONObject();
    bizContent.put("out_trade_no", payNo);
    request.setBizContent(bizContent.toString());
    AlipayTradeQueryResponse response = null;
    try {
        response = alipayClient.execute(request);
    } catch (AlipayApiException e) {
        log.error("{}:查询支付宝支付结果错误!",payNo);
        return null;
    }
    if (!response.isSuccess()) {
        log.error("{}:查询支付宝支付结果失败!",payNo);
        return null;
    }
    String resultJson = response.getBody();
    //转map
    Map resultMap = JSON.parseObject(resultJson, Map.class);
    Map alipay_trade_query_response = (Map) resultMap.get("alipay_trade_query_response");
    //交易状态
    String tradeStatus = (String) alipay_trade_query_response.get("trade_status");
    //支付宝交易号
    String tradeNo = (String) alipay_trade_query_response.get("trade_no");
    PayStatusDto payStatusDto = new PayStatusDto();
    payStatusDto.setTrade_status(tradeStatus);
    payStatusDto.setTrade_no(tradeNo);
    payStatusDto.setOut_trade_no(payNo);
    payStatusDto.setApp_id(APP_ID);
    //处理订单状态
    return this.handlePayStatus(payStatusDto);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

/**
* 处理订单状态,更新xc_pay_record表
* @return
*/
public PayRecordDto handlePayStatus(PayStatusDto dto) {
PayRecordDto payRecordDto = new PayRecordDto();
String payNo = dto.getOut_trade_no();
String tradeNo = dto.getTrade_no();
String tradeStatus = dto.getTrade_status();

    XcPayRecord xcPayRecord = xcPayRecordMapper.selectOne(new LambdaQueryWrapper<XcPayRecord>().eq(XcPayRecord::getPayNo, payNo));
    if (null == xcPayRecord ){
        log.error("{}:查询订单记录不存在!",tradeNo);
        XueChengPlusException.cast("查询订单记录不存在!");
    }
    if (xcPayRecord.getStatus().equals("601002")){
        BeanUtils.copyProperties(xcPayRecord,payRecordDto);
        return payRecordDto;
    }

    //修改xc_pay_record和xc_orders 交易状态
    switch (tradeStatus) {
        case "TRADE_CLOSED":
            xcPayRecord.setStatus("601003");
            break;
        case "TRADE_SUCCESS":
        case "TRADE_FINISHED":
            xcPayRecord.setStatus("601002");
            xcPayRecord.setPaySuccessTime(LocalDateTime.now());
            break;
    }
    xcPayRecord.setOutPayNo(tradeNo);
    xcPayRecord.setOutPayChannel("alipay");
    xcPayRecordMapper.updateById(xcPayRecord);
    BeanUtils.copyProperties(xcPayRecord,payRecordDto);
    //通过orderId查询订单表信息
    Long orderId = xcPayRecord.getOrderId();
    XcOrders xcOrders = ordersMapper.selectOne(new LambdaQueryWrapper<XcOrders>().eq(XcOrders::getId, orderId));
    if (null == xcOrders ){
        log.error("{}:查询订单不存在!",tradeNo);
        XueChengPlusException.cast("查询订单不存在!");
    }
    //存入mq_message表
    MqMessage message = mqMessageService.addMessage("payresult_notify", xcOrders.getOutBusinessId(), xcOrders.getOrderType(), null);
    //将消息发送至队列,通知learning服务
    this.notifyPayResult(message);
    return payRecordDto;
}

/**
 * 发送通知结果
 *
 * @param message
 */
@Override
public void notifyPayResult(MqMessage message) {
    //1、消息体,转json
    String msg = JSON.toJSONString(message);
    //设置消息持久化
    Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(message.getId().toString());
    // 3.添加callback 回调函数,接收方接受成功或失败后回调
    correlationData.getFuture().addCallback(
            result -> {
                if(result.isAck()){
                    // 3.1.ack,消息成功
                    log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId());
                    //删除消息表中的记录
                    mqMessageService.completed(message.getId());
                }else{
                    // 3.2.nack,消息失败
                    log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息 因为是广播模式所以路由键为空
    rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

支付宝异步回调接口同理也调用了handlePayStatus方法


消费者业务层:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

@Service
@Slf4j
@Transactional
public class ReceviceOrderMessageServiceImpl implements ReceviceOrderMessageService {

/*
选课记录表
 */
@Autowired
private XcChooseCourseMapper chooseCourseMapper;

/*
课程表
 */
@Autowired
private XcCourseTablesMapper courseTablesMapper;

@Autowired
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

towired

[外链图片转存中…(img-n5ZM3jWb-1714790940638)]
[外链图片转存中…(img-2H6ZnTRF-1714790940639)]
[外链图片转存中…(img-WtWWMMZw-1714790940639)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/560281
推荐阅读
相关标签
  

闽ICP备14008679号