赞
踩
当在对mysql表进行insert,update时,同时使用es api对es数据库进行同步修改,保证mysql和es数据的一致性。
新增/修改数据时将相关信息推送到kafka,具体可以看EsModel的字段
@Data @Accessors(chain = true) public class EsModel { //mysql表名 String table; //mysql主键id,与es的id对应 Long id; //mysql主键id列表,与es的id对应 List<Long> ids; //es索引 String esIndex; }
然后发送消息
private void sendKafka(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(m -> log.info("kafka_send_success:{}", m), e -> log.error("kafka_send_error:{}", e.getMessage(), e));
}
首先kafka消费者根据id和表名去mysql数据库取出修改或新增后的数据,为了可以从不同表取出数据:
首先要传递表名,使用${table}而不是#{table},因为后者会被当成字符串处理;
使用user这样的单个model接收数据肯定是不行的,这里使用了hashmap作为数据存储格式。
List<Map<String, Object>> selectData(@Param("table") String table, @Param("ids") List<Long> ids);
<select id="selectData" resultType="java.util.HashMap">
select * from ${table}
where id in
<foreach item='item' collection='ids' open="(" separator="," close=")">#{item}</foreach>
</select>
application.yml配置文件也需修改
避免部分字段为null时不返回数据
mybatis:
configuration:
call-setters-on-nulls: true
调用mapper获取mysql数据之后,然后调用es客户端向es发送请求,完成同步。
这里使用的Indexrequest,它可以兼容插入和更新两种操作,当es索引中没有传入id对应文档时候会插入,反之就会update。
@KafkaListener(topics = "${spring.kafka.topic.elasticsearch}") @Transactional(rollbackFor = Exception.class) public void esManageListener(String message, Acknowledgment ack) { log.info("kafka_received: message={}", message); try{ EsModel esModel = JSON.parseObject(message, EsModel.class); List<Map<String, Object>> mysqlData = esMapper.selectData(esModel.getTable(), esModel.getIds()); BulkRequest bulkRequest = new BulkRequest(); for (Map data : mysqlData){ String dataJson = JSONObject.toJSONString(data, SerializerFeature.WriteMapNullValue); IndexRequest indexRequest = new IndexRequest() .index(esModel.getEsIndex()) .id(String.valueOf(data.get("id"))) .type("_doc") .source(dataJson, XContentType.JSON); bulkRequest.add(indexRequest); } BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); }catch (Exception e){ e.printStackTrace(); // TODO: 2021/9/7 log }finally{ ack.acknowledge(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。