当前位置:   article > 正文

mysql与es实时同步之数据库双写_es双写一致性

es双写一致性

mysql与es实时同步之数据库双写

简介

当在对mysql表进行insert,update时,同时使用es api对es数据库进行同步修改,保证mysql和es数据的一致性。

第一步-获取更新

新增/修改数据时将相关信息推送到kafka,具体可以看EsModel的字段

@Data
@Accessors(chain = true)
public class EsModel {

    //mysql表名
    String table;

    //mysql主键id,与es的id对应
    Long id;

    //mysql主键id列表,与es的id对应
    List<Long> ids;

    //es索引
    String esIndex;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

然后发送消息

    private void sendKafka(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(m -> log.info("kafka_send_success:{}", m), e -> log.error("kafka_send_error:{}", e.getMessage(), e));
    }
  • 1
  • 2
  • 3
  • 4

第二步-同步es

首先kafka消费者根据id和表名去mysql数据库取出修改或新增后的数据,为了可以从不同表取出数据:
首先要传递表名,使用${table}而不是#{table},因为后者会被当成字符串处理;
使用user这样的单个model接收数据肯定是不行的,这里使用了hashmap作为数据存储格式。

List<Map<String, Object>> selectData(@Param("table") String table, @Param("ids") List<Long> ids);

<select id="selectData" resultType="java.util.HashMap">
    select * from ${table}
    where id in
    <foreach item='item' collection='ids' open="(" separator="," close=")">#{item}</foreach>
</select>

application.yml配置文件也需修改
避免部分字段为null时不返回数据 
mybatis:
  configuration:
    call-setters-on-nulls: true
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

调用mapper获取mysql数据之后,然后调用es客户端向es发送请求,完成同步。
这里使用的Indexrequest,它可以兼容插入和更新两种操作,当es索引中没有传入id对应文档时候会插入,反之就会update。

    @KafkaListener(topics = "${spring.kafka.topic.elasticsearch}")
    @Transactional(rollbackFor = Exception.class)
    public void esManageListener(String message, Acknowledgment ack) {
        log.info("kafka_received: message={}", message);
        try{
            EsModel esModel = JSON.parseObject(message, EsModel.class);
            List<Map<String, Object>> mysqlData = esMapper.selectData(esModel.getTable(), esModel.getIds());
            BulkRequest bulkRequest = new BulkRequest();
            for (Map data : mysqlData){
                String dataJson = JSONObject.toJSONString(data, SerializerFeature.WriteMapNullValue);
                IndexRequest indexRequest = new IndexRequest()
                        .index(esModel.getEsIndex())
                        .id(String.valueOf(data.get("id")))
                        .type("_doc")
                        .source(dataJson, XContentType.JSON);
                bulkRequest.add(indexRequest);
            }
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        }catch (Exception e){
            e.printStackTrace();
            // TODO: 2021/9/7  log
        }finally{
            ack.acknowledge();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/882757
推荐阅读
相关标签
  

闽ICP备14008679号