赞
踩
相当于一个备忘录,感觉现在记忆不好了,自己做的过段时间可能就记不清楚了,所以写个笔记备忘一下
需要在Elasticsearch的配置文件elasticsearch.yml中添加如下配置:
script.engine.groovy.inline.update: on
/** 更新car脚本 */
if (ctx._source.car_target_id==null) {
ctx._source.car_target_id=car_target_id;
} else {
ctx._source.car_target_id+=car_target_id;
}
/** 跟新item脚本 */
if (ctx._source.item==null) {
ctx._source.item=item;
} else {
ctx._source.item+=item;
}
/** * IndexConstants * * @author littlehow * @time 2017-06-28 11:03 */ public abstract class IndexConstants { /** 索引类型 */ public static final String COMMON_INDEX_TYPE = "behavior"; /** 索引名称 */ public static final String INDEX_NAME_BEHAVIOR_LOG = "behavior_log"; public static final String INDEX_NAME_BEHAVIOR_STATISTIC = "behavior_statistic"; } /** * BehaviorLogIndex * * @author littlehow * @time 2017-06-27 17:52 */ public enum BehaviorLogIndex { trace_id("trace_id"),//索引id description("description"),//描述 terminal_type("terminal_type"),//终端类型 person_id("person_id"),//调用者身份标识 page_no("page_no"),//页码,如果分页的话需要知道当前页码 product_ids("product_ids"),//产品结果集id,number数组 product_info("product_info"),//产品信息object数组 item("item"),//商品信息 object数组 product_id("product_id"),//产品id item_info("item_info"),//商品信息object数组 result_id("result_id"),//结果信息 product_info.result_id,item_info.result_id label("label"),//标签信息string数组 product_info.label, item_info.label car_target_id("car_target_id"),//加入购物车的id number数组 order_target_id("order_target_id"),//下单的id number数组 keywords("keywords"),//关键字 real_search_words("real_search_words"),//可能经过了纠错 interface_name("interface_name"),//调用接口 call_time("call_time"),//调用时间 last_data_time("last_update_time"),//最后更新数据的数据时间 ; public final String value; BehaviorLogIndex(String value) { this.value = value; } }
/** * ESOperationDao * * @author littlehow * @time 2017-06-27 17:49 */ @Repository public class ESOperationDao implements InitializingBean { //日志记录 private Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private ESClientFactory esClientFactory; private BulkProcessor bulkProcessor; //加入购物车脚本 private final String updateCarTransScript = "if (ctx._source."+ car_target_id.value +"==null) {ctx._source." + car_target_id.value + "=" + car_target_id.value + ";}else{ctx._source."+ car_target_id.value +"+=" + car_target_id.value + ";}"; //加入商品信息 private final String updateItemScript = "if (ctx._source." + item.value + "==null) {ctx._source." + item.value + "=" + item + ";}else{ctx._source." + item.value + "+=" + item.value + ";}"; /** * 插入更新索引数据 * @param indexMapList */ public void upsertBehaviorLogMapList(List<Map<String, Object>> indexMapList) { if (indexMapList == null || indexMapList.size() == 0) { return; } Client client = esClientFactory.getClient(); BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); for (Map<String, Object> indexMap : indexMapList) { String id = indexMap.get(trace_id.value).toString(); try { bulkRequestBuilder.add(new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id).docAsUpsert(true).doc(indexMap)); } catch (Throwable t) { t.printStackTrace(); } logger.info("upsert index[{}] 's doc; id={}", INDEX_NAME_BEHAVIOR_LOG, id); if(logger.isDebugEnabled()){ logger.debug(indexMap.toString()); } } final int num = indexMapList.size(); bulkRequestBuilder.execute(new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkItemResponses) { if (logger.isDebugEnabled()) { logger.debug("num [{}] bulk success!!!", num); } } @Override public void onFailure(Throwable e) { logger.error("bulk num [{}] , error msg : {}", num, e.getLocalizedMessage()); } }); } /** * 执行日志插入 * @param indexMap */ public void upsertBehaviorLogMap(Map<String, Object> indexMap) { String id = indexMap.get(trace_id.value).toString(); logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id); bulkProcessor.add(new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id).docAsUpsert(true).doc(indexMap)); } /** * 加入购物车转换 * @param paramMap -- 脚本修改参数 */ public void upsertBehaviorCarTrans(Map<String, Object> paramMap) { String id = (String)paramMap.get(trace_id.value); logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id); UpdateRequest updateRequest = new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id); updateRequest.script(new Script(updateCarTransScript, ScriptService.ScriptType.INLINE, null, paramMap)); bulkProcessor.add(updateRequest); } /** * 商品信息存储 * @param paramMap */ public void upsertBehaviorItem(Map<String, Object> paramMap) { String id = (String)paramMap.get(trace_id.value); logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id); UpdateRequest updateRequest = new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id); updateRequest.script(new Script(updateItemScript, ScriptService.ScriptType.INLINE, null, paramMap)); bulkProcessor.add(updateRequest); } /** * 主动刷新processor,将数据存入es */ public void flushBulkProcessor() { //该方法被标记为synchronized bulkProcessor.flush(); } /** * 创建产品索引 * @return */ public void createBehaviorLogIndex() { Client client = esClientFactory.getClient(); //索引名称 IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(INDEX_NAME_BEHAVIOR_LOG); if(!client.admin().indices().exists(indicesExistsRequest).actionGet().isExists()) { //索引不存在则创建索引 String indexSource = null; try { indexSource = getBehaviorLogIndexSource(); logger.info(indexSource); } catch (IOException e) { logger.error("建立索引失败:" + INDEX_NAME_BEHAVIOR_LOG, e); throw new IllegalStateException(e.getMessage()); } client.admin().indices().prepareCreate(INDEX_NAME_BEHAVIOR_LOG).addMapping(COMMON_INDEX_TYPE, indexSource).get(); logger.info("建立索引成功" + INDEX_NAME_BEHAVIOR_LOG); } } /** * 删除索引信息 */ public void removeBehaviorLogIndex() { Client client = esClientFactory.getClient(); if (client.admin().indices().exists(new IndicesExistsRequest(INDEX_NAME_BEHAVIOR_LOG)).actionGet().isExists()) { logger.info("删除索引准备:" + INDEX_NAME_BEHAVIOR_LOG); client.admin().indices().prepareDelete(INDEX_NAME_BEHAVIOR_LOG).get(); logger.info("删除索引成功:" + INDEX_NAME_BEHAVIOR_LOG); } } /** * 行为日志索引的source * * @return * @throws IOException */ private String getBehaviorLogIndexSource() throws IOException { try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject().field(COMMON_INDEX_TYPE).startObject().field("properties").startObject(); builder.field(trace_id.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(description.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(terminal_type.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(person_id.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(page_no.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); builder.field(product_ids.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); // ------------ product_info嵌套类型start builder.field(product_info.value).startObject().field("type", "nested").field("properties").startObject(); builder.field(result_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); builder.field(label.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.endObject().endObject(); // ------------- product_info嵌套类型end // ------------ item嵌套类型start builder.field(item.value).startObject().field("type", "nested").field("properties").startObject(); builder.field(product_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); // ------------ item_info嵌套类型start builder.field(item_info.value).startObject().field("type", "nested").field("properties").startObject(); builder.field(result_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); builder.field(label.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.endObject().endObject(); // ------------- item_info嵌套类型end builder.endObject().endObject(); // ------------ item嵌套类型end builder.field(car_target_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); builder.field(order_target_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject(); builder.field(keywords.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(real_search_words.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(interface_name.value).startObject().field("type", "string").field("index", "not_analyzed").endObject(); builder.field(call_time.value).startObject().field("type", "date").field("index", "not_analyzed").endObject(); builder.field(last_data_time.value).startObject().field("type", "date").field("index", "not_analyzed").endObject(); builder.endObject();//第一个properties的闭合 builder.startObject("_all").field("enabled", false).endObject(); builder.endObject();//索引类型的闭合 builder.endObject();//整个json的闭合 return builder.string(); } catch (IOException e) { logger.error("拼接索引映射信息失败:" + e.getMessage()); throw e; } } @Override public void afterPropertiesSet() throws Exception { bulkProcessor = BulkProcessor.builder(esClientFactory.getClient(), new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("执行bulk开始executionId=" + executionId + ", time=" + System.currentTimeMillis()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("执行刷入失败executionId[" + executionId + "]", failure); } }).setBulkActions(500)//设置500请求进行刷入 .setConcurrentRequests(1)//设置并发支持 .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(200L), 3))//设置失败后每隔200毫秒执行一次,最多重试3次 .build(); } }
/** * EsOperationTest * * @author littlehow * @time 2017-06-29 14:01 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath*:spring/ApplicationContext-beans-search-base.xml", "classpath*:spring/ApplicationContext-elasticsearch.xml", "classpath*:spring/log/ApplicationContext-call-log.xml" }) public class EsOperationTest { @Autowired private DataOperationService dataOperationService; @Before public void initIndex() { dataOperationService.createBehaviorLogIndex(); } @Test public void test() { CommonLog commonLog = getCommonLog(); dataOperationService.upsertProductLog(commonLog); dataOperationService.upsertCarTransLog(16, commonLog.getMonitorTraceId()); dataOperationService.upsertItemLog(getItemLogDto(12, commonLog.getMonitorTraceId(), 1012, 1014)); dataOperationService.upsertItemLog(getItemLogDto(14, commonLog.getMonitorTraceId(), 1018, 1021)); //加入商品信息 dataOperationService.flushUpsertData(); } private ItemLogDto getItemLogDto(int productId, String traceId, int start, int end) { ItemLogDto itemLogDto = new ItemLogDto(); itemLogDto.setMonitorTraceId(traceId); itemLogDto.setProductId(productId); List<ItemDataDto> itemDataDtos = new ArrayList<>(); for (int i = start; i < end; i++) { itemDataDtos.add(getItemDataDto(i)); } itemLogDto.setItemDataDtos(itemDataDtos); return itemLogDto; } private ItemDataDto getItemDataDto(int itemId) { ItemDataDto itemDataDto = new ItemDataDto(); itemDataDto.setItemId(itemId); Set<String> set = new HashSet<>(); ItemLabel[] itemLabels = ItemLabel.values(); Random random = new Random(); set.add(itemLabels[random.nextInt(itemLabels.length)].value); set.add(itemLabels[random.nextInt(itemLabels.length)].value); itemDataDto.setLabel(new ArrayList<>(set)); return itemDataDto; } /** * 获取日志信息 * @return */ private CommonLog getCommonLog() { CommonLog commonLog = new CommonLog(); commonLog.setDescription("hello"); commonLog.setInterfaceName("ProductService.productSearch"); commonLog.setKeywords("阿莫西林"); commonLog.setMonitorTraceId("1313"); commonLog.setPageNo(1); commonLog.setPersonId("123123"); commonLog.setTerminalType("zhongduanbao"); commonLog.setTime(System.currentTimeMillis()); commonLog.setType(CollectionType.PRODUCT_SEARCH.value); List<OperateResult> operateResults = new ArrayList<>(); operateResults.add(getResult(12, ItemLabel.CONTROL_SALE.value, ItemLabel.ELECTRONIC_INVOICE.value)); operateResults.add(getResult(14, ItemLabel.NEAR_EXPIRY.value, ItemLabel.ELECTRONIC_INVOICE.value)); commonLog.setResults(operateResults); return commonLog; } /** * 获取结果集 * @param id * @param labels * @return */ private OperateResult getResult(Integer id, String ...labels) { OperateResult operateResult = new OperateResult(); operateResult.setTargetId(id); operateResult.setLabel(Arrays.asList(labels)); return operateResult; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。