赞
踩
千万、亿级别数据批量写入ES的调优和原理解析
Elasticsearch version (bin/elasticsearch --version):
7.8
Plugins installed:
kibana
JVM version (java -version):
java version "1.8.0_102"
OS version (uname -a if on a Unix-like system):
Linux 4.9.0-4-amd64 #1 SMP Debian 4.9.65-3 (2017-12-03) x86_64 GNU/Linux
ES节点:3台,4C16G,JVM8G
ES 是 JAVA 应用——底层存储引擎是基于 Lucene 的
1.1、是JAVA应用,就离不开JVM和GC
内存从大的方面分为堆内内存和堆外内存
1.2、堆外内存概念
堆外一般指堆外内存,英文全称:off-heap memory
堆外内存=物理机内存
堆外内存指的是java虚拟机堆以外的内存,这个区域是受操作系统管理,而不是jvm。
1.3、堆内内存概念
堆内一般指堆内内存,英文全称:on-heap memory (heap:堆,java的内存区)
堆内内存 = 新生代+老年代+持久代
对于JVM,在jvm参数中可使用-Xms,-Xmx等参数就可以设置堆的大小和最大值
1.4、Elasticsearch内部是如何使用这些内存的呢?下面这张图说明了Elasticsearch和Lucene对内存的使用情况。
上图解析:Elasticsearch 限制的内存大小是 JAVA 堆空间的大小,不包括Lucene 缓存倒排索引数据空间。
- Node Query Cache的默认大小:
-
- indices.queries.cache.size:10% // 也可以设置为绝对值,比如512mb
- index.queries.cache.enabled:true
- Indexing Buffer的默认大小:
- indices.memory.index_buffer_size:10%
- indices.memory.min_index_buffer_size:48mb
- indices.memory.max_index_buffer_size:unbounded
- 修改jvm heap大小:
- vim /opt/elasticsearch/config/jvm.options
- 设置
- -Xms10g
- -Xms10g
-
- 接着停止es集群(kill -9 pid),再启动
- su es
- ./bin/elasticsearch -d
-
- 查看是否生效
- ps -ef | grep elasticsearch
- Shard Request Cache的默认大小:
- indices.requests.cache.size:1%
- Field Data Cache的默认大小:
- indices.fielddata.cache.size:unbounded
如果不在analyzed string fields上使用聚合,就不会产生Field Data Cache,也就不会使用大量的内存,所以可以考虑分配较小的heap给Elasticsearch。因为heap越小意味着Elasticsearch的GC会比较快,并且预留给Lucene的内存也会比较大。1.5、其他影响内存项
https://www.elastic.co/guide/en/elasticsearch/reference/6.7/modules-threadpool.html
GET /_cat/segments?v
GET /_cat/nodes?v&h=id,ip,port,v,master,name,heap.current,heap.percent,heap.max,
ram.current,ram.percent,ram.max,
fielddata.memory_size,fielddata.evictions,query_cache.memory_size,query_cache.evictions,
request_cache.memory_size,request_cache.evictions,request_cache.hit_count,request_cache.miss_count
indices.breaker.fielddata.limit:60% (默认heap的60%)
如果设置了indices.fielddata.cache.size,当达到size时,cache会剔除旧的fielddata。indices.breaker.fielddata.limit 必须大于 indices.fielddata.cache.size,否则只会触发fielddata circuit breaker,而不会剔除旧的fielddata。1)Elasticsearch默认安装后设置的内存是1GB,这是远远不够用于生产环境的。有两种方式修改Elasticsearch的堆内存:
export ES_HEAP_SIZE=10g
在es启动时会读取该变量;./bin/elasticsearch -Xmx10g -Xms10g
2)分配给 es 的内存最好不要超过 32G
我有一个 1 TB 内存的机器!
这个 32 GB 的分割线是很重要的。那如果你的机器有很大的内存怎么办呢? 一台有着 512–768 GB内存的服务器愈发常见。
首先,我们建议避免使用这样的高配机器(参考 硬件)。
但是如果你已经有了这样的机器,你有三个可选项:
- 你主要做全文检索吗?考虑给 Elasticsearch 4 - 32 GB 的内存, 让 Lucene 通过操作系统文件缓存来利用余下的内存。那些内存都会用来缓存 segments,带来极速的全文检索。
- 你需要更多的排序和聚合?而且大部分的聚合计算是在数字、日期、地理点和
非分词
字符串上?你很幸运,你的聚合计算将在内存友好的 doc values 上完成! 给 Elasticsearch 4 到 32 GB 的内存,其余部分为操作系统缓存内存中的 doc values。- 你在对分词字符串做大量的排序和聚合(例如,标签或者 SigTerms,等等)不幸的是,这意味着你需要 fielddata,意味着你需要堆空间。考虑在单个机器上运行两个或多个节点,而不是拥有大量 RAM 的一个节点。仍然要坚持 50% 原则。
假设你有个机器有 128 GB 的内存,你可以创建两个节点,每个节点内存分配不超过 32 GB。 也就是说不超过 64 GB 内存给 ES 的堆内存,剩下的超过 64 GB 的内存给 Lucene。
如果你选择这一种,你需要配置
cluster.routing.allocation.same_shard.host: true
。 这会防止同一个分片(shard)的主副本存在同一个物理机上(因为如果存在一个机器上,副本的高可用性就没有了)。
参考2.x官方文档
ES写入/索引流程分析如上图,简单分析一下索引写入流程
1、client发起write请求
2、数据写到index buffer和translog中
3、经过1s或者达到10%堆内存阈值后将数据从buffer中的数据写入到segment file,refresh到os cache(FileSystem cache)中,打开供搜索,并清空buffer
4、当translog达到flush_threshold_size大小后,触发commit操作
4-1)将此时buffer中的数据写入到新的segment,并写入到os cache,打开被搜索,然后清空buffer
4-2)一个commit point被写入磁盘文件,标记了被写入的所有index segment file
4-3)同时将os cache中的所有segment file都fsync到DISK中,这个过程叫flush
4-4)清空现有translog日志文件,新建一个translog
文章参考:
配置项参考该类:elasticsearch-hadoop-6.8.21.jar
org.elasticsearch.hadoop.cfg#ConfigurationOptions
主要的配置项如下
1、连接参数配置
"es.http.timeout" -> "5m" ①
"es.http.retries" -> "50" ②
① ② 这两个参数是控制http接口层面的超时及重试,覆盖读请求和写请求,默认值比较小,默认超时时间为1分钟,重试次数为3,建议调整为超时时间5分钟,重试次数50次。
2、写入批次配置
"es.batch.size.bytes" -> "10mb" ①
"es.batch.size.entries" -> "20000" ②"es.batch.write.refresh" -> "false" ③
"es.batch.write.retry.count" -> "10" ④
"es.batch.write.retry.wait" -> "60s" ⑤
① ② 这两个参数控制单次批量写入的数据量大小和条数,数据积累量先达到哪个参数设置,都会触发一次批量写入。增大单次批量写入的数据,可以提高写入ES的整体吞吐。
因为ES的写入一般是顺序写入,在一次批量写入中,很多数据的写入处理逻辑可以合并,大量的IO操作也可以合并。
默认值设置的比较小,可以适当根据集群的规模调大这两个值,建议为20MB和2w条。
③ 是否每次bulk操作后都进行refresh。 每次refresh后,ES会将当前内存中的数据生成一个新的segment。如果refresh速度过快,会产生大量的小segment,大量segment在进行合并时,会消耗磁盘的IO。默认值为开启,如果写时查询要求没那么高,建议设置为false。在索引的settings中通过refresh_interval配置项进行控制,可以根据业务的需求设置为30s或-1,index_buffer默认是堆的10%,满了就会refresh
④ ⑤ 这两个参数会控制单次批量写入请求的重试次数,以及重试间隔。当超过重试次数后,Yarn任务管理会将该任务标记为failed,造成整个写数据任务的失败。默认值为3,为了防止集群偶发的网络抖动或压力过大造成的集群短暂熔断,建议将这个值调大,设置为50。
当每条数据比较均匀的时候,用es.batch.size.entries限制批量写入条数比较合适,但是当每条数据不均匀时,建议用es.batch.size.bytes限制每批次的写入数据量比较合适。当然,bulk size不能无限的增大,会造成写入任务的积压。
实际效果:spark调优过程比较便捷,基于上述配置,可以达到 “单机4C16G——10个线程——20w/s的写入速度”,5KW数据量-45min左右可写入完成
ElasticSearch的配置选项分为静态设置和动态设置两种,静态设置必须在结点级别(node-level)设置,或配置在elasticsearch.yml配置文件中,或配置在环境变量中,或配置在命令行中,在结点启动之后,静态设置不能修改;动态索引可以创建时添加或者创建后再添加或者修改
5.1、bulk批量写入
5.2、多线程写入
5.3、修改translog flush 间隔
- index.translog.durability: async // 刷新策略,默认request
- index.translog.sync_interval:120s // 默认5s,translog buffer到文件的刷新时间
- index.translog.flush_threshold_size:1gb
- /**
- flush阈值,默认512MB;超过512M,触发flush操作
- 会将index buffer中的数据,refresh到os cache中
- 产生新的segment file,如果这个值太小,
- 就会频繁发生refresh、merge和flush
- **/
5.4、修改索引刷新时间及副本数
curl -XPUT "http://localhost:9200/myindex/_settings" -H 'Content-Type: application/json' -d' { "index.number_of_replicas": 0, "index.refresh_interval": "120s" }'
- // org.elasticsearch.action.support.WriteRequest.RefreshPolicy#WriteRequestBuilder
- default B setRefreshPolicy(RefreshPolicy refreshPolicy) {
- request().setRefreshPolicy(refreshPolicy);
- return (B) this;
- }
枚举org.elasticsearch.action.support.WriteRequest.RefreshPolicy
定义了三种策略:
- NONE,
- IMMEDIATE,
- WAIT_UNTIL;
可知有以下三种刷新策略:
RefreshPolicy#IMMEDIATE:
RefreshPolicy#WAIT_UNTIL:
RefreshPolicy#NONE:
5.5、修改merge参数以及线程数
- cat /sys/block/*/queue/rotational
- -- 0 SSD // 固态硬盘
- -- 1 HDD // 机械硬盘
参数修改 | 好处 | 坏处 |
---|---|---|
index.merge.policy.max_merge_at_once index.merge.policy.segments_per_tier (eg :50) | 提升indexing速度 | 减少了segment merge动作的发生,意味着更多的segments,会降低searching速度 |
index.merge.policy.max_merge_at_once index.merge.policy.segments_per_tier (eg :5) | Segments更少,即能够提升searching速度 | 更多的segments merge操作,会花费更多系统资源(CPU/IO/RAM),会降低indexing速度 |
curl -XPUT "https://localhost:9200/index/_settings?pretty" -H 'Content-Type: application/json' -d' { "index": { "merge": { "scheduler": { "max_thread_count": "1" }, "policy": { "segments_per_tier": "24", "max_merge_at_once": "24", "floor_segment": "2m", "max_merged_segment": "2g" } } } }'
5.6、禁用Doc Values
curl -XPUT "http://localhost:9200/myindex" -H 'Content-Type: application/json' -d'
{
"mappings": {
"my_type": {
"properties": {
"session_id": {
"type": "keyword",
"doc_values": false
}
}
}
}
}'
5.7、禁用_source字段
curl -XPUT 'https://ip:port/index?pretty' -H 'Content-Type: application/json' -d' { "mappings": { "tweet": { "_source": { "enabled": false } } } }'
说明: 在禁用_source 字段之前请注意:如果_source字段不可用,则不支持以下功能:
5.8、自动生成DOC_ID
5.9、禁用 _source 和禁用 _all 字段
5.10、bulk api解决超时问题
-
- BulkRequest request = new BulkRequest();
- request.timeout("2m"); // 索引操作超时时间
-
- // option-1
- RestClientBuilder builder = RestClient.builder(
- new HttpHost("localhost", 9200))
- .setRequestConfigCallback(
- new RestClientBuilder.RequestConfigCallback() {
- @Override
- public RequestConfig.Builder customizeRequestConfig(
- RequestConfig.Builder requestConfigBuilder) {
- return requestConfigBuilder
- .setConnectTimeout(5000)
- .setSocketTimeout(60000)
- .setConnectionRequestTimeout(5000); // 获取连接的超时时间
- }
- });
-
- // option-2
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout(5000)
- .setSocketTimeout(60000)
- .setConnectionRequestTimeout(5000)
- .build();
- RequestOptions options = RequestOptions.DEFAULT.toBuilder()
- .setRequestConfig(requestConfig)
- .build();
RequestConfig有三个超时如下
kibana监控模块通过调用es索引存储的监控数据,制作了许多开箱即用报表供用户使用。主要分为集群层面、节点层面和索引层面
kibana通过es索引中存储的数据计算出了许多指标报表,如上图所示包含了查询(加载)速率和查询(加载)延时,除此之外还有cpu、内存、存储以及负载占用等等许多指标
参考文章
es实战-Monitoring原理讲解及kibana可视化实战_casterQ的博客-CSDN博客_kibana的monitoring
1、3台4C16G的ES集群
2、字段数100个,
3、字段长度50字节,
4、一条记录打满5KB,
5、数据量5000w,
6、运行资源标准1C4G(倍数增长),
7、并发度即程序线程数,8、UUID作为DOC_ID
调优过程比较多,截取有针对性提升的时间节点对应的调优项
调优项 | 批次/条 | 运行资源 | 并发度 | 同步效率 | 同步时间 |
---|---|---|---|---|---|
jvm heap 1G | 5000 | 2C8G | 4 | / | 211.08min |
jvm heap 8G | 3000 | 4C16G | 8 | 50w/min | 159min |
jvm heap 8G | 2000 | 4C16G | 8 | 50w/min | 154min |
+refresh_interval:10s +max_thread_count:1 +number_replicas:0 | 2000 | 6C24G | 12 | 75w/min | 77min |
+index_buffer_size:20% | 5000 | 6C24G | 12 | 76w/min | 72.73min |
+refresh_interval:20s +max_merge_at_once:50 +segment_per_tier:50 | 5000 | 6C24G | 16 | 78w/min | 70min |
+number_of_shards:3 +translog.durability:async +translog.sync_interval:60s +flush_threshold_size:1g | 5000 | 6C24G | 16 | 90w/min | 57.62min |
继承上述调优项 | 5000 | 2C8G | 4 | 60w/min | 92.77min |
+refresh_interval:60s | 5000 | 2C8G | 4 | 62w/min | 86.87min |
继承上述调优项 | 5000 | 4C16G | 12 | 92w/min | 59.35min |
- {
- "settings": {
- "index": {
- "refresh_intervals": "60s",
- "translog": {
- "flush_threshold_size": "1g",
- "sync_interval": "60s",
- "durability": "async"
- },
- "unassigned": {
- "node_left": {
- "delayed_timeout": "3h"
- }
- },
- "number_of_replicas": 0,
- "merge": {
- "scheduler": {
- "max_thread_count": 1
- },
- "policy": {
- "segment_per_tier": 50,
- "max_merge_at_once": 50
- }
- }
- }
- }
- }
1)方法比结论重要。一个系统性问题往往是多种因素造成的,在处理集群的写入性能问题上,了解原理后,需要将问题分解,在单台机子上进行压测,观察哪种系统资源达到极限;例如:CPU、磁盘利用率、I/O block、线程切换、堆栈状态;然后分析并调整参数,优化单台能力,先解决局部问题,在此基础上解决整体问题效率更高
2)可以使用更好的CPU,或者SSD,对写入性能提升明显
3)若能达到3w/s的写入性能,优化就差不多了
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。