
Elasticsearch Pipeline & Processor: processing data with ingest pipelines

Pipeline & Processor
● Pipeline: a pipeline processes the documents passing through it, applying each step in order
● Processor: Elasticsearch's abstraction for a single transformation step
● Elasticsearch ships with many built-in processors, and you can also implement your own processor as a plugin

A pipeline is simply an ordered list of processors.
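As a mental model, this can be sketched in plain Python (a hypothetical sketch, not Elasticsearch code; the function names are made up for illustration): a pipeline is an ordered list of processor functions, each receiving the previous one's output.

```python
# Hypothetical model of an ingest pipeline: processors run strictly in order.

def split_tags(doc):
    # Mirrors the built-in "split" processor applied to the "tags" field.
    doc["tags"] = doc["tags"].split(",")
    return doc

def set_views(doc):
    # Mirrors the built-in "set" processor adding a "views" field.
    doc["views"] = 0
    return doc

def run_pipeline(doc, processors):
    # Each processor sees the document as transformed by the previous one.
    for processor in processors:
        doc = processor(doc)
    return doc

doc = {"title": "Introducing big data", "tags": "hadoop,elasticsearch,spark"}
result = run_pipeline(doc, [split_tags, set_views])
# result["tags"] == ["hadoop", "elasticsearch", "spark"]; result["views"] == 0
```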

We can use the _ingest/pipeline/_simulate API to simulate a pipeline's behavior and test it before creating anything:
description: what the ingest pipeline is for
processors: the processing chain, which may combine several processors
docs: sample documents to run through the pipeline

Convert the tags field of our documents into an array:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
Convert the tags field into an array and add a views field to every document:
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": "0"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

Pipeline API

Add or update a pipeline

PUT _ingest/pipeline/my-pipeline-id
{
  "description": "describe pipeline",
  "processors": [
    {
      "set": {
        "field": "foo",
        "value": "bar"
      }
    }
  ]
}

Get a pipeline

GET _ingest/pipeline/my-pipeline-id

Delete a pipeline

DELETE _ingest/pipeline/my-pipeline-id

Create a pipeline and test it

PUT _ingest/pipeline/blog_pepeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}

View the pipeline

GET _ingest/pipeline/blog_pepeline
Simulate the stored pipeline:
POST _ingest/pipeline/blog_pepeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

Index a document without the pipeline

PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data"
}

Index a document with the pipeline

POST tech_blogs/_doc?pipeline=blog_pepeline
{
  "title": "Introducing cloud computing",
  "tags": "openstack,k8s",
  "content": "You know, for cloud"
}

View the data: one document has been processed by the pipeline, and one has not

GET tech_blogs/_search
GET tech_blogs/_mapping

Because the pipeline turns tags into an array, running update by query with the pipeline over the whole index fails: the split processor cannot split a tags field that is already an array.

POST tech_blogs/_update_by_query?pipeline=blog_pepeline
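A minimal Python analogy for the failure (not Elasticsearch internals; the sample dicts are taken from the documents above): once tags has become an array, a string split can no longer be applied to it.

```python
fresh_doc = {"tags": "openstack,k8s"}           # indexed without the pipeline
processed_doc = {"tags": ["openstack", "k8s"]}  # already split into an array

fresh_doc["tags"] = fresh_doc["tags"].split(",")  # works: tags is still a string

try:
    processed_doc["tags"].split(",")  # a list has no .split(), so this raises
    split_failed = False
except AttributeError:
    split_failed = True
```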

We can add a query to update by query to limit which documents are touched: documents whose tags is still a string get processed, while documents whose tags is already an array are left alone. This also shows that update by query supports such filtering.

Here we run blog_pepeline only on the documents that have no views field:
POST tech_blogs/_update_by_query?pipeline=blog_pepeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}
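The effect of the bool / must_not / exists query can be sketched in Python (a toy filter over in-memory dicts, not how the query actually executes; document values are taken from the examples above):

```python
# Only documents WITHOUT a "views" field are selected for reprocessing;
# documents the pipeline has already handled are skipped.
docs = [
    {"title": "Introducing big data......", "tags": "hadoop,elasticsearch,spark"},
    {"title": "Introducing cloud computing", "tags": ["openstack", "k8s"], "views": 0},
]

needs_pipeline = [d for d in docs if "views" not in d]  # must_not + exists("views")
```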

At this point all of our documents have been processed by the pipeline:

GET tech_blogs/_search
GET tech_blogs
Some built-in Processors
● https://www.elastic.co/guide/en/elasticsearch/reference/7.1/ingest-processors.html
● Split Processor (e.g. split a field's value into an array)
● Remove / Rename Processor (e.g. remove or rename a field)
● Append (e.g. add a new tag to a product)
● Convert (e.g. convert a product price from a string to a float)
● Date / JSON (e.g. convert a date format, or parse a string into a JSON object)
● Date Index Name Processor (e.g. route documents passing through it into an index named with a date pattern)
● Fail Processor (on an error, return a pipeline-defined error message to the user)
● Foreach Processor (apply the same processor to every element of an array field)
● Grok Processor (pattern-based parsing, e.g. of log lines)
● Gsub / Join / Split (string replacement / array to string / string to array)
● Lowercase / Uppercase (case conversion)

Ingest Node vs. Logstash

● Input / output: Logstash can read from and write to many different data sources; an Ingest Node reads data via the ES REST API and writes to Elasticsearch
● Buffering: Logstash implements a simple data queue and supports replay; an Ingest Node has no buffering
● Processing: Logstash has a large plugin ecosystem and supports custom development; an Ingest Node has built-in processors and can be extended with plugins (updating a plugin requires a restart)
● Deployment: Logstash adds some architectural complexity; an Ingest Node needs no extra deployment
https://www.elastic.co/cn/blog/should-i-use-logstash-or-elasticsearch-ingest-nodes