当前位置:   article > 正文

ElasticSearch中局部更新操作(工作笔记)_elasticsearchrepository 更新操作

elasticsearchrepository 更新操作

写在前面

这篇文章相当于一份备忘录:最近感觉记性不太好,自己做过的东西过一段时间可能就记不清了,所以写篇笔记备忘一下。

原文章链接

ES方面配置

需要在Elasticsearch的配置文件elasticsearch.yml中添加如下配置(允许update操作使用groovy内联脚本):
script.engine.groovy.inline.update: on

功能实现

groovy脚本

  • car_target_id和item都为数组,在脚本中数组的+=运算相当于Array的push操作
/** Car update script: assign car_target_id when the document has none yet, otherwise append the parameter to it. */
ctx._source.car_target_id = (ctx._source.car_target_id == null) ?
    car_target_id :
    ctx._source.car_target_id + car_target_id;
/** Item update script: assign item when the document has none yet, otherwise append the parameter to it. */
ctx._source.item = (ctx._source.item == null) ?
    item :
    ctx._source.item + item;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

主要功能代码

  • 索引创建与删除
  • 使用BulkProcessor做异步批量提交
/**
 * Names shared by the behavior-index code: the index names and the single
 * mapping type used inside them.
 *
 * <p>The class only holds constants; it is declared {@code abstract} so it is
 * never instantiated directly.
 *
 * @author littlehow
 * @time 2017-06-28 11:03
 */
public abstract class IndexConstants {

    /** Name of the behavior-log index. */
    public static final String INDEX_NAME_BEHAVIOR_LOG = "behavior_log";

    /** Name of the behavior-statistic index. */
    public static final String INDEX_NAME_BEHAVIOR_STATISTIC = "behavior_statistic";

    /** Mapping type used for documents in the behavior indices. */
    public static final String COMMON_INDEX_TYPE = "behavior";
}

/**
 * Field names of a behavior-log document.
 *
 * <p>Each constant carries, in {@link #value}, the field name written to the
 * index. For every constant except {@link #last_data_time} the field name is
 * identical to the constant name.
 *
 * @author littlehow
 * @time 2017-06-27 17:52
 */
public enum BehaviorLogIndex {
    /** Id of the index entry. */
    trace_id("trace_id"),
    /** Description. */
    description("description"),
    /** Terminal type. */
    terminal_type("terminal_type"),
    /** Identity of the caller. */
    person_id("person_id"),
    /** Page number; needed to know the current page when results are paged. */
    page_no("page_no"),
    /** Ids of the product result set (number array). */
    product_ids("product_ids"),
    /** Product information (object array). */
    product_info("product_info"),
    /** Item information (object array). */
    item("item"),
    /** Product id. */
    product_id("product_id"),
    /** Item information (object array). */
    item_info("item_info"),
    /** Result id; used as product_info.result_id and item_info.result_id. */
    result_id("result_id"),
    /** Labels (string array); used as product_info.label and item_info.label. */
    label("label"),
    /** Ids added to the shopping cart (number array). */
    car_target_id("car_target_id"),
    /** Ids that were ordered (number array). */
    order_target_id("order_target_id"),
    /** Search keywords. */
    keywords("keywords"),
    /** Search words actually used; may have gone through correction. */
    real_search_words("real_search_words"),
    /** Interface that was called. */
    interface_name("interface_name"),
    /** Time of the call. */
    call_time("call_time"),
    /** Data time of the last update; note the field name differs from the constant name. */
    last_data_time("last_update_time");

    /** Field name as stored in the index. */
    public final String value;

