赞
踩
问题描述:
xxx项目中异步处理与第三方系统对接的功能,例如:发短信、发邮件、app推送等第三方系统,业务要求往第三方系统发送的请求发生异常,系统需要有重试机制,例如:第一次发短信,短信平台响应异常,则需要过两分钟重一次,第二次还是失败,则再过两分钟再试一次,第三次失败就不需要再重试,只需要把发送失败的信息记录下来即可,让管理员从系统后台人工发送短信。
由于项目服务模块之间不可相互调用,并且redis过期key监听在项目中只能在一个服务模块中配置,所以实现这个业务要求借助了redis的过期key监听功能和借助redis实现发布订阅功能。
将应用部署到多台服务上,多台服务器上的应用连接同一个redis,这时候发布到redis中的数据,会被多台机器上的应用重复消费,导致发送多条同样的短信。
解决方案:
1.redis-config.xml配置如下:
<beans:bean id="redisChannelSMSEmailAppListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<beans:constructor-arg>
<beans:bean class="com.tesla.itop.base.util.RedisChannelSMSEmailAppListener"></beans:bean>
</beans:constructor-arg>
</beans:bean>
<beans:bean id="redisContainer" class="org.springframswork.data.redis.listener.RedisMessageListenerContainer">
<beans:property name="connectionFactory" ref="jedisConnectionFactory"></beans:property>
<beans:property name="messageListeners">
<beans:map>
<beans:entry key-ref="redisChannelSMSEmailAppListener">
<beans:list>
<beans:bean class="org.springframework.data.redis.listener.ChannelTopic">
<beans:constructor-arg value="redisChannelSMSEmailApp"/>
</beans:bean>
</beans:list>
</beans:entry>
</beans:map>
</beans:property>
</beans:bean>
2.AbstractRedisChannelMessageListener.java内容如下:
/** * 单机 redis channel 集群控制类 * 备注:使用redis 渠道 需要集群控制时,需要继承该类, * 1.实现抽象方法 * 2.使用 redisTemplateStr, * 3.存放到channel 里边 必须是string类型的值 * add by chenxiaoli * @since 20200117 */ @SuppressWarnings("all") public abstract class AbstractRedisChannelMessageListener implements MessageListener { Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(Message message, byte[] pattern) { String nowDateString = DateFormatUtil.getNowDateString(); byte[] channel = message.getChannel(); byte[] body = message.getBody(); String channelStr = new String(channel); logger.info("AbstractRedisChannelMessageListener channel :" + nowDateString + ":" + channelStr + "----start"); Object redisChannelValue = RedisChannelConstants.convertBodyByteToObject(body); logger.info("AbstractRedisChannelMessageListener body :" + nowDateString + ":" + redisChannelValue); if (redisChannelValue == null) { logger.info("AbstractRedisChannelMessageListener channel :" + nowDateString + ":" + channelStr + "----获取到的redisChannelValue为空--错误"); return; } if(redisChannelValue instanceof String){ String redisChannelValueStr = (String) redisChannelValue; //该处的key 加上aaa 只是为了和过期监听里边放置的自动过期key做区分,去除耦合,否则业务到此就全被阻断 String timeoutKey = redisChannelValueStr + "_aaa" + "_ONLYTIMEOUTUSE"; Boolean ifAbsent = getRedisTemplateStr().opsForValue().setIfAbsent(timeoutKey, "阻拦其他客户端消费该消息"); if (!ifAbsent) { return; } getRedisTemplateStr().opsForValue().set(timeoutKey, "超时key", 60, TimeUnit.SECONDS); doMessage(channelStr, redisChannelValueStr, nowDateString); }else{ logger.info("AbstractRedisChannelMessageListener channel :" + nowDateString + ":" + channelStr + "---非String类型的value,目前未做兼容,不能处理"); } } /** * 处理消息--由子类实现 * @param channelStr 渠道标识 * @param redisChannelValueStr 业务值 * @param nowDateString 时间戳 */ abstract void doMessage(String channelStr, String redisChannelValueStr, String nowDateString); /** * 使用子类的 redisTemplate 必须有一个可用的redisTemplateStr * @return */ abstract RedisTemplate getRedisTemplateStr(); }
4.子类业务实现
public class RedisChannelZeroReportListener extends AbstractRedisChannelMessageListener { public Logger loggerThirdSystem = LoggerFactory.getLogger("THIRD_SYSTEM_CONN_REQ"); @Autowired private RedisTemplate redisTemplateStr; @Autowired private ZeroReportManager zeroReportManager; @Autowired private Properties httpReqConf; @Autowired private ItGroupRuleManager itGroupRuleManager; @Autowired private NoticeService noticeService; @Override public void doMessage(String channelStr, String redisChannelValueStr, String nowDateString) { loggerThirdSystem.info("---------->>>>>往ITOMS推送零报告信息:" + nowDateString + "<<<<<----------"); loggerThirdSystem.info("消费redis中的channel:"+ channelStr +",redisChannelValueStr信息:" + redisChannelValueStr); if (redisChannelValueStr.contains(SessionAndRedisKeyContants.ZERO_REPORT_SENDTO_ITMOS_KEY)) { String zeroReportId = "";//零报告ID String[] split = redisChannelValueStr.split(SessionAndRedisKeyContants.ZERO_REPORT_SENDTO_ITMOS_KEY); if (split != null && split.length > 0) { for (String str : split) { if (!StringUtil.isBlank(str) && str.contains("ZR")) { zeroReportId = str; } } } loggerThirdSystem.info("零报告唯一标识:" + zeroReportId); //获取零报告信息,推送给ITOMS if (!StringUtil.isBlank(zeroReportId)) { ZeroReport zeroReport = zeroReportManager.getZeroReport(zeroReportId); String orgId = zeroReport.getOrgId(); String orgName = zeroReport.getOrgName(); String status = zeroReport.getStatus(); String title = zeroReport.getTitle(); String content = zeroReport.getContent(); Date startTime = zeroReport.getStartTime(); String sendItomsStatus = zeroReport.getSendItomsStatus(); String startTimeStr = ""; if (startTime != null) { startTimeStr = DateFormatUtil.getAppointDateString(startTime); } Date endTime = zeroReport.getEndTime(); String endTimeStr = ""; if (endTime != null) { endTimeStr = DateFormatUtil.getAppointDateString(endTime); } Date subTime = zeroReport.getCreateTime(); String subTimeStr = ""; if (subTime != null) { subTimeStr = DateFormatUtil.getAppointDateString(subTime); } String subUser = zeroReport.getSubUser(); String subUserName = zeroReport.getSubUserName(); String subUserOaName = zeroReport.getSubUserOaName(); if (zeroReport != null) { String itomsZeroReportUrl = httpReqConf.getProperty("itomsZeroReportUrl"); ZeroReportItomsModel zeroReportItomsModel = new ZeroReportItomsModel(orgId, orgName, status, title, content, startTimeStr, endTimeStr, subTimeStr, subUser, subUserName, subUserOaName); //请求Itoms次数 String reqItomsNum = sendItomsStatus; if (ItOperationProjectContants.USER_STATUS_DISABLED.equals(sendItomsStatus)){ reqItomsNum = "1"; }else if ("1".equals(sendItomsStatus)) { reqItomsNum = "2"; }else if ("2".equals(sendItomsStatus)){ reqItomsNum = "3"; } loggerThirdSystem.info("第 " + reqItomsNum + " 请求ITOMS系统"); loggerThirdSystem.info("请求ITOMS地址:" + itomsZeroReportUrl); loggerThirdSystem.info("请求ITOMS业务参数:" + zeroReportItomsModel.toString()); //测试方法 Map<String, Object> inas = new HashMap<String, Object>(); List<ZeroReportItomsModel> liet = new ArrayList<ZeroReportItomsModel>(); liet.add(zeroReportItomsModel); inas.put("records", liet); String dataParams = JSONObject.toJSONString(inas); loggerThirdSystem.info("请求ITOMS完整参数:" + dataParams); //往ITOMS发送请求 String code = "";//请求结果标识 try { //itoms请求地址 String postReqResult = HttpUtil.doPost(itomsZeroReportUrl, dataParams); loggerThirdSystem.info("请求ITOMS响应结果:" + postReqResult); if (!StringUtil.isEmpty(postReqResult)) { JSONObject jsonObject = JSONObject.parseObject(postReqResult); if (jsonObject != null) { JSONObject reply = jsonObject.getJSONObject("reply"); if (reply != null) { Boolean data = reply.getBoolean("data"); JSONObject returnCode = reply.getJSONObject("returnCode"); code = returnCode.getString("code"); } } } } catch (Exception e) { loggerThirdSystem.info("请求ITOMS响应结果:" + e.getMessage()); } loggerThirdSystem.info("请求ITOMS响应code:" + code); //尝试推送ITOMS的次数 List<String> strings = Arrays.asList("1", "2"); if (ItOperationProjectContants.CONN_ITOMS_SUCCESS_RESULT_FLAG.equals(code)) {//成功更新推送到ITOMS的状态 sendItomsStatus = ItOperationProjectContants.USER_STATUS_NORMAL; }else {//继续往redis中存放过期Key再次重试推送,只推送三次 if (ItOperationProjectContants.USER_STATUS_DISABLED.equals(sendItomsStatus)) { sendItomsStatus = "1"; }else { if (strings.contains(sendItomsStatus)) { sendItomsStatus = (Integer.parseInt(sendItomsStatus) + 1) + ""; } } } //尝试再次推送 if (strings.contains(sendItomsStatus)) { redisTemplateStr.opsForValue().set(SessionAndRedisKeyContants.zeroReportSendToITMOSRedisKey(zeroReportId),"第"+ sendItomsStatus +"次推送到ITOMS" + zeroReportId,120, TimeUnit.SECONDS); loggerThirdSystem.info("此次请求ITOMS失败, 120后再次请求ITOMS系统"); } //更新零报告推送到ITOMS的状态 zeroReport.setSendItomsStatus(sendItomsStatus); zeroReportManager.updateZeroReport(zeroReport); //推送三次依旧失败给数据字典中配置的人员发通知 if ("3".equals(sendItomsStatus)) { loggerThirdSystem.info("请求ITOMS系统失败3次,给总行人员发通知!"); //从数据字典中获取需要通知的人 Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("type", ItOperationProjectContants.ZERO_REPORT_NOTICE_USER);//分行上报零报告推送到ITOMS失败通知人员key paramMap.put("status", "Y");//获取有效的数据字典值 List<SysDictModel> sysDictByMapParams = itGroupRuleManager.getSysDictByMapParams(paramMap); if (sysDictByMapParams != null && !sysDictByMapParams.isEmpty()) { SysDictModel sysDictModel = sysDictByMapParams.get(0); if (sysDictModel != null) { String confValue = sysDictModel.getConfValue(); loggerThirdSystem.info("请求ITOMS系统失败3次,接收通知人员:" + confValue); if (!StringUtil.isBlank(confValue)) { //封装接收通知的人 List<String> noticeUserIdList = new ArrayList<String>(); String[] split1 = confValue.split(","); for (String userId : split1) { if (!StringUtil.isBlank(userId)) { noticeUserIdList.add(userId); } } //发送通知 if (!noticeUserIdList.isEmpty()) { //封装通知内容 String noticeType = ItMessageTemplateContants.ZERO_REPORT_NOTICE;//通知模版唯一key Object obj = new ZeroReportNoticeModel(orgName, orgId, title, "1".equals(status) ? "正常" : "异常", startTimeStr, endTimeStr, subTimeStr, subUserName, subUserOaName);//替换通知模版的业务数据 List<String> sysUserChList = noticeUserIdList;//通知的人员 String bussName = "零报告推送ITOMS失败,零报告标题:" + title;//邮件标题 String noticeWay = ItOperationProjectContants.NOTICE_WAY;//通知方式 String noticeOrgId = "9999"; String serviceType = BusinessMessageContants.ZERO_REPORT_NOTICE_FAIL + ""; String serviceId = zeroReportId;//业务ID noticeService.sendNotice(noticeType, obj, sysUserChList, bussName, noticeWay, noticeOrgId, serviceType, serviceId); } } } } } } } } } @Override RedisTemplate getRedisTemplateStr() { return redisTemplateStr; } }
重点提示:
监听redis消费key的onMessage方法提供成公共抽象父类,控制重复消费在公共抽象父类进行控制,子类中只需要实现具体业务就行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。