赞
踩
默认情况下,Elasticsearch集群中每个分片的搜索结果数量限制为10000。这是为了避免潜在的性能问题。
但是我们 在实际工作过程中时常会遇到 需要深度分页,以及查询批量数据更新的情况
在此方案中,我们建议仅限于测试用,生产禁用,毕竟当数据量大的时候,过大的数据量可能导致es的内存溢出,直接崩掉,一年绩效白干。
PUT wkl_test/_settings
{
"index":{
"max_result_window":2147483647
}
}
查看索引的 settings
重新查数据:
使用scroll API:scroll API可以帮助我们在不加载所有数据的情况下获取所有结果。它会在后台执行查询以获取滚动ID,并将其用于进行后续查询。这样就可以一次性获取所有结果,而不必担心限制
在游标方案中,我们只需要在第一次拿到游标id,之后通过游标就能唯一确定查询,在这个查询中通过我们指定的 size 移动游标,具体操作看看下面实操。
GET wkl_test/_search?scroll=5m
{
"query": {
"match_all": {}
},
"sort": [
{
"seq": {
"order": "asc"
}
}
],
"size": 200
}
@Test
public void testScroll(){
RestHighLevelClient restHighLevelClient ;
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.mustNot(QueryBuilders.existsQuery("seq"));
try {
//滚动查询的Scroll,设置请求滚动时间窗口时间
Scroll scroll = new Scroll(TimeValue.timeValueMillis(180000));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
//加入query语句
sourceBuilder.query(boolQueryBuilder);
//每次滚动的长度
sourceBuilder.size(SIZE);
//加入排序字段
sourceBuilder.sort("id", SortOrder.DESC);
//构建searchRequest
//加入scroll和构造器
SearchRequest searchRequest = new SearchRequest()
.indices("wkl_test")
.source(sourceBuilder)
.scroll(scroll);
//存储scroll的list
List<String> scrollIdList = new ArrayList<>();
//执行首次检索
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//首次检索返回scrollId,用于下一次的滚动查询
String scrollId = searchResponse.getScrollId();
//拿到hits结果
SearchHit[] hits = searchResponse.getHits().getHits();
long value = searchResponse.getHits().getTotalHits().value;
//保存返回结果List大小
Long resultSize = 0L;
scrollIdList.add(scrollId);
try {
//滚动查询将SearchHit封装到result中
while (ArrayUtils.isNotEmpty(hits) && hits.length > 0) {
BulkRequest bulkRequest = new BulkRequest();
JSONArray esArray = new JSONArray();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
String index = hit.getIndex();
JSONObject jsonObject = JSONObject.parseObject(sourceAsString);
String seq = jsonObject.getString("seq");
if(StringUtils.isBlank(seq) ){
esArray.add(jsonObject);
String uuid = jsonObject.getString("id");
jsonObject.put("is_del",1);
bulkRequest.add(new UpdateRequest(index, uuid).doc(jsonObject));
}
}
resultSize = resultSize+hits.length;
//发送请求
//实时更新
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulk.getTook()+"-------"+bulk.getItems().length);
//说明滚动完了,返回结果即可
if (resultSize > 20000) {
break;
}
//继续滚动,根据上一个游标,得到这次开始查询位置
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
searchScrollRequest.scroll(scroll);
//得到结果
SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
//定位游标
scrollId = searchScrollResponse.getScrollId();
hits = searchScrollResponse.getHits().getHits();
scrollIdList.add(scrollId);
}
System.out.println("----彻底结束了-----");
} finally {
//清理scroll,释放资源
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.setScrollIds(scrollIdList);
restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
优缺点:
适用场景
检索第一页的查询如下所示:
GET wkl_test/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"seq": {
"order": "asc"
}
}
],
"size": 200
}
上述请求的结果包括每个文档的 sort 值数组。
这些 sort 值可以与 search_after 参数一起使用,以开始返回在这个结果列表之后的任何文档。例如,我们可以使用上一个文档的 sort 值并将其传递给 search_after 以检索下一页结果:
@Test
public void testSearchAfter() throws IOException {
RestHighLevelClient restHighLevelClient = es7UtilApi.getRestHighLevelClient();
MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchAllQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(200);
searchSourceBuilder.sort("seq", SortOrder.ASC);
searchSourceBuilder.trackTotalHits(true);
SearchRequest searchRequest = new SearchRequest()
.indices("wkl_test")
.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
long value = hits.getTotalHits().value;
System.out.println("查询到记录数=" + value);
List<JSONObject> list = new ArrayList<>();
SearchHit[] searchHists = hits.getHits();
Object[] sortValues = searchHists[searchHists.length - 1].getSortValues();
if (searchHists.length > 0) {
for (SearchHit hit : searchHists) {
String sourceAsString = hit.getSourceAsString();
JSONObject jsonObject = JSON.parseObject(sourceAsString);
jsonObject.put("_id", hit.getId());
list.add(jsonObject);
}
}
//往后的每次请求都携带上一次的sort_id进行访问。
while (ArrayUtils.isNotEmpty(searchHists) && searchHists.length > 0){
searchSourceBuilder.searchAfter(sortValues);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponseAfter = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
hits = searchResponseAfter.getHits();
searchHists = hits.getHits();
sortValues = searchHists[searchHists.length - 1].getSortValues();
if (searchHists.length > 0) {
for (SearchHit hit : searchHists) {
String sourceAsString = hit.getSourceAsString();
JSONObject jsonObject = JSON.parseObject(sourceAsString);
jsonObject.put("_id", hit.getId());
list.add(jsonObject);
}
}
if(list.size()>20000){
break;
}
System.out.println("-----彻底结束了-------");
}
}
「优点:」
无状态查询,可以防止在查询过程中,数据的变更无法及时反映到查询中。
不需要维护scroll_id,不需要维护快照,因此可以避免消耗大量的资源。
「缺点:」
由于无状态查询,因此在查询期间的变更可能会导致跨页面的不一值。
排序顺序可能会在执行期间发生变化,具体取决于索引的更新和删除。
至少需要制定一个唯一的不重复字段来排序。
它不适用于大幅度跳页查询,或者全量导出,对第N页的跳转查询相当于对es不断重复的执行N次search after,而全量导出则是在短时间内执行大量的重复查询。
在7.*版本中,ES官方不再推荐使用Scroll方法来进行深分页,而是推荐使用带PIT的search_after来进行查询;
从7.*版本开始,您可以使用SEARCH_AFTER参数通过上一页中的一组排序值检索下一页命中。
使用SEARCH_AFTER需要多个具有相同查询和排序值的搜索请求。
如果这些请求之间发生刷新,则结果的顺序可能会更改,从而导致页面之间的结果不一致。
为防止出现这种情况,您可以创建一个时间点(PIT)来在搜索过程中保留当前索引状态。
#keep_alive必须要加上,它表示这个pit能存在多久,这里设置的是1分钟
POST wkl_test/_pit?keep_alive=1m
在每个搜索请求中添加 keep_alive 参数来延长 PIT 的保留期,相当于是重置了一下时间
GET _search
{
"query": {
"match_all": {}
},
"pit":{
"id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA=",
"keep_alive":"5m"
},
"sort": [
{
"seq": {
"order": "asc"
}
}
],
"size": 200
}
DELETE _pit
{
"id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA="
}
如果数据量小(from+size在10000条内),或者只关注结果集的TopN数据,可以使用from/size 分页,简单粗暴
数据量大,深度翻页,后台批处理任务(数据迁移)之类的任务,使用 scroll 方式
数据量大,深度翻页,用户实时、高并发查询需求,使用 search after 方式
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。