赞
踩
写入es前,数据格式如下
{"json_lbm_01":"test01","json_lbm_02":"test02","tmp_lbm_01":"test03","tmp_lbm_02":"test04"}
需求:单纯用pipeline可不可以实现,如果写入key包含json_提换为空,包含tmp提换为core,因为key字段有很多不考虑穷举,最终效果要如下:
- {
- "lbm_01":"test01",
- "lbm_02":"test02",
- "core_lbm_01":"test03",
- "core_lbm_02":"test04"
- }
——问题来源:死磕Elasticsearch知识星球 https://t.zsxq.com/0bzWL3w1X
Elasticsearch mapping 一旦创建是不允许修改的!允许更新 mapping 的地方是几个特殊的点,可以参见:Elasticsearch 可以更改 Mapping 吗?如何修改?
除此之外的 mapping 层面尤其字段层面想要修改需要转换思路。
看开篇问题,本质上属于建模问题,应该建模阶段处理好,后期就不存在这个问题,这点期望大家可以达成共识。
关于Elasticsearch 数据建模的重要性,推荐参考:
针对开篇问题,考虑如下的解决方案:
字段别名 field-alias 区别于索引别名 alias。
索引别名大家都比较熟悉,字段别名听到的多,但是实际用的不见得有那么多。
字段别名是 Elasticsearch 6.4 版本新上的功能,具体参见:
https://www.elastic.co/cn/blog/introducing-field-aliases-in-elasticsearch
因为一般前期建模做足功课,后面环节就不需要了。
优点:已有mapping保持不动,只是在其基础上做了更新操作。
缺点:批量1000个字 段,需要构造1000个字段的mapping,其实可以 脚本实现。
实操参考:
- POST mytxindex-20230303/_bulk
- {"index":{"_id":1}}
- {"json_lbm_01":"test01","json_lbm_02":"test02","tmp_lbm_01":"test03","tmp_lbm_02":"test04"}
-
-
- PUT mytxindex-20230303/_mapping
- {
- "properties": {
- "lbm_01": {
- "type": "alias",
- "path": "json_lbm_01"
- },
- "lbm_02": {
- "type": "alias",
- "path": "json_lbm_02"
- },
- "core_lbm_01": {
- "type": "alias",
- "path": "tmp_lbm_01"
- },
- "core_lbm_02": {
- "type": "alias",
- "path": "tmp_lbm_02"
- }
- }
- }
-
- POST mytxindex-20230303/_search
- {
- "query": {
- "match": {
- "lbm_01": "test01"
- }
- }
- }
召回结果数据如下截图所示:
核心点介绍如下:
优先推荐使用模板 template,解决了字段名称相似的模板化匹配问题。
预处理管道实现分两块:
其一,script 实现了新旧字段的赋值;
其二,remove 移除了不必要的老字段。
优点:这种操作比较常见,中规中矩。
缺点:需要 reindex 迁移索引数据,且在迁移的时候做预处理操作。
具体实现如下:
- #### step1:创建模板,有了模板,字段一键搞定
- PUT _index_template/mytx_template_20230303
- {
- "index_patterns": [
- "mytx_new_*"
- ],
- "template": {
- "settings": {
- "number_of_shards": 1
- },
- "mappings": {
- "dynamic_templates": [
- {
- "lbm_to_keyword": {
- "match_mapping_type": "string",
- "match": "lbm_*",
- "mapping": {
- "type": "keyword"
- }
- }
- },
- {
- "core_lbm_to_keyword": {
- "match_mapping_type": "string",
- "match": "core_lbm_*",
- "mapping": {
- "type": "keyword"
- }
- }
- }
- ]
- }
- }
- }
-
- #### step2:创建预处理管道
- PUT _ingest/pipeline/mytx_pipeline_20230303
- {
- "processors": [
- {
- "script": {
- "source": """
- ctx.lbm_01 = ctx.json_lbm_01;
- ctx.lbm_02 = ctx.json_lbm_02;
- ctx.core_lbm_01 = ctx.tmp_lbm_01;
- ctx.core_lbm_02 = ctx.tmp_lbm_02;
- """,
- "lang": "painless"
- }
- },
- {
- "remove": {
- "field": [
- "json_lbm_01",
- "json_lbm_02",
- "tmp_lbm_01",
- "tmp_lbm_02"
- ],
- "if": "ctx.json_lbm_01 != null && ctx.json_lbm_02 != null && ctx.tmp_lbm_01 != null && ctx.tmp_lbm_02 != null"
- }
- }
- ]
- }
-
- #### step3:数据迁移操作
- POST _reindex
- {
- "source": {
- "index": "mytxindex-20230303"
- },
- "dest": {
- "index": "mytx_new_001",
- "pipeline": "mytx_pipeline_20230303"
- }
- }
-
- #### step4:执行检索
- POST mytx_new_001/_search
召回结果如下图所示:
方案一、方案二都解决不了 N 个字段的问题。
假设有多个字段,不想一个字段一个字段的复制处理,也不想借助第三方脚本如shell 或者 python 处理。
那有没有更好的方案呢?方案三基于字段遍历实现,字段无非是 key:value 组合。
先通过:entry.getKey( )获取 key,然后基于 key 做逻辑判定,构造新的key,然后将旧value 复制给新 key。
最后,通过 putAll 更新。
- PUT _ingest/pipeline/rename_fields_pipeline
- {
- "processors": [
- {
- "script": {
- "source": """
- def new_fields = [:];
- for (entry in ctx.entrySet()) {
- String key = entry.getKey();
- if (key.startsWith('json_')) {
- key = key.replace('json_', '');
- } else if (key.startsWith('tmp_')) {
- key = 'core_' + key.replace('tmp_', '');
- }
- new_fields[key] = entry.getValue();
- }
- ctx.clear();
- ctx.putAll(new_fields);
- """
- }
- }
- ]
- }
-
-
- POST _reindex
- {
- "source": {
- "index": "mytxindex-20230303"
- },
- "dest": {
- "index": "mytx_new_002",
- "pipeline": "rename_fields_pipeline"
- }
- }
最终执行结果如下,和预期一致。
类似问题即便给出了3种不同的实现方案,都能达到给定的业务需求。
但,仍然不建议业务中后期这么处理。更优的解决方案,推荐借助 Elasticsearch 建模阶段做好规划,避免中后期的类似上述问题的涉及大量数据迁移的大的改动。
更多实践想法,欢迎大家一起交流!!!
更短时间更快习得更多干货!
和全球近 1900+ Elastic 爱好者一起精进!
比同事抢先一步学习进阶干货!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。