    BehaviorLogIndex(String fieldName) {
        this.value = fieldName;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 索引建立以及操作类
/**
 * ESOperationDao
 *
 * @author littlehow
 * @time 2017-06-27 17:49
 */
@Repository
public class ESOperationDao implements InitializingBean {
    //日志记录
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private ESClientFactory esClientFactory;
    private BulkProcessor bulkProcessor;
    //加入购物车脚本
    private final String updateCarTransScript = "if (ctx._source."+ car_target_id.value +"==null) {ctx._source." + car_target_id.value + "="
            + car_target_id.value + ";}else{ctx._source."+ car_target_id.value +"+=" + car_target_id.value + ";}";
    //加入商品信息
    private final String updateItemScript = "if (ctx._source." + item.value + "==null) {ctx._source." + item.value + "="
            + item + ";}else{ctx._source." + item.value + "+=" + item.value + ";}";

    /**
     * 插入更新索引数据
     * @param indexMapList
     */
    public void upsertBehaviorLogMapList(List<Map<String, Object>> indexMapList) {
        if (indexMapList == null || indexMapList.size() == 0) {
            return;
        }
        Client client = esClientFactory.getClient();
        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
        for (Map<String, Object> indexMap : indexMapList) {
            String id = indexMap.get(trace_id.value).toString();
            try {
                bulkRequestBuilder.add(new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id).docAsUpsert(true).doc(indexMap));
            } catch (Throwable t) {
                t.printStackTrace();
            }
            logger.info("upsert index[{}] 's doc; id={}", INDEX_NAME_BEHAVIOR_LOG, id);
            if(logger.isDebugEnabled()){
                logger.debug(indexMap.toString());
            }
        }
        final int num = indexMapList.size();
        bulkRequestBuilder.execute(new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkItemResponses) {
                if (logger.isDebugEnabled()) {
                    logger.debug("num [{}] bulk success!!!", num);
                }
            }

            @Override
            public void onFailure(Throwable e) {
                logger.error("bulk num [{}] , error msg : {}", num, e.getLocalizedMessage());
            }
        });
    }

    /**
     * 执行日志插入
     * @param indexMap
     */
    public void upsertBehaviorLogMap(Map<String, Object> indexMap) {
        String id = indexMap.get(trace_id.value).toString();
        logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id);
        bulkProcessor.add(new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id).docAsUpsert(true).doc(indexMap));
    }

    /**
     * 加入购物车转换
     * @param paramMap -- 脚本修改参数
     */
    public void upsertBehaviorCarTrans(Map<String, Object> paramMap) {
        String id = (String)paramMap.get(trace_id.value);
        logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id);
        UpdateRequest updateRequest = new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id);
        updateRequest.script(new Script(updateCarTransScript, ScriptService.ScriptType.INLINE, null, paramMap));
        bulkProcessor.add(updateRequest);
    }

    /**
     * 商品信息存储
     * @param paramMap
     */
    public void upsertBehaviorItem(Map<String, Object> paramMap) {
        String id = (String)paramMap.get(trace_id.value);
        logger.info(INDEX_NAME_BEHAVIOR_LOG + " upsert id " + id);
        UpdateRequest updateRequest = new UpdateRequest(INDEX_NAME_BEHAVIOR_LOG, COMMON_INDEX_TYPE, id);
        updateRequest.script(new Script(updateItemScript, ScriptService.ScriptType.INLINE, null, paramMap));
        bulkProcessor.add(updateRequest);
    }

    /**
     * 主动刷新processor,将数据存入es
     */
    public void flushBulkProcessor() {
        //该方法被标记为synchronized
        bulkProcessor.flush();
    }

    /**
     * 创建产品索引
     * @return
     */
    public void createBehaviorLogIndex() {
        Client client = esClientFactory.getClient();
        //索引名称
        IndicesExistsRequest indicesExistsRequest = new IndicesExistsRequest(INDEX_NAME_BEHAVIOR_LOG);
        if(!client.admin().indices().exists(indicesExistsRequest).actionGet().isExists()) {
            //索引不存在则创建索引
            String indexSource = null;
            try {
                indexSource = getBehaviorLogIndexSource();
                logger.info(indexSource);
            } catch (IOException e) {
                logger.error("建立索引失败:" + INDEX_NAME_BEHAVIOR_LOG, e);
                throw new IllegalStateException(e.getMessage());
            }
            client.admin().indices().prepareCreate(INDEX_NAME_BEHAVIOR_LOG).addMapping(COMMON_INDEX_TYPE, indexSource).get();
            logger.info("建立索引成功" + INDEX_NAME_BEHAVIOR_LOG);
        }
    }

    /**
     * 删除索引信息
     */
    public void removeBehaviorLogIndex() {
        Client client = esClientFactory.getClient();
        if (client.admin().indices().exists(new IndicesExistsRequest(INDEX_NAME_BEHAVIOR_LOG)).actionGet().isExists()) {
            logger.info("删除索引准备:" + INDEX_NAME_BEHAVIOR_LOG);
            client.admin().indices().prepareDelete(INDEX_NAME_BEHAVIOR_LOG).get();
            logger.info("删除索引成功:" + INDEX_NAME_BEHAVIOR_LOG);
        }
    }

    /**
     * 行为日志索引的source
     *
     * @return
     * @throws IOException
     */
    private String getBehaviorLogIndexSource() throws IOException {
        try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
            builder.startObject().field(COMMON_INDEX_TYPE).startObject().field("properties").startObject();
            builder.field(trace_id.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(description.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(terminal_type.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(person_id.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(page_no.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            builder.field(product_ids.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            // ------------ product_info嵌套类型start
            builder.field(product_info.value).startObject().field("type", "nested").field("properties").startObject();
            builder.field(result_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            builder.field(label.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.endObject().endObject();
            // ------------- product_info嵌套类型end
            // ------------ item嵌套类型start
            builder.field(item.value).startObject().field("type", "nested").field("properties").startObject();
            builder.field(product_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            // ------------ item_info嵌套类型start
            builder.field(item_info.value).startObject().field("type", "nested").field("properties").startObject();
            builder.field(result_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            builder.field(label.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.endObject().endObject();
            // ------------- item_info嵌套类型end
            builder.endObject().endObject();
            // ------------ item嵌套类型end
            builder.field(car_target_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            builder.field(order_target_id.value).startObject().field("type", "integer").field("index", "not_analyzed").endObject();
            builder.field(keywords.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(real_search_words.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(interface_name.value).startObject().field("type", "string").field("index", "not_analyzed").endObject();
            builder.field(call_time.value).startObject().field("type", "date").field("index", "not_analyzed").endObject();
            builder.field(last_data_time.value).startObject().field("type", "date").field("index", "not_analyzed").endObject();
            builder.endObject();//第一个properties的闭合
            builder.startObject("_all").field("enabled", false).endObject();
            builder.endObject();//索引类型的闭合
            builder.endObject();//整个json的闭合
            return builder.string();
        } catch (IOException e) {
            logger.error("拼接索引映射信息失败:" + e.getMessage());
            throw e;
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        bulkProcessor = BulkProcessor.builder(esClientFactory.getClient(), new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("执行bulk开始executionId=" + executionId + ", time=" + System.currentTimeMillis());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("执行刷入失败executionId[" + executionId + "]", failure);
            }
        }).setBulkActions(500)//设置500请求进行刷入
          .setConcurrentRequests(1)//设置并发支持
          .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(200L), 3))//设置失败后每隔200毫秒执行一次,最多重试3次
          .build();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 简单测试代码
/**
 * Simple test that drives the behavior-log write path through
 * {@code DataOperationService}: one product-search log, one add-to-cart
 * update and two item updates, followed by a flush.
 *
 * @author littlehow
 * @time 2017-06-29 14:01
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
        "classpath*:spring/ApplicationContext-beans-search-base.xml",
        "classpath*:spring/ApplicationContext-elasticsearch.xml",
        "classpath*:spring/log/ApplicationContext-call-log.xml"
})
public class EsOperationTest {
    @Autowired
    private DataOperationService dataOperationService;

    /** Makes sure the behavior-log index exists before each test. */
    @Before
    public void initIndex() {
        dataOperationService.createBehaviorLogIndex();
    }

    /** Writes the product log, then the cart and item updates for the same trace id, then flushes. */
    @Test
    public void test() {
        CommonLog productLog = getCommonLog();
        dataOperationService.upsertProductLog(productLog);

        String traceId = productLog.getMonitorTraceId();
        dataOperationService.upsertCarTransLog(16, traceId);

        // add the item information
        ItemLogDto firstItems = getItemLogDto(12, traceId, 1012, 1014);
        dataOperationService.upsertItemLog(firstItems);
        ItemLogDto secondItems = getItemLogDto(14, traceId, 1018, 1021);
        dataOperationService.upsertItemLog(secondItems);

        dataOperationService.flushUpsertData();
    }

    /**
     * Builds an item log for one product with item ids in {@code [start, end)}.
     */
    private ItemLogDto getItemLogDto(int productId, String traceId, int start, int end) {
        List<ItemDataDto> items = new ArrayList<>();
        int itemId = start;
        while (itemId < end) {
            items.add(getItemDataDto(itemId));
            itemId++;
        }

        ItemLogDto dto = new ItemLogDto();
        dto.setMonitorTraceId(traceId);
        dto.setProductId(productId);
        dto.setItemDataDtos(items);
        return dto;
    }

    /**
     * Builds one item entry carrying one or two randomly chosen labels
     * (two draws, de-duplicated through a set).
     */
    private ItemDataDto getItemDataDto(int itemId) {
        ItemDataDto dto = new ItemDataDto();
        dto.setItemId(itemId);

        ItemLabel[] candidates = ItemLabel.values();
        Random rnd = new Random();
        Set<String> labels = new HashSet<>();
        for (int draw = 0; draw < 2; draw++) {
            labels.add(candidates[rnd.nextInt(candidates.length)].value);
        }
        dto.setLabel(new ArrayList<>(labels));
        return dto;
    }

    /**
     * Builds the product-search log used by the test.
     *
     * @return a log with trace id "1313" and two results
     */
    private CommonLog getCommonLog() {
        CommonLog log = new CommonLog();
        log.setDescription("hello");
        log.setInterfaceName("ProductService.productSearch");
        log.setKeywords("阿莫西林");
        log.setMonitorTraceId("1313");
        log.setPageNo(1);
        log.setPersonId("123123");
        log.setTerminalType("zhongduanbao");
        log.setTime(System.currentTimeMillis());
        log.setType(CollectionType.PRODUCT_SEARCH.value);

        List<OperateResult> results = new ArrayList<>();
        results.add(getResult(12, ItemLabel.CONTROL_SALE.value, ItemLabel.ELECTRONIC_INVOICE.value));
        results.add(getResult(14, ItemLabel.NEAR_EXPIRY.value, ItemLabel.ELECTRONIC_INVOICE.value));
        log.setResults(results);
        return log;
    }

    /**
     * Builds one search result.
     *
     * @param id     target id of the result
     * @param labels labels attached to the result
     * @return the populated result
     */
    private OperateResult getResult(Integer id, String ...labels) {
        OperateResult result = new OperateResult();
        result.setTargetId(id);
        result.setLabel(Arrays.asList(labels));
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/68863
推荐阅读
相关标签
  

闽ICP备14008679号