当前位置:   article > 正文

缓存同步之 RabbitMQ 和 Canal_canal通知

canal通知

缓存同步

大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

数据同步策略

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

大多情况下采用异步通知,而异步实现又可以基于MQ或者Canal来实现。

基于MQ的异步通知

RabbitMQ

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

整体流程:

  • 服务完成对数据的修改后,只需要发送一条消息到MQ中。
  • 缓存服务监听MQ消息,然后完成对缓存的更新。
  • 依然有少量的代码侵入

在这里插入图片描述

案例代码

引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
编写配置
spring:
  rabbitmq:
    host: 192.168.150.101 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: XXX # 用户名
    password: XXX # 密码
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
声明队列交换机名称
/**
 * 声明队列交换机名称
 */
public class MqConstants {

    /**
     * 交换机
     */
    public final static String ITEM_EXCHANGE = "item.topic";

    /**
     * 监听新增和修改的队列
     */
    public final static String ITEM_INSERT_AND_UPDATE_QUEUE = "item.insert.update.queue";
    /**
     * 监听删除的队列
     */
    public final static String ITEM_DELETE_QUEUE = "item.delete.queue";

    /**
     * 新增或修改的 RoutingKey
     */
    public final static String ITEM_INSERT_AND_UPDATE_KEY = "item.insert.update";
    /**
     * 删除的 RoutingKey
     */
    public final static String ITEM_DELETE_KEY = "item.delete";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
配置队列交换机
@Configuration
public class MqConfig {
    /**
     * 配置交换机
     * @return
     */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(ITEM_EXCHANGE, true, false);
    }
    /**
     * 配置新增和修改的队列
     * @return
     */
    @Bean
    public Queue insertQueue(){
        return new Queue(ITEM_INSERT_AND_UPDATE_QUEUE, true);
    }
    /**
     * 配置删除的队列
     * @return
     */
    @Bean
    public Queue deleteQueue(){
        return new Queue(ITEM_DELETE_QUEUE, true);
    }
    /**
     * 绑定 新增和修改的队列和对应的 RoutingKey
     * @return
     */
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(ITEM_INSERT_AND_UPDATE_KEY);
    }
    /**
     * 绑定 删除的队列和对应的 RoutingKey
     * @return
     */
    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(ITEM_DELETE_KEY);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
发送MQ消息

在service中的增、删、改业务中分别发送MQ消息:

@RestController
@RequestMapping("/item")
public class ItemController {

    @Autowired
    private IItemService itemService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

	//新增
    @PostMapping
    public void saveItem(@RequestBody Item item){
        itemService.save(item);
        rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_INSERT_AND_UPDATE_KEY,item.getId());
    }
	//修改
    @PutMapping
    public void updateItem(@RequestBody Item item) {
        itemService.updateById(item);
        rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_INSERT_AND_UPDATE_KEY,item.getId());
    }

	//删除
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id){
        itemService.removeById(id);
        rabbitTemplate.convertAndSend(ITEM_EXCHANGE,ITEM_DELETE_KEY,id);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
监听MQ消息
@Component
public class ItemListener {
    
    @Autowired
    private ItemServiceImpl itemService;
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 监听新增或修改的业务
     * @param id 
     */
    @RabbitListener(queues = ITEM_INSERT_AND_UPDATE_QUEUE)
    public void listenInsertOrUpdate(Long id){
        //查询数据库
        Item item = itemService.getById(id);
        //新增redis缓存,对于修改,直接新增覆盖(key一致)redis缓存即可
        redisTemplate
                .opsForValue()
                .set("item:id:"+id, JSONUtil.toJsonStr(item), Duration.ofMinutes(30));
    }

    /**
     * 监听删除的业务
     * @param id 
     */
    @RabbitListener(queues = ITEM_DELETE_QUEUE)
    public void listenDelete(Long id){
        //删除redis缓存
        redisTemplate.delete("item:id:"+id);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

基于Canal的通知

认识Canal

Canal [kə’næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发,基于数据库增量日志解析,提供增量数据订阅&消费。

GitHub官方的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,Canal就是把自己伪装成MySQL的一个slave节点,从而监听masterbinary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
在这里插入图片描述

整体流程:

  • 服务完成修改后,业务直接结束,没有任何代码侵入。
  • Canal监听MySQL变化,当发现变化后,立即通知缓存服务。
  • 缓存服务接收到canal通知,更新缓存。
  • 代码零侵入
    在这里插入图片描述

实例代码

引入依赖
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
编写配置
canal:
  destination: canal # canal的集群名字,要与安装canal时设置的名称一致
  server: 192.168.150.101:11111 # canal服务地址,更换ip
  • 1
  • 2
  • 3
修改实体类

通过@Id、@Column、@Transient等注解完成实体类与数据库表字段的映射:

  • @Id :主键ID
  • @Column:属性名与数据库表中字段不一致
  • @Transient:数据库表中不存在字段
@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer sold;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
编写监听器

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("XXX")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 监听新增方法
     * @param item
     */
    @Override
    public void insert(Item item) {
        //新增redis缓存
        redisTemplate
                .opsForValue()
                .set("item:id:"+item.getId(), JSONUtil.toJsonStr(item), Duration.ofMinutes(30));
    }

    /**
     * 监听修改方法
     * @param before
     * @param after
     */
    @Override
    public void update(Item before, Item after) {
        //对于修改,直接新增覆盖(key一致)redis缓存即可
        redisTemplate
                .opsForValue()
                .set("item:id:"+after.getId(), JSONUtil.toJsonStr(after), Duration.ofMinutes(30));
    }

    /**
     * 监听删除方法
     * @param item
     */
    @Override
    public void delete(Item item) {
        //删除redis缓存
        redisTemplate.delete("item:id:"+item.getId());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/445305
推荐阅读
相关标签
  

闽ICP备14008679号