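When I introduced ES clusters earlier, I mentioned that a cluster can contain ingest nodes. Before a document is stored in an index, we may need to pre-process it. An ingest node intercepts index and bulk requests, transforms the document content as required, and then passes the result back to the index or bulk API.
With the default configuration, ingest is enabled on every node, so any ES node can handle ingest tasks. As described in the earlier article on ES clusters, we can also set up dedicated ingest nodes for this work.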
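Whether a node handles ingest is controlled by the node.ingest parameter in its elasticsearch.yml (this is the setting for 7.2; from 7.9 onward node roles are declared through node.roles instead):
# Disable ingest on this node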
node.ingest: false
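To check which nodes still accept ingest work after changing this setting, one option is to inspect the roles array that the nodes info API (GET _nodes) reports for each node. The sketch below is a minimal illustration: the helper name ingest_node_names and the sample data are hypothetical and hand-written, not real cluster output.

```python
from typing import Any, Dict, List


def ingest_node_names(nodes_info: Dict[str, Any]) -> List[str]:
    """Return the sorted names of nodes whose roles include "ingest"."""
    names = []
    for node in nodes_info.get("nodes", {}).values():
        if "ingest" in node.get("roles", []):
            names.append(node.get("name", ""))
    return sorted(names)


# Hand-written sample shaped like a `GET _nodes` response (assumption, not real output).
sample = {
    "nodes": {
        "id-1": {"name": "node-a", "roles": ["master", "data", "ingest"]},
        "id-2": {"name": "node-b", "roles": ["master", "data"]},
    }
}

print(ingest_node_names(sample))
```

Here node-b stands for a node started with node.ingest: false, so only node-a is listed.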
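An ingest node processes data mainly through pipelines. A pipeline is specified as a parameter on the index or on the request, so that when the ingest node intercepts the request it knows which pipeline to run.
We can create pipelines with the _ingest/pipeline API. Let's create one that sets a default value for the des field: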
PUT _ingest/pipeline/test_pipeline
{
  "description" : "这是测试的管道内容",
  "processors" : [
    {
      "set" : {
        "field": "des",
        "value": "这是管道默认数据"
      }
    }
  ]
}
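Outside the Kibana console, the same request can be sent over plain HTTP. The following is a minimal sketch using only the Python standard library; the base URL http://localhost:9200 (an unsecured local development cluster) and the helper name build_put_pipeline are assumptions made for illustration.

```python
import json
import urllib.request

ES_URL = "http://localhost:9200"  # assumption: local, unsecured dev cluster


def build_put_pipeline(pipeline_id, description, processors, base_url=ES_URL):
    """Build (but do not send) a PUT _ingest/pipeline/<id> request."""
    body = {"description": description, "processors": processors}
    return urllib.request.Request(
        url=f"{base_url}/_ingest/pipeline/{pipeline_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_put_pipeline(
    "test_pipeline",
    "这是测试的管道内容",
    [{"set": {"field": "des", "value": "这是管道默认数据"}}],
)
print(request.get_method(), request.full_url)

# To send it against a running cluster, uncomment:
# with urllib.request.urlopen(request) as resp:
#     print(resp.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the URL and body before touching a cluster.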
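A pipeline can also carry an optional version parameter. Note that, according to the official documentation, Elasticsearch itself does not use this version number. It exists only so that users can manage pipeline versions externally, according to their own environment.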
PUT _ingest/pipeline/test_pipeline
{
  "description" : "这是测试的管道内容",
  "version": 100,
  "processors" : [
    {
      "set" : {
        "field": "des",
        "value": "这是管道默认数据"
      }
    }
  ]
}
Querying a pipeline
A GET request returns the current configuration of the pipeline:
GET _ingest/pipeline/test_pipeline
Response:
{
  "test_pipeline" : {
    "description" : "这是测试的管道内容",
    "version" : 100,
    "processors" : [
      {
        "set" : {
          "field" : "des",
          "value" : "这是管道默认数据"
        }
      }
    ]
  }
}
Querying the pipeline version
GET _ingest/pipeline/test_pipeline?filter_path=*.version
Response:
{
  "test_pipeline" : {
    "version" : 100
  }
}
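Deleting a pipeline
Deleting a pipeline works much like deleting a document: send a DELETE request to the same API endpoint that was used to create it.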
DELETE _ingest/pipeline/test_pipeline
Deleting all pipelines
As with document operations, you can use the * wildcard to match every pipeline and remove them all.
DELETE _ingest/pipeline/*
Suppose we have an index named test_field2 with the following definition:
{
  "test_field2": {
    "aliases": {},
    "mappings": {
      "properties": {
        "channel": { "type": "nested" },
        "des": { "type": "text" },
        "name": { "type": "keyword" }
      }
    },
    "settings": {
      "index": {
        "creation_date": "1574774756443",
        "number_of_shards": "1",
        "number_of_replicas": "1",
        "uuid": "RCF17AZOR1GPs84LKw88lA",
        "version": { "created": "7020099" },
        "provided_name": "test_field2"
      }
    }
  }
}
Now let's try inserting data into the index.
Using an existing pipeline
PUT test_field2/_doc/1?pipeline=test_pipeline
{
  "name": "内容1",
  "desc": "描述1",
  "channel": [
    {
      "name": "one",
      "num": 33
    },
    {
      "name": "two",
      "num": 44
    }
  ]
}
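Querying the document with ID 1 then returns the following:
{
  "_index" : "test_field2",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "des" : "这是管道默认数据",
    "name" : "内容1",
    "channel" : [
      { "num" : 33, "name" : "one" },
      { "num" : 44, "name" : "two" }
    ],
    "desc" : "描述1"
  }
}
Notice that the stored document contains a des field that the request never sent: the request used desc, and the pipeline's set processor added des with its configured value. Had the request included des, the processor would have overwritten it, because set overrides existing values by default.
As this shows, a pipeline behaves much like the interceptors we use in everyday development: it intercepts an operation and modifies its data.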
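To make the behaviour of the set processor concrete, here is a small local model of it in Python. It is only a sketch of the documented semantics for a flat, top-level field (including the processor's override option, which defaults to true); the function name apply_set is hypothetical and nothing here talks to Elasticsearch.

```python
import copy


def apply_set(source, field, value, override=True):
    """Local model of the `set` processor for a top-level field.

    With override=False, an existing non-null value is left untouched.
    """
    result = copy.deepcopy(source)
    if override or result.get(field) is None:
        result[field] = value
    return result


# The request body from the example above: it sends `desc`, not `des`.
incoming = {"name": "内容1", "desc": "描述1"}
stored = apply_set(incoming, "des", "这是管道默认数据")
print(sorted(stored))  # field names present after the processor ran
```

The incoming desc field is kept as it was, and des appears alongside it, which matches the stored document shown above.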
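Defining a pipeline in the request
It is tempting to try defining the pipeline inline in the index request itself, the way the _simulate API allows:
PUT test_field2/_doc/5
{
  "pipeline": {
    "description": "这是自定义的管道内容",
    "processors": [
      { "set": { "field": "des", "value": "这是管道默认数据" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "name": "内容1",
        "desc": "描述1",
        "channel": [
          { "name": "one", "num": 33 },
          { "name": "two", "num": 44 }
        ]
      }
    }
  ]
}
This does not work. The index API does not accept an inline pipeline definition, so the whole body is stored as the document, as querying it shows:
{
  "_index" : "test_field2",
  "_type" : "_doc",
  "_id" : "5",
  "_version" : 1,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "pipeline" : {
      "description" : "这是自定义的管道内容",
      "processors" : [
        { "set" : { "field" : "des", "value" : "这是管道默认数据" } }
      ]
    },
    "docs" : [
      {
        "_source" : {
          "name" : "内容1",
          "desc" : "描述1",
          "channel" : [
            { "name" : "one", "num" : 33 },
            { "name" : "two", "num" : 44 }
          ]
        }
      }
    ]
  }
}
The pipeline and docs keys were indexed as ordinary fields, and no des field was added. To apply a pipeline while indexing, reference an existing pipeline by name with the pipeline query parameter, as in the previous example.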
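For reference, the index API takes a pipeline by name rather than by definition. The sketch below builds the two forms I am aware of: the pipeline query parameter on the request, and the index.default_pipeline index setting. The helper names index_doc_path and default_pipeline_settings are hypothetical, and the code only constructs the path and body; it does not contact a cluster.

```python
import json
from urllib.parse import quote, urlencode


def index_doc_path(index, doc_id, pipeline=None):
    """Build the path for PUT <index>/_doc/<id>, optionally naming a pipeline."""
    path = f"/{quote(index)}/_doc/{quote(str(doc_id))}"
    if pipeline:
        path += "?" + urlencode({"pipeline": pipeline})
    return path


def default_pipeline_settings(pipeline):
    """Body for PUT <index>/_settings that makes `pipeline` the index default."""
    return {"index": {"default_pipeline": pipeline}}


print("PUT", index_doc_path("test_field2", 5, "test_pipeline"))
print("PUT /test_field2/_settings",
      json.dumps(default_pipeline_settings("test_pipeline")))
```

With a default pipeline set on the index, requests that do not name a pipeline still pass through it, which avoids repeating the query parameter on every write.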
Elasticsearch also provides the _simulate API, which lets you see what documents would look like after being processed by a pipeline, without indexing anything:
POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "description": "_description",
    "processors": [
      { "set" : { "field" : "field2", "value" : "_value" } }
    ]
  },
  "docs": [
    { "_index": "index", "_id": "id", "_source": { "foo": "bar" } },
    { "_index": "index", "_id": "id", "_source": { "foo": "rab" } }
  ]
}
Response:
{
  "docs": [
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": { "field2": "_value", "foo": "bar" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.187Z" }
      }
    },
    {
      "doc": {
        "_id": "id",
        "_index": "index",
        "_type": "_doc",
        "_source": { "field2": "_value", "foo": "rab" },
        "_ingest": { "timestamp": "2017-05-04T22:30:03.188Z" }
      }
    }
  ]
}
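When the simulate response is consumed from a script, usually only the transformed _source of each document matters. The helper below is a minimal sketch that pulls those out of a parsed response; the name simulated_sources is hypothetical, and entries without a doc key (which I assume indicate a failed document) are simply skipped.

```python
def simulated_sources(response):
    """Return the transformed _source of every successfully simulated doc."""
    sources = []
    for item in response.get("docs", []):
        doc = item.get("doc")
        if doc is not None:  # assumption: failed docs carry no "doc" key
            sources.append(doc["_source"])
    return sources


# Response copied from the example above, trimmed to the relevant keys.
response = {
    "docs": [
        {"doc": {"_id": "id", "_index": "index",
                 "_source": {"field2": "_value", "foo": "bar"}}},
        {"doc": {"_id": "id", "_index": "index",
                 "_source": {"field2": "_value", "foo": "rab"}}},
    ]
}

for source in simulated_sources(response):
    print(source)
```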
Viewing how each processor affects a document
Adding the verbose parameter shows the state of each document after every processor in the pipeline:
POST _ingest/pipeline/_simulate?verbose
{
  "pipeline" : {
    "description": "_description",
    "processors": [
      { "set" : { "field" : "field2", "value" : "_value2" } },
      { "set" : { "field" : "field3", "value" : "_value3" } }
    ]
  },
  "docs": [
    { "_index": "index", "_id": "id", "_source": { "foo": "bar" } },
    { "_index": "index", "_id": "id", "_source": { "foo": "rab" } }
  ]
}
Response:
{
  "docs": [
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": { "field2": "_value2", "foo": "bar" },
            "_ingest": { "timestamp": "2017-05-04T22:46:09.674Z" }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": { "field3": "_value3", "field2": "_value2", "foo": "bar" },
            "_ingest": { "timestamp": "2017-05-04T22:46:09.675Z" }
          }
        }
      ]
    },
    {
      "processor_results": [
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": { "field2": "_value2", "foo": "rab" },
            "_ingest": { "timestamp": "2017-05-04T22:46:09.676Z" }
          }
        },
        {
          "doc": {
            "_id": "id",
            "_index": "index",
            "_type": "_doc",
            "_source": { "field3": "_value3", "field2": "_value2", "foo": "rab" },
            "_ingest": { "timestamp": "2017-05-04T22:46:09.677Z" }
          }
        }
      ]
    }
  ]
}
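The verbose output is easiest to read as a sequence of snapshots: processors run in order, and each entry in processor_results is the document as it looks after one more processor has been applied. The following local model illustrates that idea for set processors only; the function name simulate_verbose is hypothetical, and this is a sketch of the concept rather than how Elasticsearch implements it.

```python
import copy


def simulate_verbose(processors, source):
    """Apply `set` processors in order, recording the source after each step."""
    current = copy.deepcopy(source)
    snapshots = []
    for processor in processors:
        kind, config = next(iter(processor.items()))
        if kind != "set":
            raise ValueError(f"this sketch only models 'set', got {kind!r}")
        current[config["field"]] = config["value"]
        snapshots.append(copy.deepcopy(current))
    return snapshots


processors = [
    {"set": {"field": "field2", "value": "_value2"}},
    {"set": {"field": "field3", "value": "_value3"}},
]

for step, snapshot in enumerate(simulate_verbose(processors, {"foo": "bar"}), 1):
    print(step, snapshot)
```

The first snapshot contains only field2 and the second contains both field2 and field3, mirroring the two processor_results entries per document in the response above.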
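P.S. The APIs and documentation referenced in this article are based on Elasticsearch 7.2.
My own knowledge is limited, so parts of the above may be unclear or wrong. If you spot a problem, please let me know and I will correct it right away. If this article helped you in any way, a like would be appreciated; it keeps me motivated.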