当前位置:   article > 正文

【java学习】ES(Elasticsearch)学习_es7.12 更新部分字段 java

es7.12 更新部分字段 java

1,概念

1)结构

Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields

2)sql查询

es也可以使用sql进行查询,将es结构对应关系数据库进行sql的查询即可如下:

http://127.0.0.1:9200/_sql?sql=select count(*) from abc_index*
  • 1

仅用于验证es查询结果,因为sql的执行在内存中进行,非常耗时。

3)JAVA API

Elasticsearch的交互,可以使用JAVA API,也可以直接使用es提供的REST API 。

4)REST API

①功能

  • 检查集群、节点和索引的健康信息、状态以及各种统计信息
  • 管理集群、节点、索引数据以及元数据
  • 对索引进行 CRUD(创建、读取、更新和删除)和搜索操作
  • 执行高级的搜索操作, 例如分页、排序、过滤、脚本编写(scripting)、聚合(aggregations)以及其它操作

②格式

curl -X <HTTP Verb> /<Index>/<Type>/<ID>

5)Elasticsearch原理

6)配置

①es的安装和可视化

运行elasticsearch-5.1.1、kibana-5.1.1。

②java yml配置

首先添加依赖;
yml文件的配置如下:

#es的默认名称,如果安装es时没有做特殊的操作名字都是此名称
spring.data.elasticsearch.cluster-name=elasticsearch
# Elasticsearch 集群节点服务地址,用逗号分隔,如果没有指定其他就启动一个客户端节点,默认java访问端口9300
spring.data.elasticsearch.cluster-nodes=localhost:9300
# 设置连接超时时间
spring.data.elasticsearch.properties.transport.tcp.connect_timeout=120s
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2,curl语法总结

#查看es集群状态 结果分析->(1)
	curl -X GET "localhost:9200/_cat/health?v"
#查看节点的详细信息 结果分析->(9)
	curl -X GET "localhost:9200/_cat/nodes?v"
#查看集群健康信息 结果分析->(2)
	curl localhost:9200/_cluster/health?pretty
#显示集群系统信息,包括CPU JVM、节点、分片等等
	curl -XGET localhost:9200/_cluster/stats?pretty=true
#获取集群堆积的任务
	curl -XGET localhost:9200/_cluster/pending_tasks?pretty=true
#查询所有索引,pretty:格式化
	curl -XGET 'localhost:9200/_cat/indices?v&pretty'
#查询第一个索引的未分配原因。结果分析->(3)
	curl localhost:9200/_cluster/allocation/explain
#查询指定索引索引名未分配的原因。结果分析->(4)
	curl -XGET localhost:9200/_cluster/allocation/explain?pretty -d'
	{
	  "index": "skyeye-dns-2018.12.21",
	  "shard": 1,
	  "primary": true
	}
## 创建index:customer pretty参数:表示请求响应的JSON格式化之后打印出来,方便开发者阅读。student为type
	curl -X PUT "localhost:9200/customer?pretty"
	curl -X PUT "localhost:9200/customer/student/1" -d '{"name":"jack","age":30,"info":"Ilove you"}'
## 删除index:index02
	curl -X DELETE http://192.168.168.101:9200/index02
## 查询具体索引:bank,bank2内容 结果分析->(6)
	curl -X GET "localhost:9200/bank,bank2/_search?q=*&sort=account_number:asc&pretty&ignore_unavailable=true"
## 打开/关闭index
	curl -XPOST http://192.168.168.101:9200/index01/_close
	curl -XPOST http://192.168.168.101:9200/index01/_open
## 创建/全量更新Document 不存在则创建,存在则覆盖。 结果分析->(7)  
	curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d'
	{
	   "name": "John Doe"
	}'
## 查询Document
	curl -X GET "localhost:9200/customer/_doc/1?pretty"
## 更新Document
    POST /ecommerce/product/1/_update
    {
      "doc": {
        "name": "jiaqiangban gaolujie yagao"
      }
    }
	POST /_bulk
	{ "update": { "_index": "ecommerce", "_type": "product", "_id": "4","retry_on_conflict" : 3 }}
	{ "doc" : {"test_field" : "test update"} }
## 删除Document
	curl -X DELETE "localhost:9200/customer?pretty"
	POST /_bulk
	{ "delete": { "_index": "ecommerce", "_type": "product", "_id": "4"}} 
## 查询返回最近10条
	curl -XPOST 'localhost:9200/logstash-zhifubao-2018.09.18/_search?pretty' -d '{"query": { "match_all": {} },"from": 10,"size": 10}'

## 查询索引状态
	curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_status

#根据business_no查询,多个条件用逗号拼接
	curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_search?q=message:0ea1b2df-caa4-457c-8cc1-294f5e9284c7
#根据business_no查询,只返回特定字段
	curl -XGET http://localhost:9200/logstash-zhifubao-2018.08.15/_search?q=message:0ea1b2df-caa4-457c-8cc1-294f5e9284c7?_source=message
#分页
	curl -XGET http://localhost:9200/shb01/stu/_search?size=2&from=0
#更新部分字段
	crul –XPUT http:localhost:9200/shb01/student/1/_update?version=1
	–d ‘{“doc”:{“name”:”updatename”}
#根据id删除
	curl -XDELETE http://localhost:9200/shb01/student/AVad05EExskBS1Rg2tdq
#删除所有的索引库中名称为tom的文档
	curl -XDELETE http://localhost:9200/_all/_query?q=name:tom

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

1)查看es集群状态

curl -X GET "localhost:9200/_cat/health?v"
  • 1

get为访问方式,localhost:9200/_cat/health?v为访问链接。
在这里插入图片描述
从这个响应中,我们可以看到集群的名称,状态,节点数,分片数等等,其中:

  1. status 状态有green、yellow和red三种:
  • green:集群运行正常;
  • yellow:副本分片丢失。集群所有数据都是可用的,集群功能也齐全,但存在某些复制没有被分配;
  • red:主分片丢失。集群的部分数据不可用,集群的功能也是不全的,但是集群还是可以运行的,它可以继续处理搜索请求,不过开发者要尽快修复它。一般这种情况下es已经基本挂掉了。
  1. 当前集群:一共有3个节点,5724个分片等信息。
    如果已经安装了Kibana,也可以通过Kibana查看这些信息。

2)REST API

    curl localhost:9200/_cluster/health?pretty
    {
	  "cluster_name" : "es",    # 集群名称
	  "status" : "green",       # 集群状态,green代表健康,yellow表示部分副分片未分配,red表示部分索引的主副分片未分配。
	  "timed_out" : false,      #
	  "number_of_nodes" : 3,    # 在线的节点数
	  "number_of_data_nodes" : 3,       # 在线的数据节点数
	  "active_primary_shards" : 549,    # 活跃的主分片数量
	  "active_shards" : 1098,           # 活跃的分片数量,包括主分片和副本
	  "relocating_shards" : 2,          # 正在移动的分片数量。
	  "initializing_shards" : 0,        # 正在初始化的分片的数量。
	  "unassigned_shards" : 0,          # 未分配的分片数量
	  "delayed_unassigned_shards" : 0,  # 由于节点离线导致延迟分配的分片数量。
	  "number_of_pending_tasks" : 0,    #
	  "number_of_in_flight_fetch" : 0,  #
	  "task_max_waiting_in_queue_millis" : 0,       #
	  "active_shards_percent_as_number" : 100.0     #所有活跃分片/打开的所有索引的分片总数。
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3)查询第一个索引的未分配原因

