当前位置:   article > 正文

2024年大数据最全使用logstash迁移ES数据并解决限流等问题(1),2024年最新字节跳动大数据开发三面凉凉_logstash 同步es

logstash 同步es

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!


不知道nohup啥意思的自己搜索下


## 配置文件


上游elasticsearch文档


https://www.elastic.co/guide/en/logstash/7.10/plugins-inputs-elasticsearch.html


下游elasticsearch文档


https://www.elastic.co/guide/en/logstash/7.10/plugins-outputs-elasticsearch.html


查看文档一顿吭哧吭哧配置文件写好了



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

input {

上游

elasticsearch {
hosts => “http://es1.es.com:80”
index => “xxx”
user => “elastic
password => “XXX”
query => ‘{ “query”: { “query_string”: { “query”: “*” } } }’
size => 2000
scroll => “10m”
docinfo => true
}
}

output {

下游

elasticsearch {
hosts => “http://es2.es.com:80”
index => “xxx”
user => “elastic”
password => “XXX”
document_id => “%{[@metadata][_id]}”
}
}


是不是很简单?当然这个从一个ES级群迁移数据到两一个ES集群的事情虽然不难,其实还是会遇到一些问题的。


## 遇到的问题


### 文档中指定了routing


你直接用上面的配置文件硬怼,就会遇到如下的告警日志



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

