赞
踩
protected RestHighLevelClient restHighLevelClient;
//调用ES API根据自己的索引名indexName创建CreateIndexRequest对象
final CreateIndexRequest request = new CreateIndexRequest(indexName);
//创建索引的返回结果
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// 为索引设置一个别名
request.alias(new Alias("person_alias"))
//索引已经存在,报错
if (!response.isAcknowledged()) {
throw CommonError.ES_ERROR.exception();
}
protected RestHighLevelClient restHighLevelClient;
//根据索引名获取GetIndextRequest对象
final GetIndexRequest request = new GetIndexRequest.indices(indexName);
boolean isExists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
/** * 删除索引 */ public void deleteIndex() { protected RestHighLevelClient client; DeleteIndexRequest request = new DeleteIndexRequest("person_index"); AcknowledgedResponse deleteIndexResponse; try { deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT); boolean acknowledged = deleteIndexResponse.isAcknowledged(); System.out.println("deleteIndex:"+acknowledged); } catch (IOException e) { e.printStackTrace(); } }
/** * 打开索引 */ @Test public void test003openIndex() { protected RestHighLevelClient client; OpenIndexRequest request = new OpenIndexRequest("person_index"); try { OpenIndexResponse openIndexResponse = client.indices().open(request, RequestOptions.DEFAULT); boolean acknowledged = openIndexResponse.isAcknowledged(); System.out.println("openIndex:"+acknowledged); } catch (IOException e) { e.printStackTrace(); }
打开索引
/** * 打开索引 */ @Test public void test003openIndex() { protected RestHighLevelClient client; OpenIndexRequest request = new OpenIndexRequest("person_index"); try { OpenIndexResponse openIndexResponse = client.indices().open(request, RequestOptions.DEFAULT); boolean acknowledged = openIndexResponse.isAcknowledged(); System.out.println("openIndex:"+acknowledged); } catch (IOException e) { e.printStackTrace(); } }
关闭索引
/*** * 关闭索引 */ @Test public void test004closeIndex() { protected RestHighLevelClient client; CloseIndexRequest request = new CloseIndexRequest("person_index"); try { CloseIndexResponse closeIndexResponse = client.indices().close(request, RequestOptions.DEFAULT); boolean acknowledged = closeIndexResponse.isAcknowledged(); System.out.println("closeIndex:"+acknowledged); test003openIndex(); } catch (IOException e) { e.printStackTrace(); }
protected RestHighLevelClient client; //这里的参数indexName是索引名 IndexRequest indexRequest = new IndexRequest(indexName); //设置id indexRequest = indexRequest.id("id"); //将要插入的数据传入 indexRequest = indexRequest.source(JSON.toJSONString(alarmMessagePo), XContentType.JSON); //或者传入map indexRequest = indexRequest.source(new HashMap<>()); /* * ----------设置ES插入后的刷新策略------------ * RefreshPolicy#IMMEDIATE: 请求向ElasticSearch提交了数据,立即进行数据刷新,然后再结束请求。 优点:实时性高、操作延时短。 缺点:资源消耗高。 RefreshPolicy#WAIT_UNTIL: 请求向ElasticSearch提交了数据,等待数据完成刷新,然后再结束请求。 优点:实时性高、操作延时长。 缺点:资源消耗低。 RefreshPolicy#NONE: 默认策略。 请求向ElasticSearch提交了数据,不关系数据是否已经完成刷新,直接结束请求。 优点:操作延时短、资源消耗低。 缺点:实时性低。 * */ indexRequest = indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); //将上面的参数合在一起就变成下面的式子 Requests.indexRequest(indexName).id(alarmMessagePo.getCaseNumber()) .source(JSON.toJSONString(alarmMessagePo), XContentType.JSON) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
protected RestHighLevelClient restHighLevelClient; /** 构造批量插入Request **/ final BulkRequest request = new BulkRequest(); //indexRequest为IndexRequest对象 request.add(indexRequest1); request.add(indexRequest2); request.add(indexRequest3); request.add(indexRequest4); //设置插入后的刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); /** 执行批量插入并获取返回结果 **/ final BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT); //判断执行结果是否有执行失败的条目 if (response.hasFailures()) { for (final BulkItemResponse itemResponse : response) { //表示这条执行结果失败 if (itemResponse.isFailed()) { //执行相应的业务 } } }
同插入
protected RestHighLevelClient restHighLevelClient;
//创建deleteRequest对象,indexName为索引值
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
//设置删除的条件,CASE_KEY为筛选的字段
deleteByQueryRequest.setQuery(QueryBuilders.termQuery(CASE_KEY, caseNumber));
//执行删除
restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
-------------------------------------------------------------------------------
//下面是将上面的代码连在一起写
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName)
.setQuery(QueryBuilders.termQuery(CASE_KEY, caseNumber));
/** 删除 **/
restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
//创建查询request对象 SearchRequest request = Requests.searchRequest(indexName); //创建查询条件搜索对象(用来存查询的条件) SearchSourceBuilder builder = new SearchSourceBuilder(); //根据条件查询 /** * 使用QueryBuilder * termQuery("key", obj) 完全匹配 * termsQuery("key", obj1, obj2..) 一次匹配多个值 * matchQuery("key", Obj) 单个匹配, field不支持通配符, 前缀具高级特性 * multiMatchQuery("text", "field1", "field2"..); 匹配多个字段, field有通配符忒行 * matchAllQuery(); 匹配所有文件 * 组合查询 * must(QueryBuilders) : AND * mustNot(QueryBuilders): NOT * should: : OR */ builder.query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(CAUSE_ID_KEY, causeId)) .must(QueryBuilders.termQuery("user", "kimchy")) .mustNot(QueryBuilders.termQuery("message", "nihao")) .should(QueryBuilders.termQuery("gender", "male")); //request加入builder进行筛选 request.source(builder); //执行查询并返回执行结果 SearchResponse response = client.search(request, RequestOptions.DEFAULT); //查询结果 SearchHit[] result = response.response.getHits().getHits(); //查询结果的数量 long len = result.length; //根据查询结果解析成序列化对象(以下AlarmMessagePo为举例) List<AlarmMessagePo> alarmMessagePoList = Arrays.stream(response.getHits().getHits()) .map(hit -> JSON.parseObject(hit.getSourceAsString(), AlarmMessagePo.class)) //以下是举例 final SearchRequest request = Requests.searchRequest(indexName) .source(new SearchSourceBuilder() .query(QueryBuilders.boolQuery().must(QueryBuilders.termQuery(CAUSE_ID_KEY, causeId)) .must(QueryBuilders.rangeQuery(CREATE_TIME_KEY).gte(startTime).lte(endTime))));
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。