赞
踩
工作中接到一个需求:需要对公司的产品做侵权排查,并把涉及侵权的产品插入到另外一张表中做记录。
于是写出了一个基于xxl-job的定时任务,代码如下
package com.ebuy.cloud.service.product.serviceproduct.service.jobhandler; import com.alibaba.druid.support.json.JSONUtils; import com.alibaba.fastjson.JSONObject; import com.ebuy.cloud.service.product.serviceproduct.common.ResponseMsg; import com.ebuy.cloud.service.product.serviceproduct.entity.YibaiProdSkuGbcRecord; import com.ebuy.cloud.service.product.serviceproduct.entity.bo.YibaiProdSkuEditBasicBo; import com.ebuy.cloud.service.product.serviceproduct.feign.ElasticsearchFeignClient; import com.ebuy.cloud.service.product.serviceproduct.mapper.YibaiProdSkuEditBasicMapper; import com.ebuy.cloud.service.product.serviceproduct.mapper.YibaiProdSkuGbcRecordMapper; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHandler; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Author: carter * @Date: 2019/8/6 16:38 * @Version 1.0 */ @JobHandler(value = "prodSkuEditBasicJobHandler") @Component public class YibaiProdSkuEditBasicJobHandler extends IJobHandler { private static final Logger log = LoggerFactory.getLogger(YibaiProdSkuEditBasicJobHandler.class); @Autowired private YibaiProdSkuEditBasicMapper skuEditBasicMapper; @Autowired private YibaiProdSkuGbcRecordMapper skuGbcRecordMapper; @Autowired private ElasticsearchFeignClient elasticsearchFeignClient; @Override public ReturnT<String> execute(String s) throws Exception { log.info("=========开始执行SKU侵权排查定时任务=========="); //统计需要侵权排查的sku总条数 Integer total = skuEditBasicMapper.getTotal(); if (total > 0) { Integer pageSize = 5000;//每页查询数量 Integer pages;//总分页数据 if (total 
% pageSize == 0) { pages = total / pageSize; } else { pages = total / pageSize + 1; } log.info("共计"+total+"条数据,分为"+pages+"页,每页5000条数据。"); ExecutorService executor = Executors.newFixedThreadPool(15); for (Integer i = 1; i <= pages; i++) { Integer currentPage = i; Integer limitStart = (currentPage - 1) * pageSize; List<YibaiProdSkuEditBasicBo> skuEditBasics = skuEditBasicMapper.getSkuEditBasic(limitStart, pageSize); for (YibaiProdSkuEditBasicBo skuEditBasic : skuEditBasics) { executor.execute(new Runnable() { public void run() { //调用侵权排查接口 做业务处理 getSkuEditBasic(skuEditBasic); } }); } } executor.shutdown();//该方法在加入线程队列的线程执行完之前不会执行 while (true) { if (executor.isTerminated()) { log.info("=========SKU侵权排查定时任务执行完毕,共有"+total+"条数据,已全部排查完成========="); return SUCCESS; } Thread.sleep(2000);//每隔几秒再去验证队列执行完毕没 } } return SUCCESS; }
ExecutorService executor = Executors.newFixedThreadPool(15) 声明一个大小为15的线程池
executor.shutdown() 调用后线程池不再接收新任务,但已提交的任务(包括队列中等待的任务)会继续执行完毕;该方法本身立即返回,不会阻塞等待任务结束
executor.isTerminated() 判断调用 shutdown() 之后所有已提交的任务是否都已执行完毕,全部完成后才返回 true
/** * 业务代码 */ private void getSkuEditBasic(YibaiProdSkuEditBasicBo skuEditBasic){ JSONObject data = new JSONObject(); data.put("keyword", skuEditBasic.getSkuTitle()); //查询是否标题侵权 ResponseMsg skuTitleMsg = elasticsearchFeignClient.listKeyword(data); /*if (null == skuTitleMsg) { throw new Exception("标题侵权排查调用service-elasticsearch服务失败 ===> sku="+skuEditBasic.getSku()); } // 调用服务报错 if (skuTitleMsg.getCode() != Code.SUCCESS) { throw new Exception("标题侵权排查失败 sku==>"+skuEditBasic.getSku()+";msg==>"+skuTitleMsg.getMsg()); }*/ data.put("keyword", skuEditBasic.getProductKeyWords()); //查询是否关键词侵权 ResponseMsg productKeyWordsMsg = elasticsearchFeignClient.listKeyword(data); data.put("keyword", skuEditBasic.getNoticeRemark()); //查询是否注意事项侵权 ResponseMsg noticeRemarkMsg = elasticsearchFeignClient.listKeyword(data); data.put("keyword", skuEditBasic.getPackList()); //查询是否sku包装清单侵权 ResponseMsg packListMsg = elasticsearchFeignClient.listKeyword(data); data.put("keyword", skuEditBasic.getGoodsParams()); //查询是否商品参数(富文本大字段)侵权 ResponseMsg goodsParamsMsg = elasticsearchFeignClient.listKeyword(data); data.put("keyword", skuEditBasic.getProductDifference()); //查询是否卖点侵权 ResponseMsg productDifferenceMsg = elasticsearchFeignClient.listKeyword(data); List<String> skuTitleList = (List<String>) skuTitleMsg.getData(); List<String> productKeyWordsList = (List<String>) productKeyWordsMsg.getData(); List<String> noticeRemarkList = (List<String>) noticeRemarkMsg.getData(); List<String> packListList = (List<String>) packListMsg.getData(); List<String> goodsParamsList = (List<String>) goodsParamsMsg.getData(); List<String> productDifferenceList = (List<String>) productDifferenceMsg.getData(); //只要有一个list不为空则查询出有侵权现象 插入侵权sku列表 if ((null != skuTitleList && !skuTitleList.isEmpty()) || (null != productKeyWordsList && !productKeyWordsList.isEmpty()) || (null != noticeRemarkList && !noticeRemarkList.isEmpty()) || (null != packListList && !packListList.isEmpty()) || (null != goodsParamsList && 
!goodsParamsList.isEmpty()) || (null != productDifferenceList && !productDifferenceList.isEmpty())) { if(null==productKeyWordsList && null==productDifferenceList){ productKeyWordsList = new ArrayList<>(); }else if(null==productKeyWordsList && null!=productDifferenceList){ productKeyWordsList=productDifferenceList; }else if(null!=productKeyWordsList && null!=productDifferenceList){ productKeyWordsList.addAll(productDifferenceList); } String gbcs = ""; Integer gbcType = null; Map<String, Object> gbcField = new HashMap<>(16); gbcField.put("sku_title",skuTitleList==null?new ArrayList<>():skuTitleList); gbcField.put("text_value", productKeyWordsList); gbcField.put("notice_remark", noticeRemarkList==null?new ArrayList<>():noticeRemarkList); gbcField.put("pack_list", packListList==null?new ArrayList<>():packListList); gbcField.put("goods_params", goodsParamsList==null?new ArrayList<>():goodsParamsList); if (null != skuTitleList && !skuTitleList.isEmpty()) { gbcType = 1; gbcs = gbcs + StringUtils.join(skuTitleList, "§"); } if (null != productKeyWordsList && !productKeyWordsList.isEmpty()) { gbcType = 2; if (StringUtils.isBlank(gbcs)) { gbcs = gbcs + StringUtils.join(productKeyWordsList, "§"); } else { gbcs = gbcs + "§" + StringUtils.join(productKeyWordsList, "§"); } } if (null != noticeRemarkList && !noticeRemarkList.isEmpty()) { gbcType = 3; if (StringUtils.isBlank(gbcs)) { gbcs = gbcs + StringUtils.join(noticeRemarkList, "§"); } else { gbcs = gbcs + "§" + StringUtils.join(noticeRemarkList, "§"); } } if (null != packListList && !packListList.isEmpty()) { gbcType = 3; if (StringUtils.isBlank(gbcs)) { gbcs = gbcs + StringUtils.join(packListList, "§"); } else { gbcs = gbcs + "§" + StringUtils.join(packListList, "§"); } } if (null != goodsParamsList && !goodsParamsList.isEmpty()) { gbcType = 3; if (StringUtils.isBlank(gbcs)) { gbcs = gbcs + StringUtils.join(goodsParamsList, "§"); } else { gbcs = gbcs + "§" + StringUtils.join(goodsParamsList, "§"); } } if (null != 
productDifferenceList && !productDifferenceList.isEmpty()) { gbcType = 3; if (StringUtils.isBlank(gbcs)) { gbcs = gbcs + StringUtils.join(productDifferenceList, "§"); } else { gbcs = gbcs + "§" + StringUtils.join(productDifferenceList, "§"); } } YibaiProdSkuGbcRecord skuGbcRecord=new YibaiProdSkuGbcRecord(); skuGbcRecord.setSku(skuEditBasic.getSku()); skuGbcRecord.setGbcs(gbcs); skuGbcRecord.setGbcField(JSONUtils.toJSONString(gbcField)); skuGbcRecord.setGbcType(gbcType); skuGbcRecord.setCreateUser(skuEditBasic.getCreateUser()); skuGbcRecord.setCreateTime(LocalDateTime.now()); skuGbcRecord.setIsDel(0); skuGbcRecordMapper.insert(skuGbcRecord); log.info("侵权的sku===================>"+skuEditBasic.getSku()); } } }
上面是业务代码 , 需要做侵权排查的有6个字段 , 每次都需要去调用另外一个服务的接口,通过返回值判断是否有侵权,若是侵权插入侵权排查表。
启动项目
执行定时任务
后台抛出异常
... ... ... ... ... Exception in thread "pool-16-thread-12" com.netflix.hystrix.exception.HystrixRuntimeException: ElasticsearchFeignClient#listKeyword(JSONObject) could not be queued for execution and no fallback available. at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822) at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:140) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472) at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) at rx.observers.Subscribers$5.onError(Subscribers.java:230) at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:44) at rx.internal.operators.OnSubscribeThrow.call(OnSubscribeThrow.java:28) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51) at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at 
rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$4.onError(OperatorOnErrorResumeNextViaFunction.java:142) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onError(OnSubscribeDoOnEach.java:87) at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:54) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at 
rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51) at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48) at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41) at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30) at rx.Observable.unsafeSubscribe(Observable.java:10327) at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51) at 
rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30) at rx.Observable.subscribe(Observable.java:10423) at rx.Observable.subscribe(Observable.java:10390) at rx.internal.operators.BlockingOperatorToFuture.toFuture(BlockingOperatorToFuture.java:51) at rx.observables.BlockingObservable.toFuture(BlockingObservable.java:410) at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:378) at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:344) at feign.hystrix.HystrixInvocationHandler.invoke(HystrixInvocationHandler.java:159) at com.sun.proxy.$Proxy216.listKeyword(Unknown Source) at com.ebuy.cloud.service.product.serviceproduct.service.jobhandler.YibaiProdSkuEditBasicJobHandler.getSkuEditBasic(YibaiProdSkuEditBasicJobHandler.java:107) at com.ebuy.cloud.service.product.serviceproduct.service.jobhandler.YibaiProdSkuEditBasicJobHandler.access$000(YibaiProdSkuEditBasicJobHandler.java:36) at com.ebuy.cloud.service.product.serviceproduct.service.jobhandler.YibaiProdSkuEditBasicJobHandler$1.run(YibaiProdSkuEditBasicJobHandler.java:74) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6e7c096d rejected from java.util.concurrent.ThreadPoolExecutor@4f5ff722[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) at 
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172) at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106) at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:50) at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30) at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48) ... 53 more
在网上搜了一大堆关于此问题的解决方案,都没能解决这个问题;最终在项目组大佬的指引下并查阅资料后,才解决了此问题。
解决方案:
1.关闭熔断 也就是将feign.hystrix.enabled属性改为false
2.在声明线程池的时候大小改为5,ExecutorService executor = Executors.newFixedThreadPool(5)
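3.出现此问题的根本原因在于熔断器的默认配置:从上面的异常信息可以看到,Hystrix 线程池大小为 10(pool size = 10)且没有等待队列(queued tasks = 0),而定时任务用 15 个线程并发调用 Feign 接口,超出线程池容量的请求会被直接拒绝。因此需对熔断器的属性进行修改,熔断器配置如下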
feign.hystrix.enabled=true
# 设置HystrixCommand.run()的执行是否有超时限制
hystrix.command.default.execution.timeout.enabled=true
# 设置HystrixCommand.run()的超时时间(毫秒)
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=60000
# 设置滚动时间窗口内触发熔断判断所需的最小请求数,默认为20个
hystrix.command.default.circuitBreaker.requestVolumeThreshold=1000
# 设置Hystrix线程池的核心线程数,默认为10个
hystrix.threadpool.default.coreSize=50
# 设置回退(fallback)方法的最大并发请求数
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests=25
ribbon.ReadTimeout=10000
ribbon.ConnectTimeout=9000
出于业务考虑我选择第三种方案,解决此问题。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。