赞
踩
摄取节点(Ingest Node)是Elasticsearch集群中专门负责数据预处理的节点类型。摄取节点利用管道(Pipeline)这一概念,对进入集群的文档执行一系列预定义的处理步骤,如数据清洗、转换、丰富等,然后再将处理后的文档转发到适当的索引进行存储。以下是对Elasticsearch摄取节点中管道(Pipeline)的详细介绍:
管道由一个JSON文档定义,它描述了文档在被索引前需要经历的一系列处理器(Processor)。一个简单的管道示例:
PUT _ingest/pipeline/my_pipeline { "description": "A pipeline to clean and enrich documents", "processors": [ { "trim": { "field": "message" } }, { "remove": { "field": "unused_field" } }, { "date": { "field": "event_timestamp", "target_field": "@timestamp", "formats": ["ISO8601"] } } ] }
在这个示例中,my_pipeline
管道包含三个处理器:trim
(去除字符串首尾空格)、remove
(删除某个字段)和 date
(解析和格式化日期)。
管道中的处理器按照定义顺序依次执行。每个处理器都对文档进行特定的变换操作,输出作为下一个处理器的输入,直到所有处理器完成处理。
在索引文档时指定管道名称,让摄取节点使用该管道处理文档:
PUT my_index/_doc/1?pipeline=my_pipeline
{
"message": " Hello, World! ",
"unused_field": "Remove me",
"event_timestamp": "2024-0½-01T12:00:00Z"
}
设置索引的 index.default_pipeline
属性,让所有进入该索引的文档自动使用指定的管道:
PUT my_index
{
"settings": {
"index.default_pipeline": "my_pipeline"
}
}
在批量索引(_bulk
API)中指定管道:
POST _bulk?pipeline=my_pipeline
{"index":{"_index":"my_index","_id":"1"}}
{"message":" Hello, World! ","unused_field":"Remove me","event_timestamp":"2024-05-01T12:00:00Z"}
Elasticsearch提供了丰富的处理器类型,涵盖数据清洗、转换、地理处理、文本分析、条件逻辑等多种场景。一些常用的处理器包括:
使用 simulate
API 模拟文档通过管道的处理过程,查看处理前后文档的变化:
POST _ingest/pipeline/_simulate { "pipeline": { "processors": [ ... ] }, "docs": [ { "_source": { "message": " Hello, World! ", ... } } ] }
查询管道统计信息,了解管道处理效率、失败情况等:
GET _ingest/pipeline/my_pipeline/_stats
在实际操作中,请始终参考Elasticsearch官方文档以获取最新的处理器类型、配置选项和最佳实践。摄取管道是提升Elasticsearch数据质量和处理效率的重要工具,应充分利用其功能进行数据预处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。