Elasticsearch 5.0发布了 _cluster / allocation / explain,帮助诊断未分配分片的原因。

#查询第一个索引的未分配原因。
curl localhost:9200/_cluster/allocation/explain
  • 1
  • 2

4)查询指定索引索引名未分配的原因

#查询指定索引索引名未分配的原因。
curl -XGET localhost:9200/_cluster/allocation/explain?pretty -d'
{
  "index": "skyeye-dns-2018.12.21",
  "shard": 1,
  "primary": true
}
返回值:
assigned 分片是已分配还是未分配
shard_state_fetch_pending 是否仍在获取有关分片的信息
unassigned_info-reason 分片最初未分配的原因
allocation_delay_in_millis 分片可分配之前的已配置延迟
remaining_delay_in_millis 可以分配分片之前的剩余延迟
nodes-es.…….com.1-node_attributes 节点具有用户添加的属性
nodes-es.…….com.1-store-shard_copy 该节点的分片副本信息和错误(如果适用)
nodes-es.…….com.1-final_decision、final_explanation 分片是否可以分配给该节点的最终决定和解释
nodes-es.…….com.1-weight 分配器想要向该节点分配分片的权重
nodes-es.…….com.1-decisions 节点决策列表,这些决策将成为有关分片的最终决策
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

根据final_explanation进行问题排查:

RED-there is no copy of the shard available(主分片数据丢失)

一般是文件系统上对应分片的文件被清理了,这个是会丢数据的,但是丢的只是这一个分片的数据。
当有分片处于red状态时,会影响到这个索引的数据写入,手动分配空分片等于在通知集群可以重新给这个索引的该分片分配一个新的空分片,然后该索引就可以正常进行数据写入了。

分配一个空的主分片:

curl -XPOST 'localhost:9200/_cluster/reroute?pretty' -H 'Content-Type: application/json' -d'
{
  "commands": [
    {
      "allocate_empty_primary": {
        "index": "original_a_20200329",
        "shard": 1,
        "node": "es.a.com.1",
        "accept_data_loss": true
      }
    },
    {
      "allocate_empty_primary": {
        "index": "original_a_20200329",
        "shard": 1,
        "node": "es.a.com.0",
        "accept_data_loss": true
      }
    }
  ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

如果red索引比较多,那么一个个分片这样处理比较麻烦,用脚本进行批处理。

RED-the copy of the shard is stale, allocation ids do not match(主分片数据陈旧)

可以手动将该主分片分配到es.IP上:

curl -XPOST 'localhost:9200/_cluster/reroute?pretty' -H 'Content-Type: application/json' -d'
{
  "commands": [
    {
      "allocate_stale_primary": {
        "index": "a-2020.05.21",
        "shard": 1,
        "node": "es.a.3",
        "accept_data_loss": true
      }
    }
  ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

YELLOW-initializing_shards > 0

YELLOW-initializing_shards = 0

delayed_unassigned_shards > 0

5)检查存储使用情况

du -sh /data*/es
  • 1

如果出现磁盘写满的情况,ES集群很可能会变成yellow,red,可能是删除历史索引的程序出问题。

6)查询具体索引内容

查询bank,bank2:

curl -X GET "localhost:9200/bank,bank2/_search?q=*&sort=account_number:asc&pretty&ignore_unavailable=true"
  • 1

多索引查询:

  • 支持使用简单表示法,如test1,test2,test3表示法
  • 使用_all表示所有索引
  • 使用通配符,如 test*或 *test或 te*t或 *test*
  • 也支持排除能力,例如:test*,-test3
    常用其它参数:
参数说明默认值举例
ignore_unavailable当指定多个索引时,如果有索引不可用(不存在或者已经关闭)那么是否忽略该索引false
allow_no_indices允许通配符匹配索引truecurl -X GET "localhost:9200/bank3*/_search?q=*&pretty&allow_no_indices=false",若bank3不存在,则报错。
expand_wildcards查询索引的范围open表示查询所有匹配并open的索引,closed则表示查询所有匹配的索引
pretty响应的JSON将被格式化true
format=yaml请求以更可读的yaml格式响应
human=true以人类可读的格式来返回数据true则返回{"exists_time":"1h"},否则,返回:{"exists_time_in_millis":3600000}

索引参数相关:

URL说明
/index/_search不解释
/_aliases获取或操作索引的别名
/index/type/创建或操作类型
/index/_mapping创建或操作mapping
/index/_settings创建或操作设置(number_of_shards是不可更改的)
/index/_open打开被关闭的索引
/index/_close关闭索引
/index/_refresh刷新索引(使新加内容对搜索可见)
/index/_flush刷新索引,将变动提交到lucene索引文件中,并清空elasticsearch的transaction log
/index/_optimize优化segement,个人认为主要是对segement进行合并
/index/_status获得索引的状态信息
/index/_segments获得索引的segments的状态信息
/index/_explain不执行实际搜索,而返回解释信息
/index/_analyze不执行实际搜索,根据输入的参数进行文本分析
/index/type/id操作指定文档,不解释
/index/type/id/_create创建一个文档,如果该文件已经存在,则返回失败
/index/type/id/_update更新一个文件,如果改文件不存在,则返回失败

7)创建/全量更新Document

不存在则创建,存在则覆盖。

    curl -X PUT "localhost:9200/customer/_doc/1?pretty" -H 'Content-Type: application/json' -d'
    {
      "name": "John Doe"
    }'
  • 1
  • 2
  • 3
  • 4
  1. document是不可变的,如果要修改document的内容,可以通过全量替换,直接对document重新建立索引,替换里面所有的内容。
  2. es会将老的document标记为deleted(逻辑删除),然后新增我们给定的一个document,当我们创建越来越多的document的时候,es会在适当的时机在后台自动删除(物理删除)标记为deleted的document。
  3. 替换必须带上所有的field,否则其他数据会丢失。
  4. Elasticsearch中,并不强制要求显式的创建索引,即前面案例中,如果开发者在添加文档之前,还没有创建customer索引,那么该文档一样也会创建成功的(此时索引会被自动创建)。

未指定id时,系统会自动创建id。

POST /_bulk
{ "index": { "_index": "ecommerce", "_type":"product"}}
{ "name": "test yagao", "desc": "youxiao fangzhu"}
  • 1
  • 2
  • 3

强制创建文档create

POST /_bulk
{ "create": { "_index": "ecommerce", "_type": "product", "_id": "4" }}
{ "test_field":    "test12" }
  • 1
  • 2
  • 3

8) 删除Document

curl -X DELETE "localhost:9200/customer?pretty"
  • 1

在删除一个document之后,我们可以从侧面证明,它不是立即物理删除的,因为它的一些版本号等信息还是保留的。

POST /_bulk
{ "delete": { "_index": "ecommerce", "_type": "product", "_id": "4"}} 
  • 1
  • 2

