当前位置:   article > 正文

SpringBoot 集成Elastic Search,写入性能优化尝试_springboot elasticsearch-sql 性能

springboot elasticsearch-sql 性能

项目计划将采集的设备日志数据存储在elastic search中,所以开始尝试使用elastic search替换原来的关系型数据库。

(一)SpringBoot 集成Elastic Search

首先将项目的Springboot版本升到最新版:2.2.6,该版本支持ElasticSearch的6.8.7版本

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.6.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

然后在项目中引入ElasticSearch

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
</dependency>

(二)ElasticsearchTemplate集成

创建ES对象类,对应ES中的一个Index+Type

  1. @Data
  2. @JsonInclude(JsonInclude.Include.NON_NULL)
  3. @Document(indexName = "device_log", type = "log", shards = 3, replicas = 0, refreshInterval = "10s")
  4. public class DeviceLogPo {
  5. @Id
  6. private Long uuid;
  7. @Field(type = FieldType.Keyword)
  8. private String deviceId;
  9. @Field(type = FieldType.Integer)
  10. private Integer source;
  11. @Field(type = FieldType.Integer)
  12. private Integer operateUser;
  13. @Field(type = FieldType.Text)
  14. private String content;
  15. @Field(type = FieldType.Date)
  16. private Date createTime;
  17. @Field(type = FieldType.Date)
  18. private Date updateTime;
  19. @Field(type = FieldType.Keyword)
  20. private String topic;
  21. }

 创建Repository对象类,用于进行增删改查操作

  1. public interface DeviceLogRepository extends ElasticsearchRepository<DeviceLogPo, Long> {
  2. }

 创建Service类,结合具体业务实现

  1. @Service
  2. @Slf4j
  3. public class DeviceLogService extends ElasticSearchBaseService {
  4. @Autowired
  5. private SnowflakeUtil snowflakeUtil;
  6. @Autowired
  7. private DeviceLogRepository deviceLogRepository;
  8. /**
  9. *
  10. * @param deviceLogPo
  11. * @return
  12. */
  13. public int insert(DeviceLogPo deviceLogPo) {
  14. try {
  15. //不手动设置ID时,ES会自动生成id
  16. // long uuid = snowflakeUtil.getUUID();
  17. // deviceLogPo.setUuid(uuid);
  18. deviceLogRepository.save(deviceLogPo);
  19. return 1;
  20. } catch (Exception e) {
  21. log.error(null, e);
  22. return 0;
  23. }
  24. }
  25. public void bulkIndex(List<DeviceLogPo> deviceLogPoList) {
  26. try {
  27. int count = 0;
  28. if (elasticsearchTemplate.indexExists(DeviceLogPo.class)) {
  29. elasticsearchTemplate.createIndex(DeviceLogPo.class);
  30. }
  31. ArrayList<IndexQuery> queries = new ArrayList<>();
  32. for (DeviceLogPo deviceLogPo : deviceLogPoList) {
  33. IndexQuery indexQuery = new IndexQuery();
  34. indexQuery.setId(String.valueOf(snowflakeUtil.getUUID()));
  35. indexQuery.setObject(deviceLogPo);
  36. indexQuery.setIndexName("device_log");
  37. indexQuery.setType("log");
  38. queries.add(indexQuery);
  39. if (count % 500 == 0) {
  40. elasticsearchTemplate.bulkIndex(queries);
  41. queries.clear();
  42. }
  43. count++;
  44. }
  45. if (queries.size() > 0) {
  46. elasticsearchTemplate.bulkIndex(queries);
  47. }
  48. } catch (Exception e) {
  49. log.error(null, e);
  50. }
  51. }
  52. }

(三)Index Template配置

template需要在index创建直接配置好,这样才能生效。6.8.x的版本需要通过http请求的方式来创建template

  1. 设置index模板:
  2. put http://xxxxxx:9200/_template/template_device
  3. {
  4. "template": "device_*",
  5. "settings": {
  6. "index": {
  7. "refresh_interval": "10s", //10秒刷新
  8. "number_of_shards": "3", //主分片数量
  9. "number_of_replicas": "0", //副本数量
  10. "translog": {
  11. "flush_threshold_size": "128mb", //内容容量到达1gb异步刷新
  12. "sync_interval": "30s", //间隔30s异步刷新(设置后无法更改)
  13. "durability": "async" //异步刷新
  14. }
  15. }
  16. }
  17. }

(四)写入性能测试

创建controller,统计写入耗时

  1. @Slf4j
  2. @RestController
  3. @RequestMapping(value = "device", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  4. public class DeviceController extends BaseController {
  5. @Autowired
  6. private SnowflakeUtil snowflakeUtil;
  7. @Autowired
  8. protected DeviceLogService deviceLogService;
  9. /**
  10. *
  11. * @param deviceLogPo
  12. * @return
  13. */
  14. @PostMapping(value = "insertDeviceLog")
  15. public int insertDeviceLog(@RequestBody DeviceLogPo deviceLogPo) {
  16. long start = System.currentTimeMillis();
  17. ArrayList list = new ArrayList();
  18. for (int i = 0; i < 100000; i++) {
  19. DeviceLogPo temp = new DeviceLogPo();
  20. temp.setContent(deviceLogPo.getContent());
  21. temp.setOperateUser(deviceLogPo.getOperateUser());
  22. list.add(temp);
  23. }
  24. deviceLogService.bulkIndex(list);
  25. long end = System.currentTimeMillis();
  26. log.debug("cost:" + (end - start) + "ms=============================");
  27. return 0;
  28. }
  29. }

(五)总结

实际测试结果:

服务器配置:3台4核4GB服务器组成的ES集群

单条写入性能:40条/秒

bulk写入性能:4800条/秒的速度

 

ES的单条写入性能比较差,大数据量的写入需要结合bulk写入来实现。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/200635
推荐阅读
相关标签
  

闽ICP备14008679号