赞
踩
先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
/** * 发送通知结果 * * @param message */ @Override public void notifyPayResult(MqMessage message) { //1、消息体,转json String msg = JSON.toJSONString(message); //设置消息持久化 Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2.全局唯一的消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(message.getId().toString()); // 3.添加callback 回调函数 correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId()); //删除消息表中的记录 mqMessageService.completed(message.getId()); }else{ // 3.2.nack,消息失败 log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 发送消息 因为是广播模式所以路由键为空 rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData); }
在本项目中,生产者消息投递成功到交换机时,执行confirmCallBack删除mq_message表中的记录。如果消息从交换机投递到队列失败时,则把消息再次添加回mq_message表,并且进行重试(重试次数在配置文件中配置)。
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated #correlated 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publisher-returns: true #开启publish-return功能,同样是基于callback机制,需要定义ReturnCallback template: mandatory: true #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息 listener: simple: prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息 acknowledge-mode: manual #auto:出现异常时返回unack,消息回滚到mq;没有异常,返回ack ,manual:手动控制,none:丢弃消息,不回滚到mq retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初识的失败等待时长为1秒 multiplier: 1 #失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态;false有状态。如果业务中包含事务,这里改为false direct: acknowledge-mode: manual
Q1:如果消费者未收到消息或者出现系统错误时,是否会触发回调函数?
在 RabbitMQ 中,当消费者未收到消息或者出现系统错误时,不会触发回调函数。回调函数是在消息生产者发送消息到 RabbitMQ 服务器后触发的,用于确认消息是否成功发送到 Exchange 或者是否被正确路由到 Queue。
如果消费者未收到消息,可能有以下几种原因:
生产者业务层:
/** * 请求支付宝查询支付结果 * * @param payNo 支付记录id * @return 支付记录信息 */ @Override public PayRecordDto queryPayResult(String payNo) { //获得初始化的AlipayClient AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfig.URL, APP_ID,APP_PRIVATE_KEY, "json", AlipayConfig.CHARSET, ALIPAY_PUBLIC_KEY, AlipayConfig.SIGNTYPE); AlipayTradeQueryRequest request = new AlipayTradeQueryRequest(); JSONObject bizContent = new JSONObject(); bizContent.put("out_trade_no", payNo); request.setBizContent(bizContent.toString()); AlipayTradeQueryResponse response = null; try { response = alipayClient.execute(request); } catch (AlipayApiException e) { log.error("{}:查询支付宝支付结果错误!",payNo); return null; } if (!response.isSuccess()) { log.error("{}:查询支付宝支付结果失败!",payNo); return null; } String resultJson = response.getBody(); //转map Map resultMap = JSON.parseObject(resultJson, Map.class); Map alipay_trade_query_response = (Map) resultMap.get("alipay_trade_query_response"); //交易状态 String tradeStatus = (String) alipay_trade_query_response.get("trade_status"); //支付宝交易号 String tradeNo = (String) alipay_trade_query_response.get("trade_no"); PayStatusDto payStatusDto = new PayStatusDto(); payStatusDto.setTrade_status(tradeStatus); payStatusDto.setTrade_no(tradeNo); payStatusDto.setOut_trade_no(payNo); payStatusDto.setApp_id(APP_ID); //处理订单状态 return this.handlePayStatus(payStatusDto); } /** * 处理订单状态,更新xc_pay_record表 * @return */ public PayRecordDto handlePayStatus(PayStatusDto dto) { PayRecordDto payRecordDto = new PayRecordDto(); String payNo = dto.getOut_trade_no(); String tradeNo = dto.getTrade_no(); String tradeStatus = dto.getTrade_status(); XcPayRecord xcPayRecord = xcPayRecordMapper.selectOne(new LambdaQueryWrapper<XcPayRecord>().eq(XcPayRecord::getPayNo, payNo)); if (null == xcPayRecord ){ log.error("{}:查询订单记录不存在!",tradeNo); XueChengPlusException.cast("查询订单记录不存在!"); } if (xcPayRecord.getStatus().equals("601002")){ BeanUtils.copyProperties(xcPayRecord,payRecordDto); return payRecordDto; } //修改xc_pay_record和xc_orders 交易状态 switch (tradeStatus) { case "TRADE_CLOSED": xcPayRecord.setStatus("601003"); break; case "TRADE_SUCCESS": case "TRADE_FINISHED": xcPayRecord.setStatus("601002"); xcPayRecord.setPaySuccessTime(LocalDateTime.now()); break; } xcPayRecord.setOutPayNo(tradeNo); xcPayRecord.setOutPayChannel("alipay"); xcPayRecordMapper.updateById(xcPayRecord); BeanUtils.copyProperties(xcPayRecord,payRecordDto); //通过orderId查询订单表信息 Long orderId = xcPayRecord.getOrderId(); XcOrders xcOrders = ordersMapper.selectOne(new LambdaQueryWrapper<XcOrders>().eq(XcOrders::getId, orderId)); if (null == xcOrders ){ log.error("{}:查询订单不存在!",tradeNo); XueChengPlusException.cast("查询订单不存在!"); } //存入mq_message表 MqMessage message = mqMessageService.addMessage("payresult_notify", xcOrders.getOutBusinessId(), xcOrders.getOrderType(), null); //将消息发送至队列,通知learning服务 this.notifyPayResult(message); return payRecordDto; } /** * 发送通知结果 * * @param message */ @Override public void notifyPayResult(MqMessage message) { //1、消息体,转json String msg = JSON.toJSONString(message); //设置消息持久化 Message msgObj = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2.全局唯一的消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(message.getId().toString()); // 3.添加callback 回调函数,接收方接受成功或失败后回调 correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // 3.1.ack,消息成功 log.debug("通知支付结果消息发送成功, ID:{}", correlationData.getId()); //删除消息表中的记录 mqMessageService.completed(message.getId()); }else{ // 3.2.nack,消息失败 log.error("通知支付结果消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 发送消息 因为是广播模式所以路由键为空 rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, "", msgObj,correlationData); }
支付宝异步回调接口同理也调用了handlePayStatus方法
消费者业务层:
@Service @Slf4j @Transactional public class ReceviceOrderMessageServiceImpl implements ReceviceOrderMessageService { /* 选课记录表 */ @Autowired private XcChooseCourseMapper chooseCourseMapper; /* 课程表 */ @Autowired private XcCourseTablesMapper courseTablesMapper; @Autowired private MyCourseTables myCourseTables; @Autowired private ReceviceOrderMessageService receviceOrderMessageService; @Resource private RedisTemplate<String,Integer> redisTemplate; @Resource private RedissonClient redissonClient; private static final String QUEUE_KEY = "learning:"; private static final String REDISSON_KEY = "redisson:"; @Override @RabbitListener(queues = PayNotifyConfig.PAYNOTIFY_QUEUE) public void receiveMessage(Message message, Channel channel) { byte[] body = message.getBody(); long deliverTag = message.getMessageProperties().getDeliveryTag(); String jsonStr = new String(body); MqMessage mqMessage = JSON.parseObject(jsonStr, MqMessage.class); //得到选课id String businessKey1 = mqMessage.getBusinessKey1(); //得到课程类型 String businessKey2 = mqMessage.getBusinessKey2(); //处理课程信息 Integer count = 0; try { count = redisTemplate.opsForValue().get(QUEUE_KEY + businessKey1); if (!ObjectUtils.isEmpty(count) && count == 3){ log.error("消息已达到最大重试次数:{},作丢弃处理",count); channel.basicNack(deliverTag,false,false); return; } this.receviceOrderMessageService.handleCourseDBData(businessKey1,businessKey2); int i = 1/0; channel.basicAck(deliverTag,true); log.info("消息手动确认成功!"); } catch (Exception e) { try { channel.basicNack(deliverTag,false,true); RLock lock = redissonClient.getLock(REDISSON_KEY + businessKey1); if (lock.tryLock()){ try { redisTemplate.opsForValue().set(QUEUE_KEY + businessKey1, count == null ? 1 : count + 1,60, TimeUnit.SECONDS); } finally { lock.unlock(); } } } catch (IOException ex) { log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e); } log.error("消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e); } } /** * 处理课程表,选课记录表 * @param businessKey1 选课表id * @param businessKey2 课程类型 * @return */ public boolean handleCourseDBData(String businessKey1, String businessKey2) { //根据课程id查询选课记录表 XcChooseCourse xcChooseCourse = chooseCourseMapper.selectOne(new LambdaQueryWrapper<XcChooseCourse>(). eq(XcChooseCourse::getId, businessKey1)); if (ObjectUtils.isEmpty(xcChooseCourse)){ log.error("根据课程ID:{}查询到的选课记录为空!",businessKey1); return false; } if (!businessKey2.equals("60201")){ return false; } //修改状态为701001选课成功 xcChooseCourse.setStatus("701001"); int i = chooseCourseMapper.updateById(xcChooseCourse); if (i<1){ log.error("更新选课记录表失败!选课ID:{}",businessKey1); XueChengPlusException.cast("更新选课记录表失败!"); **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。** **需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)** ![img](https://img-blog.csdnimg.cn/img_convert/f86f9de5337397a6135468dd714e4ecd.png) **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!** log.error("更新选课记录表失败!选课ID:{}",businessKey1); XueChengPlusException.cast("更新选课记录表失败!"); **网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。** **需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)** [外链图片转存中...(img-94fSzcgX-1713189491874)] **一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!**
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。