由于删除只需要被删除文档的ID,所以并没有对应的源文档。
bulk API按顺序执行这些操作。如果其中一个操作因为某些原因失败了,它将会继续处理后面的操作。当bulk API返回时,它将提供每个操作的状态(按照同样的顺序),所以开发者能够看到每个操作成功与否。

9)Nodes参数相关

URL说明
/_nodes/process看file descriptor 这个信息
/_nodes/process/stats统计信息(内存、CPU能)
/_nodes/jvm获得各节点的虚拟机统计和配置信息
/_nodes/jvm/stats更详细的虚拟机信息
/_nodes/http获得各个节点的http信息(如ip地址)
/_nodes/http/stats获得各个节点处理http请求的统计情况
/_nodes/thread_pool获得各种类型的线程池(elasticsearch分别对不同的操作提供不同的线程池)的配置信息
/_nodes/thread_pool/stats获得各种类型的线程池的统计信息

以上这些操作和可以通过如

/_nodes/${nodeId}/jvm/stats
/_nodes/${nodeip}/jvm/stats
/_nodes/${nodeattribute}/jvm/stats
  • 1
  • 2
  • 3

的形式针对指定节点的操作。

3,java api总结

1)集群健康信息查看

ClusterHealthResponse response = client.admin().cluster()
.prepareHealth("library")
.execute().actionGet();
  • 1
  • 2
  • 3

2)TransportClient

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
 
 
Settings settings = Settings.builder()
                .put("cluster.name", "elasticsearch").build();
TransportClient client ==  new PreBuiltTransportClient(settings). addTransportAddress(new TransportAddress(InetAddress.getByName("XXX.XXX.XX.XX"), 9300));
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

3)查询Document

java api:queryBuilder = QueryBuilders.matchAllQuery().boost(11f).normsField("title");

boost参数被用来增加一个子句的相对权重(当boost大于1时),或者减小相对权重(当boost介于0到1时),但是增加或者减小不是线性的。换言之,boost设为2并不会让最终的_score加倍。
相反,新的_score会在适用了boost后被归一化(Normalized)。每种查询都有自己的归一化算法(Normalization Algorithm)。但是能够说一个高的boost值会产生一个高的_score。
如果你在实现你自己的不基于TF/IDF的相关度分值模型并且你需要对提升过程拥有更多的控制,你可以使用function_score查询,它不通过归一化步骤对文档的boost进行操作。

4)判断index是否存在

在es查询之前,判断index是否存在,以免抛出异常。

	private Client client;
	private void init() throws Exception{
		Settings settings = Settings.settingsBuilder().put("cluster.name", "log-test")
				.build();
		client = TransportClient.builder().settings(settings)
				.addPlugin(DeleteByQueryPlugin.class)
				.build()
				.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300))
				.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
	}
	
public boolean indexExists(String index){
		IndicesExistsRequest request = new IndicesExistsRequest(index);
		IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
		if (response.isExists()) {
			return true;
		}
		return false;
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

查询操作

term query

场景:用于精确查询一个字段。

curl -XGET http://192.168.168.101:9200/index01/_search?pretty -H  'Content-Type: application/json' -d '
 {
 "query":
 {"term":
 {"title":"你好"}
 }
 }'

#通过浏览器sql检索:
http://ip:9200/indexName/_search?q=title:你好
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

term查找时直接对关键词进行查找。

java api:

SearchResponse response = client.prepareSearch("index1", "index2")
        .setTypes("type1", "type2")
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.termQuery("multi", "test"))                 //term Query
        .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // range query、Filter query
        .setFrom(0).setSize(60).setExplain(true)
        .get();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

terms query

场景:用于精确查询多个字段。

