
Preprocessing document data with pipelines and Painless


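By default, every node in Elasticsearch (ES) is an ingest node and can preprocess documents before they are indexed: changing a field's default value, adding a new field, splitting a string into an array, and so on. ES offers two ways to do this preprocessing: pipelines and Painless scripts.

Pipeline: a data pipeline processes the documents that enter it in order. It is made up of a series of processors, each of which performs one preprocessing step on its own. ES ships with a set of built-in processors, and the set can be extended. To use a pipeline, first register it in ES, then specify it when indexing or updating documents or when rebuilding an index.

Painless script: when data is updated, a Painless script can preprocess it; at query time, a script can intercept the data and modify the values that are returned.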
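
To make the idea of "an ordered series of processors" concrete, here is a minimal Python sketch that models a pipeline locally. It does not talk to Elasticsearch: `split_processor`, `set_processor`, and `run_pipeline` are hypothetical names chosen for illustration, and they only imitate the behaviour of the built-in `split` and `set` processors used in the walkthrough below.

```python
from typing import Any, Callable, Dict, List

Doc = Dict[str, Any]
Processor = Callable[[Doc], Doc]


def split_processor(field: str, separator: str) -> Processor:
    """Return a processor that turns a delimited string field into a list."""
    def process(doc: Doc) -> Doc:
        doc = dict(doc)  # work on a copy so the input stays untouched
        # Assumption: the real split processor treats the separator as a
        # regular expression; a literal split is enough for a plain comma.
        doc[field] = doc[field].split(separator)
        return doc
    return process


def set_processor(field: str, value: Any) -> Processor:
    """Return a processor that sets a field to a fixed value."""
    def process(doc: Doc) -> Doc:
        doc = dict(doc)
        doc[field] = value
        return doc
    return process


def run_pipeline(processors: List[Processor], doc: Doc) -> Doc:
    """Apply each processor in order, feeding each result to the next."""
    for processor in processors:
        doc = processor(doc)
    return doc


# Local stand-in for a pipeline with a split step followed by a set step.
blog_pipeline = [split_processor("tags", ","), set_processor("views", 0)]
source = {"title": "Introducing cloud computing", "tags": "openstack,k8s"}
result = run_pipeline(blog_pipeline, source)
print(result)
```

The order of the list matters: each processor sees the document as the previous one left it, which is the property the `processors` array of a pipeline definition relies on.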

1 Initialize the data
DELETE tech_blogs
PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data"
}

2 Test a pipeline that splits the tags field into an array
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

3 Also add a field to each document: the blog's view count
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

4 Register a pipeline in ES
PUT _ingest/pipeline/blog_pipeline
{
  "description": "a blog pipeline",
  "processors": [
    {
      "split": {
        "field": "tags",
        "separator": ","
      }
    },
    {
      "set": {
        "field": "views",
        "value": 0
      }
    }
  ]
}

5 View the pipeline
GET _ingest/pipeline/blog_pipeline

6 Test the pipeline
POST _ingest/pipeline/blog_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}

7 Index a document without the pipeline
PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data"
}

8 Index a document with the pipeline
PUT tech_blogs/_doc/2?pipeline=blog_pipeline
{
  "title": "Introducing cloud computing",
  "tags": "openstack,k8s",
  "content": "You know, for cloud"
}

9 Search both documents: one has been processed, the other has not
POST tech_blogs/_search
{}

10 Rebuilding every document with _update_by_query causes an error (document 2 has already been processed, so its tags field is an array and cannot be split again)
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
}

11 Add a condition to _update_by_query (rebuild only the documents that have not been preprocessed)
POST tech_blogs/_update_by_query?pipeline=blog_pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "views"
        }
      }
    }
  }
}
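
The difference between steps 10 and 11 can be imitated locally. The Python sketch below assumes the error in step 10 comes from running the split step on a `tags` value that is already an array; `apply_blog_pipeline` and `needs_processing` are hypothetical helpers written for this illustration, not Elasticsearch APIs.

```python
from typing import Any, Dict, List

Doc = Dict[str, Any]


def apply_blog_pipeline(doc: Doc) -> Doc:
    """Imitate blog_pipeline: split tags (strings only), then set views to 0."""
    if not isinstance(doc["tags"], str):
        # Assumed failure mode: an already-split field cannot be split again.
        raise TypeError("field [tags] is not a string and cannot be split")
    return {**doc, "tags": doc["tags"].split(","), "views": 0}


def needs_processing(doc: Doc) -> bool:
    """Mirror the step 11 query: select documents without a views field."""
    return "views" not in doc


docs: List[Doc] = [
    {"_id": "1", "tags": "hadoop,elasticsearch,spark"},      # indexed without the pipeline
    {"_id": "2", "tags": ["openstack", "k8s"], "views": 0},  # already processed
]

# Step 10 analogue: pushing every document through the pipeline fails.
try:
    [apply_blog_pipeline(d) for d in docs]
    failed = False
except TypeError:
    failed = True

# Step 11 analogue: only unprocessed documents go through the pipeline.
updated = [apply_blog_pipeline(d) if needs_processing(d) else d for d in docs]
print(failed, updated)
```

Filtering on a field that only the pipeline writes (`views` here) is what makes the rebuild safe to run more than once.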

12 Add a content_length field with a Painless script
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "to split blog tags",
    "processors": [
      {
        "split": {
          "field": "tags",
          "separator": ","
        }
      },
      {
        "script": {
          "source": """
            if (ctx.containsKey("content")) {
              ctx.content_length = ctx.content.length();
            } else {
              ctx.content_length = 0;
            }
          """
        }
      },
      {
        "set": {
          "field": "views",
          "value": 0
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "title": "Introducing big data......",
        "tags": "hadoop,elasticsearch,spark",
        "content": "You know, for big data"
      }
    },
    {
      "_index": "index",
      "_id": "idxx",
      "_source": {
        "title": "Introducing cloud computing",
        "tags": "openstack,k8s",
        "content": "You know, for cloud"
      }
    }
  ]
}
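
For readers less familiar with Painless, the script in step 12 does roughly what the following Python sketch does. This is a local analogue only; `add_content_length` is a hypothetical name, and `ctx` is modelled as a plain dictionary.

```python
from typing import Any, Dict


def add_content_length(ctx: Dict[str, Any]) -> Dict[str, Any]:
    """Set content_length to the length of content, or 0 if content is absent."""
    ctx = dict(ctx)  # copy so the caller's document is not modified
    if "content" in ctx:
        # Python's len() counts code points while Java's String.length()
        # counts UTF-16 code units; both agree for the ASCII text used here.
        ctx["content_length"] = len(ctx["content"])
    else:
        ctx["content_length"] = 0
    return ctx


with_content = add_content_length({"content": "You know, for big data"})
without_content = add_content_length({"title": "no body"})
print(with_content["content_length"], without_content["content_length"])
```

The explicit check for a missing `content` key mirrors `ctx.containsKey("content")` in the script, so that documents without that field still get a `content_length` instead of raising an error.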

13 Reinitialize the data
DELETE tech_blogs
PUT tech_blogs/_doc/1
{
  "title": "Introducing big data......",
  "tags": "hadoop,elasticsearch,spark",
  "content": "You know, for big data",
  "views": 0
}

14 Add 100 to the views field with a Painless script
POST tech_blogs/_update/1
{
  "script": {
    "source": "ctx._source.views += params.new_views",
    "params": {
      "new_views": 100
    }
  }
}

15 Check the views count
POST tech_blogs/_search
{
}

16 Store the Painless script in the cluster state
POST _scripts/update_views
{
  "script": {
    "lang": "painless",
    "source": "ctx._source.views += params.new_views"
  }
}

17 Test the stored script by adding 1000 to views
POST tech_blogs/_update/1
{
  "script": {
    "id": "update_views",
    "params": {
      "new_views": 1000
    }
  }
}
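
Steps 14 to 17 separate the script's logic from its parameters: the source stays the same while `new_views` changes per request. A minimal Python sketch of that idea follows. The `stored_scripts` registry and `run_stored_script` are hypothetical stand-ins for the cluster state and the `_update` API, used only to show the flow.

```python
from typing import Any, Callable, Dict

Source = Dict[str, Any]
Script = Callable[[Source, Dict[str, Any]], None]

# Hypothetical stand-in for scripts stored in the cluster state, keyed by id.
stored_scripts: Dict[str, Script] = {}


def update_views(source: Source, params: Dict[str, Any]) -> None:
    """Same logic as the Painless source: views += params.new_views."""
    source["views"] += params["new_views"]


# Step 16 analogue: register the script once under an id.
stored_scripts["update_views"] = update_views


def run_stored_script(script_id: str, source: Source, params: Dict[str, Any]) -> None:
    """Look the script up by id and run it with the caller's params."""
    stored_scripts[script_id](source, params)


doc = {"title": "Introducing big data......", "views": 0}
run_stored_script("update_views", doc, {"new_views": 100})   # same logic as step 14 (inline there)
run_stored_script("update_views", doc, {"new_views": 1000})  # step 17
print(doc["views"])
```

Passing the increment through `params` rather than hard-coding it into the source lets one stored script serve every call.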

18 At query time, use a Painless script to add a random value to views in the search results
GET tech_blogs/_search
{
  "script_fields": {
    "rnd_views": {
      "script": {
        "lang": "painless",
        "source": """
          java.util.Random rnd = new Random();
          doc['views'].value + rnd.nextInt(1000);
        """
      }
    }
  },
  "query": {
    "match_all": {}
  }
}
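
The script field in step 18 is computed per hit at query time and is not written back to the index. The Python sketch below illustrates that distinction under simplifying assumptions: `rnd_views` is a hypothetical helper, and the fixed seed exists only to make the example repeatable.

```python
import random
from typing import Any, Dict


def rnd_views(stored_doc: Dict[str, Any], rng: random.Random) -> int:
    """Return views plus a random integer in [0, 1000), like rnd.nextInt(1000)."""
    return stored_doc["views"] + rng.randrange(1000)


stored = {"title": "Introducing big data......", "views": 1100}
rng = random.Random(42)  # seeded only so repeated runs give the same numbers

computed = rnd_views(stored, rng)
print(computed, stored["views"])  # the stored value is left unchanged
```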

 