[2024-03-04T10:56:51,751][WARN ][logstash.outputs.elasticsearch][[main]>worker6][main][b7552c5d93f7de321e4e8f1e6da7bf8ec4696e8dff2bb087018235182d1f7fe2] Could not index event to Elasticsearch. {:status=>400, :action=>[“index”, {:_id=>“ded5349e62e678cbf222560e5da90a47”, :_index=>“xxx”, :routing=>nil, :_type=>“_doc”}, #LogStash::Event:0x5d3bdb61], :response=>{“index”=>{“_index”=>“xxx”, “_type”=>“_doc”, “_id”=>“ded5349e62e678cbf222560e5da90a47”, “status”=>400, “error”=>{“type”=>“routing_missing_exception”, “reason”=>“routing is required for [xxx]/[_doc]/[ded5349e62e678cbf222560e5da90a47]”, “index_uuid”=>“na”, “index”=>“xxx”}}}}
[2024-03-04T10:56:51,751][WARN ][logstash.outputs.elasticsearch][[main]>worker8][main][b7552c5d93f7de321e4e8f1e6da7bf8ec4696e8dff2bb087018235182d1f7fe2] Could not index event to Elasticsearch. {:status=>400, :action=>[“index”, {:_id=>“1181a16445b0069dc824fdde48454b57”, :_index=>“xxx”, :routing=>nil, :_type=>“_doc”}, #LogStash::Event:0x5a1ba4d6], :response=>{“index”=>{“_index”=>“xxx”, “_type”=>“_doc”, “_id”=>“1181a16445b0069dc824fdde48454b57”, “status”=>400, “error”=>{“type”=>“routing_missing_exception”, “reason”=>“routing is required for [xxx]/[_doc]/[1181a16445b0069dc824fdde48454b57]”, “index_uuid”=>“na”, “index”=>“xxx”}}}}


啥情况?



  • 1
  • 2
  • 3
  • 4
  • 5

{“type”=>“routing_missing_exception”, “reason”=>“routing is required for [xxx]/[_doc]/[ded5349e62e678cbf222560e5da90a47]”, “index_uuid”=>“na”, “index”=>“xxx”}}}


原来是没有指定routing字段


我们来看下索引信息



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

{
“xxx” : {
“aliases” : { },
“mappings” : {
“_routing” : {
“required” : true
},
“properties” : {

  }
},
"settings" : {
}
  • 1
  • 2
  • 3
  • 4

}
}


原来如此,需要指定routing,配置文件一通改,就变成了下面的模样



  • 1
  • 2
  • 3
  • 4
  • 5
input {
  elasticsearch {
    hosts => "http://es1.es.com:80"
    index => "xxx"
    user => "elastic"
    password => "XXX"
    query => '{ "query": { "query_string": { "query": "*" } } }'
    size => 2000
    scroll => "1m"
    docinfo => true
    # input中添加routing
    docinfo_fields => ["_index", "_id", "_type", "_routing"]
  }
}

output {
  elasticsearch {
    hosts => "http://es2.es.com:80"
    index => "xxx"
    user => "elastic"
    password => "XXX"
    document_id => "%{[@metadata][_id]}"
    # 指定routing
    routing => "%{[@metadata][_routing]}"
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

那么问题来了,如果你所有的索引都用这个模板,那么当上游没有指定routing字段的时候,下游的数据中的routing字段就会是`[@metadata][_routing]`,真的是人都麻了,这个logstash组件一段都不智能,那么这个问题能解决吗?别急,看到最后你就知道了


### 索引严格模式,无法写入@timestamp和@version字段


上面的问题解决了,跑着跑着,又遇到事了



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

[2024-03-04T11:43:48,372][WARN ][logstash.outputs.elasticsearch][[main]>worker0][main][23eda3c9518e4ba5a787adadf9714d5512c8ad9a9754020744b84ca81fe1bedc] Could not index event to Elasticsearch. {:status=>400, :action=>[“index”, {:_id=>“110109711637125402”, :_index=>“xxx”, :routing=>nil, :_type=>“_doc”}, #LogStash::Event:0x5e156236], :response=>{“index”=>{“_index”=>“xxx”, “_type”=>“_doc”, “_id”=>“110109711637125402”, “status”=>400, “error”=>{“type”=>“strict_dynamic_mapping_exception”, “reason”=>“mapping set to strict, dynamic introduction of [@timestamp] within [_doc] is not allowed”}}}}
[2024-03-04T11:43:48,372][WARN ][logstash.outputs.elasticsearch][[main]>worker0][main][23eda3c9518e4ba5a787adadf9714d5512c8ad9a9754020744b84ca81fe1bedc] Could not index event to Elasticsearch. {:status=>400, :action=>[“index”, {:_id=>“110109711960916147”, :_index=>“xxx”, :routing=>nil, :_type=>“_doc”}, #LogStash::Event:0x75333e01], :response=>{“index”=>{“_index”=>“xxx”, “_type”=>“_doc”, “_id”=>“110109711960916147”, “status”=>400, “error”=>{“type”=>“strict_dynamic_mapping_exception”, “reason”=>“mapping set to strict, dynamic introduction of [@timestamp] within [_doc] is not allowed”}}}}
[2024-03-04T11:43:48,372][WARN ][logstash.outputs.elasticsearch][[main]>worker0][main][23eda3c9518e4ba5a787adadf9714d5512c8ad9a9754020744b84ca81fe1bedc] Could not index event to Elasticsearch. {:status=>400, :action=>[“index”, {:_id=>“110109712328692950”, :_index=>“xxx”, :routing=>nil, :_type=>“_doc”}, #LogStash::Event:0x7405cd45], :response=>{“index”=>{“_index”=>“xxx”, “_type”=>“_doc”, “_id”=>“110109712328692950”, “status”=>400, “error”=>{“type”=>“strict_dynamic_mapping_exception”, “reason”=>“mapping set to strict, dynamic introduction of [@timestamp] within [_doc] is not allowed”}}}}


看下索引结构



  • 1
  • 2
  • 3
  • 4
  • 5

{
“xxx” : {
“aliases” : { },
“mappings” : {
“dynamic” : “strict”,
“properties” : {
}
},
“settings” : {
“index” : {
}
}
}
}


原来是索引设置了,严格模式,不允许插入新的字段,那咋整?


还有logstash支持一些filter可以删除掉一些字段,那么我们安排上



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

input {
elasticsearch {
hosts => “http://es1.es.com:80”
index => “merchant_order_rel_pro_v2”
user => “elastic”
password => “XXX”
query => ‘{ “query”: { “query_string”: { “query”: “*” } } }’
size => 2000
scroll => “1m”
docinfo => true
}
}
filter {
mutate {
# 删除logstash多余字段
remove_field => [“@version”,“@timestamp”]
}
}
output {
elasticsearch {
hosts => “http://es2.es.com:80”
index => “xxx”
user => “elastic”
password => “XXX”
document_id => “%{[@metadata][_id]}”
}
}


### logstash限流


有的时候写入的太快了,下游扛不住,刚开始是通过修改参数来解决,但是每次修改任务都要重新跑,人有点麻了


网上找了一通也没见到logstash有限流插件


发现可以调用本地ruby脚本,不会ruby让gpt生成了一个令牌桶算法的脚本,但是限流效果一言难尽,只能说能限流,但是数字不是你想要的值。


没办法了只好研究下怎么编写插件,结果gradle功底太差了,源码编译不过彻底麻了


最后没办法,自己写了个java版本的基于guava的RateLimiter实现的限流插件打成jar包直接放进去解决了该问题


https://github.com/valsong/logstash-java-rate-limiter


#### logstash-java-rate-limiter使用方法


使用方法也很简单,将我编写的插件的jar放到目录`logstash/logstash-core/lib/jars/`中即可


* 参数




| param | type | required | 默认值 | 样例 | desc |
| --- | --- | --- | --- | --- | --- |
| rate\_path | string | no | 无 | /usr/share/logstash/rate.txt | 从该文件中读取第一行作为限流值,你可以随时修改这个文件中的限流值 |
| count\_path | string | no | 无 | /usr/share/logstash/count.txt | 记录已经同步的事件的数量到该文件中 |
| count\_log\_delay\_sec | long | no | 30 | 30 | 根据设置的秒数以固定间隔在logstash的日志中打印事件数量 |


* 在文件中设置限流值



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

echo 5000 > /usr/share/logstash/rate.txt


* 添加一个filter叫`java_rate_limit`到任务的配置文件中



  • 1
  • 2
  • 3
  • 4
  • 5

input {
elasticsearch {
hosts => “http://xxx-es.xxx.com:9200”
index => “xxx”
user => “elastic”
password => “XXXX”
query => ‘{ “query”: { “query_string”: { “query”: “*” } } }’
size => 2000
scroll => “10m”
docinfo => true
# docinfo_fields => [“_index”, “_id”, “_type”, “_routing”]
}
}

img
img

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

img-ZNqSgMWC-1715600083445)]
[外链图片转存中…(img-dScJs60j-1715600083445)]

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化资料的朋友,可以戳这里获取

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号