{
    'query':{
        'terms':{
            'tag':["search",'nosql','hello']   //json中必须包含数组。
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

i> Preparing a query 准备查询请求

SearchResponse response = client.prepareSearch("library")
.addFields("title", "_source")
.execute().actionGet();
for(SearchHit hit: response.getHits().getHits()) {
     System.out.println(hit.getId());
     if (hit.getFields().containsKey("title")) {
            System.out.println("field.title: "+ hit.getFields().get("title").getValue());
     }
     System.out.println("source.title: " + hit.getSource().get("title"));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

ii> Building queries 构造查询

import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("sid_s", text));
Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex("webpage").build();
SearchResult result = client.execute(search);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

match query

场景:用于当前文档的全文模糊查找。
match在匹配时会对所查找的关键词进行分词,然后按分词匹配查找。

GET /ecommerce/product/_search
{'query':{'match':{'title':'你好'}}}

{
   "query": {
     "match": {
       "__type": "info"
     }
   },
   "sort": [
     {
       "campaign_end_time": {
         "order": "desc"
       }
     }
   ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

匹配查询:

queryBuilder = QueryBuilders
.matchQuery("message", "a quick brown fox")
.operator(Operator.AND)
.zeroTermsQuery(ZeroTermsQuery.ALL);
  • 1
  • 2
  • 3
  • 4

match_all

场景:查询指定索引下的所有文档。

{'query':{'match_all':{'title':'标题一样'}}}
  • 1

multi match

场景:多值匹配查询。

{
  "query": {
    "multi_match": {
      "query": "运动 上衣",
      "fields": [
        "brandName^100",
        "brandName.brandName_pinyin^100",
        "brandName.brandName_keyword^100",
        "sortName^80",
        "sortName.sortName_pinyin^80",
        "productName^60",
        "productKeyword^20"
      ],
      "type": <multi-match-type>,
      "operator": "AND"
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

match_phrase(短语查询)

场景:精确查找所有字段。
match(全文检索)会将输入的搜索串拆解开来,去倒排索引里面去一一匹配,只要能匹配上任意一个拆解后的单词,就可以作为结果返回。
match_phrase要求输入的搜索串,必须在指定的字段文本中,完全包含一模一样的,才可以算匹配,才能作为结果返回。

GET /ecommerce/product/_search
{
    "query" : {
        "match_phrase" : {
            "producer" : "yagao producer"
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

Bool query

场景:多条件查询。
bool查询包含四个子句,must,filter,should,must_not。

  • must:表示一定要满足,相当于and;
  • should:表示可以满足也可以不满足,相当于or;
  • must_not:表示不能满足该条件,相当于not。
  • filter: 过滤查询。Elasticsearch在2.x版本将filter query去掉,并入bool query。
{
    'query':{
        'bool':{
            'must':[{
                'term':{
                    '_type':{
                        'value':'age'
                    }
                }
             },{
                 'term':{
                     'account_grade':{
                         'value':'23'
                     }
                 }
             }
           ]
        }
    }  
}

{
	"bool":{
            "must":{
                "term":{"user":"lucy"}
            },
            "filter":{
                "term":{"tag":"teach"}	
            },
            "should":[
              	{"term":{"tag":"wow"}},
                {"term":{"tag":"elasticsearch"}}
            ],
           	"mininum_should_match":1,
           	"boost":1.0  		            
        }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

“minimum_should_match”: 1,表示最小匹配度,可以设置为百分百,详情看源文档Elasticsearch Reference [6.4] » Query DSL » Minimum Should Match,设置了这个值的时候就必须满足should里面的设置了,另外注意这边should里面同一字段设置的多个值(意思是当这个值等于X或者等于Y的时候都成立,务必注意格式)。

match来指定查询条件;
filter执行速度高于查询,原因如下:

  • 过滤器不会计算相关度的得分(即结果中的_score字段,如果不关注这个字段,使用filter更好)
  • 过滤器可以被缓存到内存中,在重复搜索时,速度会比较快。
    过滤查询java api:
QueryBuilder filterBuilder = QueryBuilders  
         .filteredQuery(  
              QueryBuilders.existsQuery("title").queryName("exist"),  
              QueryBuilders.termQuery("title", "elastic")  
          );  
  
SearchResponse response = client.prepareSearch("library")  
          .setPostFilter(filterBuilder)  
          .execute().actionGet();  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

range query

{
	'query':{
    	'range':{
            'age':{
                'gte':'30',
                'lte':'20'
            }
    	}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

java api 见 term query例子。

java api-精确匹配:termQuery,相当于=
java api-范围匹配:rangeQuery,相当于SQL between and

分页查询

    GET /ecommerce/product/_search
    {
      "query": { "match_all": {} },
      "_source": ["name", "price"]
	  "size": 1,
  	  "from": 10
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

size表示返回的文档个数为1,默认为10。
from表示从第10个开始查询,主要用于分页查询。
_source表示自定义返回字段。默认会返回文档的所有字段。

分页查询:

SearchResponse response = client.prepareSearch("library")
.setQuery(QueryBuilders.matchAllQuery())
.setFrom(10)   //跳过前10个文档
.setSize(20)   //获取20个文档
.execute().actionGet();
response.getHits().totalHits()//可以统计当前匹配到的结果数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

批量查询

优点:能够大大减少网络的请求次数,缩减网络开销。
自定义设置index、type以及document id:(id为1的没有查到(found为false))

GET /_mget
{
   "docs" : [
      {
         "_index" : "ecommerce",
         "_type" :  "product",
         "_id" :    1
      },
      {
         "_index" : "ecommerce",
         "_type" :  "product",
         "_id" :    2
      }
   ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在对应的index、type下进行批量查询:(注意:在ElasticSearch6.0以后一个index下只能有一个type,否则会报错)

GET /ecommerce/product/_mget
{
    "ids": [2, 3]
}
或者:
GET /ecommerce/product/_mget
{
   "docs" : [
      {
         "_id" :    2
      },
      {
         "_id" :    3
      }
   ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

通配符查询

{
    'query':{
        'wildcard':{
            'title':'cr?me'
        }
    }   
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

正则表达式查询

{
    'query':{
        'regex':{
            'title':{
                'value':'cr.m[ae]',
                'boost':10.0
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

match_phrase_prefix query

场景:前缀查询。

{
    'query':{
        'match_phrase_prefix':{
            'title':{
                'query':'crime punish',
                'slop':1
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

query_string

{
    'query':{
        'query_string':{
            'query':'title:crime^10 +title:punishment -otitle:cat +author:(+Fyodor +dostoevsky)'
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

sort 排序

降序排序desc,升序asc。

GET /ecommerce/product/_search
{
    "query" : {
        "match" : {
            "name" : "yagao"
        }
    },
    "sort": [
        { "price": "desc" }
    ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

java api:

searchSourceBuilder.fetchSource(null, "content").sort("_score");
		searchSourceBuilder.sort("date", SortOrder.DESC);
SortBuilders.scriptSort(script, type) //使用脚本来实现排序
 
SortBuilders.geoDistanceSort(fieldName) //根据空间距离来进行排序
  • 1
  • 2
  • 3
  • 4
  • 5
searchRequestBuilder.addSort("publish_time", SortOrder.DESC);
  • 1

按照某个字段排序的话,hit.getScore()将会失效。

聚合查询

bucket和metric:

  • bucket(桶):group by 分组之后,相同的数据放进一个bucket。
  • metric(度量/指标):对一个数据分组执行的统计。如:avg\max\min

group by缺点:

  1. 涉及group by的查询会降低查询速率
  2. group by之后无法拿到其它信息(通过后文讲解的tophits可以拿到)
  3. group by之后无法排序

terms聚合

terms根据字段值项分组聚合。field按什么字段分组,size指定返回多少个分组,shard_size指定每个分片上返回多少个分组,order排序方式。可以指定include和exclude正则筛选表达式的值,指定missing设置缺省值。

【terms】 java api见【max/min/avg/sum/stats】中的例子。

计算每个tag下的商品数量:

GET /ecommerce/product/_search
{
  "size": 0,   //size=0,表示只获取聚合结果,而不要执行聚合的原始数据。
  "aggs": {  //aggs:固定语法,要对一份数据执行分组聚合操作
    "all_tags": {   //all_tags:自定义对每个aggs取名。
      "terms": { "field": "tags" }   //terms根据字段的值进行分组;field:根据指定的字段的值进行分组将文本
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

返回结果:

{
  "took": 53,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "all_tags": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "fangzhu",
          "doc_count": 2
        },
        {
          "key": "meibai",
          "doc_count": 2
        }
      ]
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

hits.hits:我们指定了size是0,所以hits.hits就是空的,否则会把执行聚合的那些原始数据给你返回回来
aggregations:聚合结果
all_tags:我们指定的某个聚合的名称
buckets:根据我们指定的field划分出的buckets
key:每个bucket对应的分组字段的值
doc_count:这个bucket分组内,有多少个数据
默认的排序规则:按照doc_count降序排序

max/min/avg/sum/stats

stats:bucket,terms,自动就会有一个doc_count,就相当于是数量。
avg:avg aggs,求平均值
max:求一个bucket内,指定field值最大的那个数据
min:求一个bucket内,指定field值最小的那个数据
sum:求一个bucket内,指定field值的总和先分组,再算每组的平均值

GET /ecommerce/product/_search
{
    "size": 0,
    "aggs" : {
        "group_by_tags" : {
            "terms" : { "field" : "tags" },
            "aggs" : {
                "avg_price": { "avg": { "field": "price" } },
                "min_price" : { "min": { "field": "price"} }, 
                "max_price" : { "max": { "field": "price"} },
                "sum_price" : { "sum": { "field": "price" } } 
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
{
   "aggs":{
      "avg_fees":{
      		"avg":{
      			"field":"fees"
      		}
      	}
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

聚合操作主要是调用了SearchRequestBuilder的addAggregation方法,通常是传入一个TermsBuilder。
多字段上的聚合操作需要用到子聚合(subAggregation),子聚合调用TermsBuilder的subAggregation方法,可以添加的子聚合有TermsBuilder、SumBuilder、AvgBuilder、MaxBuilder、MinBuilder等常见的聚合操作。

从实现上来讲,SearchRequestBuilder在内部保持了一个私有的 SearchSourceBuilder实例, SearchSourceBuilder内部包含一个List,每次调用addAggregation时会调用 SearchSourceBuilder实例,添加一个AggregationBuilder。
同样的,TermsBuilder也在内部保持了一个List,调用addAggregation方法(来自父类addAggregation)时会添加一个AggregationBuilder。

聚合操作

例如要计算每个球队年龄最大/最小/总/平均的球员年龄,如果使用SQL语句,应表达如下:

select team, max(age) as max_age from player group by team;
  • 1

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
MaxBuilder ageAgg= AggregationBuilders.max("max_age").field("age");
sbuilder.addAggregation(teamAgg.subAggregation(ageAgg));
SearchResponse response = sbuilder.execute().actionGet();
  • 1
  • 2
  • 3
  • 4

子聚合

例如要计算每个球队球员的平均年龄,同时又要计算总年薪,如果使用SQL语句,应表达如下:

select team, avg(age)as avg_age, sum(salary) as total_salary from player group by team;
  • 1

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("team");
AvgBuilder ageAgg= AggregationBuilders.avg("avg_age").field("age");
SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");
sbuilder.addAggregation(teamAgg.subAggregation(ageAgg).subAggregation(salaryAgg));
SearchResponse response = sbuilder.execute().actionGet();
  • 1
  • 2
  • 3
  • 4
  • 5

一次计算出count max min avg sum

    public void stats(){
        SearchResponse response = client.prepareSearch(indexName).setTypes(typeName)
                .addAggregation(AggregationBuilders.stats("ageAgg").field("age"))
                .get();
        Stats ageAgg = response.getAggregations().get("ageAgg");
        System.out.println("总数:"+ageAgg.getCount());
        System.out.println("最小值:"+ageAgg.getMin());
        System.out.println("最大值:"+ageAgg.getMax());
        System.out.println("平均值:"+ageAgg.getAvg());
        System.out.println("和:"+ageAgg.getSum());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

group by多个field

例如要计算每个球队每个位置的球员数,如果使用SQL语句,应表达如下:

select team, position, count(*) as pos_count from player group by team, position;
  • 1

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
TermsBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
sbuilder.addAggregation(teamAgg.subAggregation(posAgg));
SearchResponse response = sbuilder.execute().actionGet();
  • 1
  • 2
  • 3
  • 4

group by/count

例如要计算每个球队的球员数,如果使用SQL语句,应表达如下:

select team, count(*) as player_count from player group by team;
  • 1
TermsBuilder teamAgg= AggregationBuilders.terms("player_count ").field("team");
sbuilder.addAggregation(teamAgg);
SearchResponse response = sbuilder.execute().actionGet();

CountResponse response = client.prepareCount("library")
.setQuery(QueryBuilders.termQuery("title", "elastic"))
.execute().actionGet();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

聚合后对Aggregation结果排序

例如要计算每个球队总年薪,并按照总年薪倒序排列,如果使用SQL语句,应表达如下:

select team, sum(salary) as total_salary from player group by team order by total_salary desc;
  • 1

ES的java api:

TermsBuilder teamAgg= AggregationBuilders.terms("team").order(Order.aggregation("total_salary ", false);
SumBuilder salaryAgg= AggregationBuilders.avg("total_salary ").field("salary");
sbuilder.addAggregation(teamAgg.subAggregation(salaryAgg));
SearchResponse response = sbuilder.execute().actionGet();
  • 1
  • 2
  • 3
  • 4

需要特别注意的是,排序是在TermAggregation处执行的,Order.aggregation函数的第一个参数是aggregation的名字,第二个参数是boolean型,true表示正序,false表示倒序。

Aggregation结果条数的问题

默认情况下,search执行后,仅返回10条聚合结果,如果想反悔更多的结果,需要在构建TermsBuilder 时指定size:

TermsBuilder teamAgg= AggregationBuilders.terms("team").size(15);
  • 1

Aggregation结果的解析/输出

得到response后:

Map<String, Aggregation> aggMap = response.getAggregations().asMap();
StringTerms teamAgg= (StringTerms) aggMap.get("keywordAgg");
Iterator<Bucket> teamBucketIt = teamAgg.getBuckets().iterator();
while (teamBucketIt .hasNext()) {
Bucket buck = teamBucketIt .next(); //分桶
//球队名
String team = buck.getKey();
//记录数
long count = buck.getDocCount();
//得到所有子聚合
Map subaggmap = buck.getAggregations().asMap();
//avg值获取方法
double avg_age= ((InternalAvg) subaggmap.get("avg_age")).getValue();
//sum值获取方法
double total_salary = ((InternalSum) subaggmap.get("total_salary")).getValue();
//...
//max/min以此类推
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Top Hits Aggregation

i> 作用
Top Hits聚合主要用于桶聚合后查询分组后的其它数据。
比如对于下表,通过max(time)group by ip进行分组后,我们还想知道每一组数据hostname等其它字段内容,则需要使用Top Hits,再每个bucket中查询对应的数据,具体代码如下:

scoreiphostnametime
		 	TermsAggregationBuilder depIpGroup = AggregationBuilders.terms("group_by_ip").field("dip").size(10000);//一次最多拿到10000条数据,要拿到更多的数据参考后文scroll的相关讲解
            TopHitsAggregationBuilder detail = AggregationBuilders.topHits("detail").size(1);//用于拿到分组以外的其它详情数据。size来确定数量,默认返回3条数据。sort用于组内排序。
            MaxAggregationBuilder maxTime = AggregationBuilders.max("max_time").field("time");

            SearchRequestBuilder searchRequestBuilder = client
                    .prepareSearch(indexExistsList.toArray(new String[indexNameList.size()]))//通过变长数组查询多个index
                    .setTypes(indexType).addAggregation(depIpGroup.subAggregation(maxTime).subAggregation(detail));
            SearchResponse searchResponse = searchRequestBuilder
                    .execute()
                    .actionGet();
            Terms ipTerms = searchResponse.getAggregations().get("group_by_ip");
            for (Terms.Bucket bucket : ipTerms.getBuckets()) {//分桶
                ListEntity listEntity = new ListEntity();
                listEntity.setIpAddress(bucket.getKey().toString());

                TopHits topHits = bucket.getAggregations().get("detail");
                SearchHit hit = topHits.getHits().getHits()[0];//????没看懂原理,先这样用吧。返回的是一个id不同、其它数据相同的数组,由size决定长度。
                listEntity.setHostname(hit.getSource().get("hostname").toString());
                listEntity.setScore(Integer.parseInt(hit.getSource().get("score").toString()));
                dataList.add(listEntity);
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

cardinality去重

{
    "size": 0, 
    "aggs": {
      "count_type": {
        "cardinality": {
          "field": "__type"
        }
      }
    }
}
cardinality
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

percentiles百分比

percentiles对指定字段(脚本)的值按从小到大累计每个值对应的文档数的占比(占所有命中文档数的百分比),返回指定占比比例对应的值。默认返回[ 1, 5, 25, 50, 75, 95, 99 ]分位上的值。

{
    "size": 0, 
    "aggs": {
      "age_percents":{
        "percentiles": {
          "field": "age",
          "percents": [
            1,
            5,
            25,
            50,
            75,
            95,
            99
          ]
        }
      }
       
    }
}


{
  "size": 0,
  "aggs": {
    "states": {
      "terms": {
        "field": "gender"
      },
      "aggs": {
        "banlances": {
          "percentile_ranks": {
            "field": "balance",
            "values": [
              20000,
              40000
            ]
          }
        }
      }
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

percentiles rank

统计小于等于指定值的文档比。

{
    "size": 0, 
    "aggs": {
      "tests": {
        "percentile_ranks": {
          "field": "age",
          "values": [
            10,
            15
          ]
        }
      }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

filter聚合

场景:对不同的bucket下的aggs,进行filter。
filter对满足过滤查询的文档进行聚合计算,在查询命中的文档中选取过滤条件的文档进行聚合,先过滤在聚合。

如果放query里面的filter,是全局的,会对所有的数据都有影响。
但是,如果,比如说,你要统计,长虹电视,最近1个月的平均值; 最近3个月的平均值; 最近6个月的平均值,用bucket filter。

{
    "size": 0, 
    "aggs": {
      "agg_filter":{
        "filter": {
          "match":{"gender":"F"}
        },
        "aggs": {
          "avgs": {
            "avg": {
              "field": "age"
            }
          }
        }
      }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

filtters聚合

多个过滤组聚合计算。

{
    "size": 0, 
    "aggs": {
      "message": {
        "filters": {
          
          "filters": {
            "errors": {
              "exists": {
                "field": "__type"
              }
            },
            "warring":{
              "term": {
                "__type": "info"
              }
            }
          }
        }
      }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

range聚合

{
    "aggs": {
      "agg_range": {
        "range": {
          "field": "cost",
          "ranges": [
            {
              "from": 50,
              "to": 70
            },
            {
              "from": 100
            }
          ]
        },
        "aggs": {
          "bmax": {
            "max": {
              "field": "cost"
            }
          }
        }
      }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

date_range聚合

{
     "aggs": {
       "date_aggrs": {
         "date_range": {
           "field": "accepted_time",
           "format": "MM-yyy", 
           "ranges": [
             {
               "from": "now-10d/d",
               "to": "now"
             }
           ]
         }
       }
     }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

date_histogram聚合(时间直方图聚合)

按天、月、年等进行聚合统计。可按 year (1y), quarter (1q), month (1M), week (1w), day (1d), hour (1h), minute (1m), second (1s) 间隔聚合或指定的时间间隔聚合。

{ 
  "aggs": {
    "sales_over_time": {
      "date_histogram": {
        "field": "accepted_time",
        "interval": "quarter",
        "min_doc_count" : 0, //可以返回没有数据的月份
        "extended_bounds" : { //强制返回数据的范围
           "min" : "2014-01-01",
           "max" : "2014-12-31"
        }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

missing聚合

{ 
  
  "aggs": {
    "account_missing": {
      "missing": {
        "field": "__type"
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

global bucket

将所有数据纳入聚合的scope,而不管之前的query。
aggregation,scope,一个聚合操作,必须在query的搜索结果范围内执行
出来两个结果,一个结果,是基于query搜索结果来聚合的; 一个结果,是对所有数据执行聚合的。

GET /tvs/sales/_search 
{
  "size": 0, 
  "query": {
    "term": {
      "brand": {
        "value": "长虹"
      }
    }
  },
  "aggs": {
    "single_brand_avg_price": {
      "avg": {
        "field": "price"
      }
    },
    "all": {
      "global": {},
      "aggs": {
        "all_brand_avg_price": {
          "avg": {
            "field": "price"
          }
        }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

返回结果:

    {
      "took": 4,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 3,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "all": {
          "doc_count": 8,
          "all_brand_avg_price": {
            "value": 2650
          }
        },
        "single_brand_avg_price": {
          "value": 1666.6666666666667
        }
      }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

single_brand_avg_price:就是针对query搜索结果,执行的,拿到的,就是长虹品牌的平均价格
all.all_brand_avg_price:拿到所有品牌的平均价格

top_hits 按搜索结果聚合

top_hits 获取前几个doc_(即分组内前几个doc_,由size指定,默认为3个)
source 返回指定field(主要用于group by之后不能查看其它字段详情)。

    GET /ecommerce/product/_search
    {
        "size": 0,
        "aggs" : {
            "group_by_tags" : {
                "terms" : { "field" : "tags" },
                "aggs" : {
                    "top_tags": {
                      "top_hits": { 
                        "_source": {
                          "include": "name"
                        }, 
                        "size": 1
                      }
                    } 
                }
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

collect_mode 子聚合计算

depth_first

直接进行子聚合的计算
计算每个tag下的商品的平均价格,并且按照平均价格降序排序:

"order": { "avg_price": "desc" }
  • 1
GET /ecommerce/product/_search
{
    "size": 0,
    "aggs" : {
        "all_tags" : {
            "terms" : { "field" : "tags", "collect_mode" : "breadth_first", "order": { "avg_price": "desc" } },
            "aggs" : {
                "avg_price" : {
                    "avg" : { "field" : "price" }
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

vii> breadth_first
先计算出当前聚合的结果,针对这个结果在对子聚合进行计算。

"ranges": [{},{}]
  • 1

按照指定的价格范围区间进行分组,然后在每组内再按照tag进行分组,最后再计算每组的平均价格:

GET /ecommerce/product/_search
{
  "size": 0,
  "aggs": {
    "group_by_price": {
      "range": {
        "field": "price",
        "ranges": [
          {
            "from": 0,
            "to": 20
          },
          {
            "from": 20,
            "to": 40
          },
          {
            "from": 40,
            "to": 50
          }
        ]
      },
      "aggs": {
        "group_by_tags": {
          "terms": {
            "field": "tags"
          },
          "aggs": {
            "average_price": {
              "avg": {
                "field": "price"
              }
            }
          }
        }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

histogram

类似于terms,也是进行bucket分组操作,接收一个field,按照这个field的值的各个范围区间,进行bucket分组操作

interval:10,划分范围,010,1020,20~30

GET /ecommerce/product/_search
{
   "size" : 0,
   "aggs":{
      "price":{
         "histogram":{ 
            "field": "price",
            "interval": 10
         },
         "aggs":{
            "revenue": {
               "sum": { 
                 "field" : "price"
               }
             }
         }
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

date histogram

按照我们指定的某个date类型的日期field,以及日期interval,按照一定的日期间隔,去划分bucket
date interval = 1m,
2017-01-01~2017-01-31,就是一个bucket
2017-02-01~2017-02-28,就是一个bucket
然后会去扫描每个数据的date field,判断date落在哪个bucket中,就将其放入那个bucket

min_doc_count:即使某个日期interval,2017-01-01~2017-01-31中,一条数据都没有,那么这个区间也是要返回的,不然默认是会过滤掉这个区间的
extended_bounds,min,max:划分bucket的时候,会限定在这个起始日期,和截止日期内

GET /tvs/sales/_search
{
   "size" : 0,
   "aggs": {
      "sales": {
         "date_histogram": {
            "field": "sold_date",
            "interval": "month", 
            "format": "yyyy-MM-dd",
            "min_doc_count" : 0, 
            "extended_bounds" : { 
                "min" : "2016-01-01",
                "max" : "2017-12-31"
            }
         }
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

滚动(翻页)查询

java api

滚动搜索(Scroll API)

String scrollId = "";
            QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(queryBuilder).size(10000);
            SearchRequest request = Requests.searchRequest(indexName);
            request.scroll("1s");
            request.source(sourceBuilder);
            SearchResponse response = client.search(request).actionGet();
            severityCount = deelHits(severityCount, response.getHits());
            scrollId = response.getScrollId();
            while (true) {
                SearchScrollRequestBuilder searchScrollRequestBuilder = client.prepareSearchScroll(scrollId);
                searchScrollRequestBuilder.setScroll("1s");
                // 请求
                SearchResponse response1 = searchScrollRequestBuilder.get();
                SearchHits hits = response1.getHits();
                if (hits.getHits().length == 0) {
                    break;
                }else {
                    severityCount = deelHits(severityCount, hits); //hit.getSource().get("detail").toString()读取数据
                    //下一批处理
                    scrollId = response1.getScrollId();
                }
            }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
public class Scroll {

    public static void main(String[] args) {

        try{
            long startTime = System.currentTimeMillis();
            /*创建客户端*/
            //client startup
            //设置集群名称
            Settings settings = Settings.builder()
                    .put("cluster.name", "elsearch")
                    .put("client.transport.sniff", true)
                    .build();
            //创建client
            TransportClient client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(
                    InetAddress.getByName("54.223.232.95"),9300));

            List<String> result = new ArrayList<>();

            String scrollId = "";

            //第一次请求
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();


            //TODO: 设置查询条件
            RangeQueryBuilder rangequerybuilder = QueryBuilders
                .rangeQuery("inputtime")
                .from("2016-12-14 02:00:00").to("2016-12-14 07:59:59");
            sourceBuilder.query(QueryBuilders.boolQuery()
                .must(QueryBuilders
                        .matchPhraseQuery("pointid","W3.UNIT1.10HFC01CT013"))
                    .must(rangequerybuilder))
                    .size(100)//如果开启游标,则滚动获取
                    .sort("inputtime", SortOrder.ASC);
            //查询
            SearchRequest request = Requests.searchRequest("pointdata");
                request.scroll("2m");
                request.source(sourceBuilder);
            SearchResponse response = client.search(request).actionGet();
            //TODO:处理数据
            SearchHits hits = response.getHits();
            for(int i = 0; i < hits.getHits().length; i++) {
                //System.out.println(hits.getHits()[i].getSourceAsString());
                result.add(hits.getHits()[i].getSourceAsString());
            }
            //记录滚动ID
            scrollId = response.getScrollId();


            while(true){
                //后续的请求
                //scrollId = query.getScollId();
                SearchScrollRequestBuilder searchScrollRequestBuilder = client
                    .prepareSearchScroll(scrollId);            
                // 重新设定滚动时间            
                //TimeValue timeValue = new TimeValue(30000);
                searchScrollRequestBuilder.setScroll("2m");
                // 请求            
                SearchResponse response1 = searchScrollRequestBuilder.get();

                //TODO:处理数据
                SearchHits hits2 = response1.getHits();
                if(hits2.getHits().length == 0){
                    break;
                }
                for(int i = 0; i < hits2.getHits().length; i++) {
                    result.add(hits2.getHits()[i].getSourceAsString());
                }
                //下一批处理
                scrollId = response1.getScrollId();
            }

            System.out.println(result.size());
            long endTime = System.currentTimeMillis();
            System.out.println("Java程序运行时间:" + (endTime - startTime) + "ms");
        }catch(Exception e){
            e.printStackTrace();
        }

    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82

REST API

scroll原理

scroll搜索会在第一次搜索的时候,保存一个当时的视图快照,之后只会基于该旧的视图快照提供数据搜索,如果这个期间数据变更,是不会让用户看到的;
采用基于_doc(不使用_score)进行排序的方式,性能较高
每次发送scroll请求,我们还需要指定一个scroll参数,指定一个时间窗口,每次搜索请求只要在这个事件窗口内能完成就可以了

# sort默认是相关度排序("sort":[{"FIELD":{"order":"desc"}}]),不按_score排序,按_doc排序
# size设置的是这批查三条
# 第一次查询会生成快照
GET /lib3/user/_search?scroll=1m #这一批查询在一分钟内完成
{
	"query":{
		"match":{}
	},
	"sort":[  
		"_doc"
	],
	"size":3 
}

# 第二次查询通过第一次的快照ID来查询,后面以此类推
GET /_search/scroll
{
	"scroll":"1m",
	"scroll_id":"DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw=="
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

基于 scroll 解决深度分页问题

原理上是对某次查询生成一个游标 scroll_id , 后续的查询只需要根据这个游标去取数据,直到结果集中返回的 hits 字段为空,就表示遍历结束。
注意:scroll_id 的生成可以理解为建立了一个临时的历史快照,在此之后的增删改查等操作不会影响到这个快照的结果。

使用 curl 进行分页读取过程如下:

  1. 先获取第一个 scroll_id,url 参数包括 /index/_type/ 和 scroll,scroll 字段指定了scroll_id 的有效生存期,以分钟为单位,过期之后会被es 自动清理。如果文档不需要特定排序,可以指定按照文档创建的时间返回会使迭代更高效。
GET /product/info/_search?scroll=2m
{
	"query":{
		"match_all":{
		}
	},
	"sort":["_doc"]
}

# 返回结果
{
  "_scroll_id": "DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==",
  "took": 1,
  "timed_out": false,
  "_shards": {
  		"total": 1,
  		"successful": 1,
  		"failed": 0
  },
  "hits":{...}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  1. 后续的文档读取上一次查询返回的scroll_id 来不断的取下一页,如果srcoll_id 的生存期很长,那么每次返回的 scroll_id 都是一样的,直到该 scroll_id 过期,才会返回一个新的 scroll_id。请求指定的 scroll_id 时就不需要 /index/_type 等信息了。每读取一页都会重新设置 scroll_id 的生存时间,所以这个时间只需要满足读取当前页就可以,不需要满足读取所有的数据的时间,1 分钟足以。
GET /product/info/_search?scroll=DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==
{
	"query":{
		"match_all":{
		}
	},
	"sort":["_doc"]
}

# 返回结果
{
    "_scroll_id": "DnF1ZXJ5VGhIbkXIdGNoAwAAAAAAAAAdFkEwRENOVTdnUUJPWVZUd1p2WE5hV2cAAAAAAAAAHhZBMERDTIU3Z1FCT1|WVHdadIhOYVdnAAAAAAAAAB8WQTBEQ05VN2dRQk9ZVIR3WnZYTmFXZw==",
    "took": 106,
    "_shards": {
        "total": 1,
        "successful": 1,
        "failed": 0
    },
    "hits": {
        "total": 22424,
        "max_score": 1.0,
        "hits": [{
                "_index": "product",
                "_type": "info",
                "_id": "did-519392_pdid-2010",
                "_score": 1.0,
                "_routing": "519392",
                "_source": {
                    ....
                }
            }
        ]
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  1. 所有文档获取完毕之后,需要手动清理掉 scroll_id 。虽然es 会有自动清理机制,但是 srcoll_id 的存在会耗费大量的资源来保存一份当前查询结果集映像,并且会占用文件描述符。所以用完之后要及时清理。使用 es 提供的 CLEAR_API 来删除指定的 scroll_id。
# 删掉指定的多个 srcoll_id 
DELETE /_search/scroll -d 
{
	"scroll_id":[
		"cXVlcnlBbmRGZXRjaDsxOzg3OTA4NDpTQzRmWWkwQ1Q1bUlwMjc0WmdIX2ZnOzA7"
	]
}

# 删除掉所有索引上的 scroll_id 
DELETE /_search/scroll/_all

# 查询当前所有的scroll 状态
GET /_nodes/stats/indices/_search?pretty

# 返回结果
{
  "cluster_name" : "200.200.107.232",
  "nodes" : {
    "SC4fYi0CT5mIp274ZgH_fg" : {
      "timestamp" : 1514346295736,
      "name" : "200.200.107.232",
      "transport_address" : "200.200.107.232:9300",
      "host" : "200.200.107.232",
      "ip" : [ "200.200.107.232:9300", "NONE" ],
      "indices" : {
        "search" : {
          "open_contexts" : 0,
          "query_total" : 975758,
          "query_time_in_millis" : 329850,
          "query_current" : 0,
          "fetch_total" : 217069,
          "fetch_time_in_millis" : 84699,
          "fetch_current" : 0,
          "scroll_total" : 5348,
          "scroll_time_in_millis" : 92712468,
          "scroll_current" : 0
        }
      }
    }
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

基于 search_after 实现深度分页

search_after 是 ES5.0 及之后版本提供的新特性,search_after 有点类似 scroll,但是和 scroll 又不一样,它提供一个活动的游标,通过上一次查询最后一条数据来进行下一次查询。
search_after 分页的方式和 scroll 有一些显著的区别,首先它是根据上一页的最后一条数据来确定下一页的位置,同时在分页请求的过程中,如果有索引数据的增删改查,这些变更也会实时的反映到游标上。

  1. 第一页的请求和正常的请求一样。
GET /order/info/_search
{
    "size": 10,
    "query": {
        "match_all" : {
        }
    },
    "sort": [
        {"date": "asc"}
    ]
}

# 返回结果
{
    "_index": "zmrecall",
    "_type": "recall",
    "_id": "60310505115909",
    "_score": null,
    "_source": {
      ...
      "date": 1545037514
    },
    "sort": [
    	1545037514
    ]
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  1. 第二页的请求,使用第一页返回结果的最后一个数据的值,加上 search_after 字段来取下一页。注意:使用 search_after 的时候要将 from 置为 0 或 -1。
curl -XGET 127.0.0.1:9200/order/info/_search
{
    "size": 10,
    "query": {
        "match_all" : {
        }
    },
    "search_after": [1463538857], # 这个值与上次查询最后一条数据的sort值一致,支持多个
    "sort": [
        {"date": "asc"}
    ]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
注意:
如果 search_after 中的关键字为654,那么654323的文档也会被搜索到,所以在选择 search_after 的排序字段时需要谨慎,可以使用比如文档的id或者时间戳等。
search_after 适用于深度分页+ 排序,因为每一页的数据依赖于上一页最后一条数据,所以无法跳页请求。
返回的始终是最新的数据,在分页过程中数据的位置可能会有变更。这种分页方式更加符合 moa 的业务场景。
  • 1
  • 2
  • 3
  • 4

es返回结果

获取所有数据:

GET /_search
  • 1

接口访问链接:127.0.0.1:9200/_search
返回数据含义:

返回参数说明备注
found表示查询的数据是否存在
took耗费时间(毫秒)。
timed_out是否超时默认无timeout
_source表示查询到的数据
_shardsshards fail的条件(primary和replica全部挂掉),不影响其他shard默认情况下来说,一个搜索请求,会打到一个index的所有primary shard上去,当然了,每个primary shard都可能会有一个或多个replic shard,所以请求也可以到primary shard的其中一个replica shard上去
_shards.total表示应执行索引操作的分片(主分片和副本分片)的数量
_shards.successful表示索引操作成功的分片数
_shards.failed返回一个数组,这个数组是在副本分片上索引操作失败的情况下相关错误的数组如果没有失败的分片,failed将会为0。
hits.total本次搜索返回了几条结果
hits.max_scorescore的含义,就是document对于一个search的相关度的匹配分数,越相关,就越匹配,分数也高
hits.hits包含了匹配搜索的document的详细数据,默认查询前10条数据,按_score降序排序在java api中,可以通过client.setSize设置返回数量。
hits.hits._index索引名,对应sql的库
hits.hits._type类型,对应sql的表
hits.hits._id搜索的id
hits.hits._score描述搜索结果的匹配度,得分越高,文档匹配度越高,得分越低,文档的匹配度越低。
hits.hits._source搜索到的具体数据
hits.hits._source.fields搜索到的具体字段在java api中通过hit.getSource().get(“field_name”)获取

可以通过设置timeout这个值,来定时返回已经搜索到的数据。timeout机制,指定每个shard,就只能在timeout时间范围内,将搜索到的部分数据(也可能是搜索到的全部数据),直接返回给client,而不是等到所有数据全部搜索出来后再返回。可以通过如下方式进行设置:

    timeout=10ms,timeout=1s,timeout=1m
    GET /_search?timeout=10m
  • 1
  • 2

基于bulk的增删改

Elasticsearch也提供了相关操作的批处理功能,这些批处理功能通过使用_bulk API实现。通过批处理可以非常高效的完成多个文档的操作,同时可以减少不必要的网络请求。
bulk语法:

  • delete:删除一个文档,只要1个json串就可以了
  • create:PUT /index/type/id/_create,强制创建
  • index:普通的put操作,可以是创建文档,也可以是全量替换文档
  • update:执行的partial update操作

注意点:

  • bulk api对json的语法有严格的要求,除了delete外,每一个操作都要两个json串,且每个json串内不能换行,非同一个json串必须换行,否则会报错;
  • bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志;

bulk api奇特的json格式
目前处理流程:

  • 直接按照换行符切割json,不用将其转换为json对象,不会出现内存中的相同数据的拷贝;
  • 对每两个一组的json,读取meta,进行document路由;
  • 直接将对应的json发送到node上去;

换成良好json格式的处理流程:

  1. 将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象;
  2. 解析json数组里的每个json,对每个请求中的document进行路由;
  3. 为路由到同一个shard上的多个请求,创建一个请求数组;
  4. 将这个请求数组序列化;
  5. 将序列化后的请求数组发送到对应的节点上去;

数据录入

可以将json数据(可以在http://www.json-generator.com/网站上自动生成)放到当前用户目录下,然后执行如下命令,将数据导入到Elasticsearch中,如下:

    curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/_doc/_bulk?pretty&refresh" --data-binary "@accounts.json"
  • 1

参考文献

ElasticSearch教程——汇总篇
elasticsearch文档Delete By Query API(二)
Elasticsearch-基础介绍及索引原理分析
ElasticSearch教程——Java常用操作
ElasticSearch AggregationBuilders java api常用聚合查询

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/68341
推荐阅读
相关标签
  

闽ICP备14008679号