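The project plans to store the collected device log data in Elasticsearch, so we started trying Elasticsearch as a replacement for the existing relational database.
(1) Integrating Elasticsearch with Spring Boot
First, upgrade the project's Spring Boot version to the latest release, 2.2.6. This version supports Elasticsearch 6.8.7.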
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
Then add the Elasticsearch dependency to the project:
<dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-elasticsearch</artifactId> </dependency>
(2) Integrating ElasticsearchTemplate
Create the ES entity class, which maps to one index + type in ES:
- @Data
- @JsonInclude(JsonInclude.Include.NON_NULL)
- @Document(indexName = "device_log", type = "log", shards = 3, replicas = 0, refreshInterval = "10s")
- public class DeviceLogPo {
- @Id
- private Long uuid;
- @Field(type = FieldType.Keyword)
- private String deviceId;
- @Field(type = FieldType.Integer)
- private Integer source;
- @Field(type = FieldType.Integer)
- private Integer operateUser;
- @Field(type = FieldType.Text)
- private String content;
- @Field(type = FieldType.Date)
- private Date createTime;
- @Field(type = FieldType.Date)
- private Date updateTime;
- @Field(type = FieldType.Keyword)
- private String topic;
- }
Create the Repository interface, which provides the CRUD operations:
- public interface DeviceLogRepository extends ElasticsearchRepository<DeviceLogPo, Long> {
- }
Create the Service class, which implements the business logic:
- @Service
- @Slf4j
- public class DeviceLogService extends ElasticSearchBaseService {
-
- @Autowired
- private SnowflakeUtil snowflakeUtil;
-
- @Autowired
- private DeviceLogRepository deviceLogRepository;
-
- /**
- * Indexes a single device log document.
- *
- * @param deviceLogPo the document to index
- * @return 1 on success, 0 on failure
- */
- public int insert(DeviceLogPo deviceLogPo) {
- try {
- // If the ID is not set manually, ES generates one automatically
- // long uuid = snowflakeUtil.getUUID();
- // deviceLogPo.setUuid(uuid);
- deviceLogRepository.save(deviceLogPo);
- return 1;
- } catch (Exception e) {
- log.error("Failed to index device log", e);
- return 0;
- }
- }
-
- public void bulkIndex(List<DeviceLogPo> deviceLogPoList) {
- try {
- // Create the index only if it does not exist yet
- if (!elasticsearchTemplate.indexExists(DeviceLogPo.class)) {
- elasticsearchTemplate.createIndex(DeviceLogPo.class);
- }
- List<IndexQuery> queries = new ArrayList<>();
- for (DeviceLogPo deviceLogPo : deviceLogPoList) {
- IndexQuery indexQuery = new IndexQuery();
- indexQuery.setId(String.valueOf(snowflakeUtil.getUUID()));
- indexQuery.setObject(deviceLogPo);
- indexQuery.setIndexName("device_log");
- indexQuery.setType("log");
- queries.add(indexQuery);
- // Send a bulk request once a full batch of 500 has accumulated
- if (queries.size() >= 500) {
- elasticsearchTemplate.bulkIndex(queries);
- queries.clear();
- }
- }
- // Send whatever is left in the last, partial batch
- if (!queries.isEmpty()) {
- elasticsearchTemplate.bulkIndex(queries);
- }
- } catch (Exception e) {
- log.error("Bulk indexing of device logs failed", e);
- }
- }
- }
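The batching part of bulkIndex can be looked at separately from Elasticsearch. Below is a minimal sketch in plain Java, assuming a batch size of 500 as in the service. `BatchFlusher`, `flushInBatches` and the `sink` callback are hypothetical names used only for illustration; the sink stands in for a call such as `elasticsearchTemplate.bulkIndex(queries)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchFlusher {

    /**
     * Hands the items to the sink in batches of at most batchSize
     * and returns the number of batches that were flushed.
     * The sink is a stand-in for a real bulk call (an assumption of this sketch).
     */
    public static <T> int flushInBatches(List<T> items, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int flushed = 0;
        List<T> buffer = new ArrayList<>(batchSize);
        for (T item : items) {
            buffer.add(item);
            // Flush as soon as the buffer holds a full batch
            if (buffer.size() >= batchSize) {
                sink.accept(new ArrayList<>(buffer)); // pass a copy so clearing the buffer is safe
                buffer.clear();
                flushed++;
            }
        }
        // Flush the last, partial batch if there is one
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            flushed++;
        }
        return flushed;
    }
}
```

Checking the buffer size, rather than a running counter, means every request except possibly the last carries a full batch. With 1,201 items and a batch size of 500 the sink is called three times, with 500, 500 and 201 items.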
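(3) Index template configuration
The template must be configured before the index is created, otherwise it does not take effect. In 6.8.x the template has to be created with an HTTP request.
- Set the index template:
- PUT http://xxxxxx:9200/_template/template_device
- {
- "template": "device_*",
- "settings": {
- "index": {
- "refresh_interval": "10s", // refresh every 10 seconds
- "number_of_shards": "3", // number of primary shards
- "number_of_replicas": "0", // number of replicas
- "translog": {
- "flush_threshold_size": "128mb", // flush once the translog reaches 128mb
- "sync_interval": "30s", // fsync the translog every 30s (cannot be changed once set)
- "durability": "async" // fsync asynchronously instead of on every request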
- }
- }
- }
- }
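The same request can also be sent from Java. The following is only a sketch, assuming Java 11+ for `java.net.http`; `DeviceTemplateClient`, `buildRequest` and `putTemplate` are hypothetical names, and the cluster address is left to the caller. The body uses `index_patterns`, the field name documented for 6.x templates, where the request above uses the older `template` key. The `Content-Type` header is set because ES 6.x rejects request bodies sent without it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DeviceTemplateClient {

    // Same settings as the request above, written as comment-free JSON
    public static final String TEMPLATE_BODY = String.join("\n",
        "{",
        "  \"index_patterns\": [\"device_*\"],",
        "  \"settings\": {",
        "    \"index\": {",
        "      \"refresh_interval\": \"10s\",",
        "      \"number_of_shards\": \"3\",",
        "      \"number_of_replicas\": \"0\",",
        "      \"translog\": {",
        "        \"flush_threshold_size\": \"128mb\",",
        "        \"sync_interval\": \"30s\",",
        "        \"durability\": \"async\"",
        "      }",
        "    }",
        "  }",
        "}");

    /** Builds the PUT request; baseUrl is the cluster address supplied by the caller. */
    public static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_template/template_device"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(TEMPLATE_BODY))
            .build();
    }

    /** Sends the request and returns the HTTP status code of the response. */
    public static int putTemplate(String baseUrl) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(buildRequest(baseUrl), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

Running `putTemplate` once at deployment time, before the first document is written, keeps the template in place before any `device_*` index is created.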
(4) Write performance test
Create a controller that measures how long the writes take:
- @Slf4j
- @RestController
- @RequestMapping(value = "device", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
- public class DeviceController extends BaseController {
- @Autowired
- private SnowflakeUtil snowflakeUtil;
- @Autowired
- protected DeviceLogService deviceLogService;
-
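- /**
- * Builds 100,000 log documents from the request body, bulk-indexes them and logs the elapsed time.
- *
- * @param deviceLogPo sample document whose content and operateUser are copied
- * @return always 0
- */
- @PostMapping(value = "insertDeviceLog")
- public int insertDeviceLog(@RequestBody DeviceLogPo deviceLogPo) {
- long start = System.currentTimeMillis();
- ArrayList<DeviceLogPo> list = new ArrayList<>();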
- for (int i = 0; i < 100000; i++) {
- DeviceLogPo temp = new DeviceLogPo();
- temp.setContent(deviceLogPo.getContent());
- temp.setOperateUser(deviceLogPo.getOperateUser());
- list.add(temp);
- }
- deviceLogService.bulkIndex(list);
- long end = System.currentTimeMillis();
- log.debug("cost: {} ms", end - start);
- return 0;
- }
- }
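To turn the logged elapsed time into a throughput figure, a small helper is enough. This is a sketch with hypothetical names (`WriteTimer`, `timeMillis`, `docsPerSecond`); it uses `System.nanoTime()`, which is monotonic and therefore a safer choice for measuring intervals than `System.currentTimeMillis()`.

```java
public class WriteTimer {

    /** Runs the task and returns the elapsed time in milliseconds. */
    public static long timeMillis(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Documents per second for a run that wrote docCount documents in elapsedMillis. */
    public static double docsPerSecond(long docCount, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return docCount * 1000.0 / elapsedMillis;
    }
}
```

In the controller this could wrap the bulk call, for example `long ms = WriteTimer.timeMillis(() -> deviceLogService.bulkIndex(list));`, followed by `WriteTimer.docsPerSecond(list.size(), ms)`.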
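(5) Summary
Measured results:
Server configuration: an ES cluster of three servers, each with 4 cores and 4 GB of memory
Single-document write throughput: 40 documents/second
Bulk write throughput: 4,800 documents/second
Single-document writes to ES perform poorly; writing large volumes of data should be done with bulk requests.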
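To put the two figures side by side, the sketch below estimates how long the 100,000-document test would take at each rate. `WriteEstimate` and its methods are hypothetical names, and the calculation assumes the measured rates stay constant over the whole run.

```java
public class WriteEstimate {

    /** Seconds needed to write docCount documents at a steady docsPerSecond rate. */
    public static double secondsFor(long docCount, double docsPerSecond) {
        if (docsPerSecond <= 0) {
            throw new IllegalArgumentException("docsPerSecond must be positive");
        }
        return docCount / docsPerSecond;
    }

    /** How many times faster the bulk rate is than the single-document rate. */
    public static double speedup(double bulkRate, double singleRate) {
        return bulkRate / singleRate;
    }
}
```

At 40 documents/second, 100,000 documents take 2,500 seconds (about 42 minutes); at 4,800 documents/second they take about 21 seconds, a 120-fold difference.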