当前位置:   article > 正文

HM-SpringCloud微服务系列11.4【缓存同步】

springcloud 先更新数据库再更新缓存代码

大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。

所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

1. 数据同步策略

1.1 缓存数据同步常见方式

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

  • 优势:简单、方便
  • 缺点:时效性差,缓存过期之前可能不一致
  • 场景:更新频率较低,时效性要求低的业务

同步双写:在修改数据库的同时,直接修改缓存

  • 优势:时效性强,缓存与数据库强一致
  • 缺点:有代码侵入,耦合度高;
  • 场景:对一致性、时效性要求较高的缓存数据

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

  • 优势:低耦合,可以同时通知多个缓存服务
  • 缺点:时效性一般,可能存在中间不一致状态
  • 场景:时效性要求一般,有多个服务需要同步

1.2 异步通知缓存同步实现方案

而异步实现又可以基于MQ或者Canal来实现:

  1. 基于MQ的异步通知

image-20220506100113411

解读:

  • 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
  • 缓存服务监听MQ消息,然后完成对缓存的更新。
  • 依然有少量的代码侵入
  1. 基于Canal的通知

image-20220506100329430

解读:

  • 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
  • Canal监听MySQL变化,当发现变化后,立即通知缓存服务
  • 缓存服务接收到canal通知,更新缓存
  • 代码零侵入

2. 安装Canal

2.1 认识Canal

Canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。

基于数据库增量日志解析,提供增量数据订阅&消费。

GitHub的地址:https://github.com/alibaba/canal

Canal是基于MySQL的主从同步来实现的

2.1.1 MySQL主从同步原理

image-20220506100658252

  • 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
  • 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2.1.2 Canal实现原理

Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。

再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。

image-20220506100814201

2.2 安装和配置Canal

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

这里以之前用Docker运行的mysql为例进行操作:

image-20220506101447676

2.2.1 开启MySQL主从

一、开启binlog

打开mysql容器挂载的日志文件,在/tmp/mysql/conf目录:

image-20220506101732935

修改文件:

vi /tmp/mysql/conf/my.cnf

添加内容:

  1. log-bin=/var/lib/mysql/mysql-bin
  2. binlog-do-db=heima

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库
  • image-20220506101926070

最终完整my.cnf文件:

  1. [mysqld]
  2. skip-name-resolve
  3. character_set_server=utf8
  4. datadir=/var/lib/mysql
  5. server-id=1000
  6. log-bin=/var/lib/mysql/mysql-bin
  7. binlog-do-db=heima

image-20220506102113336

docker重启mysql服务:

docker restart mysql

image-20220506102246206

重启完成后查看一下mysql的data目录:生成了binary log文件

image-20220506102437799

二、设置用户权限

需要给从库设置权限

添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对heima这个库的操作权限

  1. create user canal@'%' IDENTIFIED by 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
  3. FLUSH PRIVILEGES;

image-20220506102910498

image-20220506103210664

image-20220506103238325

重启mysql容器

docker restart mysql

image-20220506103327360

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令并运行:

show master status;

image-20220506103448040

PS:由于刚刚又重启了一次mysql,所以增加了新的binary log文件:000002

2.2.2 安装Canal

一、创建Docker网络

需要创建一个网络,将MySQL、Canal放到同一个Docker网络中:

docker network create heima

让mysql加入这个网络:

docker network connect heima mysql

image-20220506103841706

二、安装Canal

课前资料中提供了canal的镜像压缩包,本地上传到远程虚拟机:

image-20220506104432017

通过以下命令导入镜像到docker:

docker load -i canal.tar

image-20220506104722193

然后运行命令创建Canal容器:

  1. docker run -p 11111:11111 --name canal \
  2. -e canal.destinations=heima \
  3. -e canal.instance.master.address=mysql:3306 \
  4. -e canal.instance.dbUsername=canal \
  5. -e canal.instance.dbPassword=canal \
  6. -e canal.instance.connectionCharset=UTF-8 \
  7. -e canal.instance.tsdb.enable=true \
  8. -e canal.instance.gtidon=false \
  9. -e canal.instance.filter.regex=heima\\..* \
  10. --network heima \
  11. -d canal/canal-server:v1.1.5

说明:

  • -p 11111:11111:这是canal的默认监听端口,端口映射
  • -e canal.destinations=heima:canal集群名称
  • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过docker inspect 容器id来查看
  • -e canal.instance.dbUsername=canal:数据库用户名
  • -e canal.instance.dbPassword=canal :数据库密码
  • -e canal.instance.filter.regex=:要监听的表名称

表名称监听支持的语法:

  1. mysql 数据解析关注的表,Perl正则表达式.
  2. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
  3. 常见例子:
  4. 1. 所有表:.* or .*\\..*
  5. 2. canal schema下所有表: canal\\..*
  6. 3. canal下的以canal打头的表:canal\\.canal.*
  7. 4. canal schema下的一张表:canal.test1
  8. 5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2

image-20220506105236860

通过以下命令动态查看一下Canal的运行日志(ctrl+c退出):

docker logs -f canal

image-20220506105448052

Canal启动成功后,如何知道Cancl是否已与MySql建立连接?

首先,可以通过以下命令进行Canal容器内部:

docker exec -it canal bash

image-20220506105837801

image-20220506110759080

image-20220506110834351

其次,查看一下Canal的运行日志:

tail -f canal-server/logs/canal/canal.log

image-20220506111004421

可以看到Canal运行正常

再次,查看一下"数据库"heima的日志:

tail -f canal-server/logs/heima/heima.log

image-20220506111321604

可以看到Canal先尝试连接mysql,连接成功后会去找binlog二进制文件(此处是000002),找到后开始做主从同步

证明Canal安装成功了

最后退出Canal容器:

exit

image-20220506111654987

3. 监听Canal

3.1 客户端

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。

image-20220506111815352

我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过Canal默认提供的官方Java客户端是比较麻烦的

所以这里我们会使用GitHub上的第三方开源的canal-starter客户端

地址:https://github.com/NormanGyllenhaal/canal-client

canal-starter与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多

3.2 实现监听

3.2.1 引入依赖

  1. <!--canal客户端-->
  2. <dependency>
  3. <groupId>top.javatool</groupId>
  4. <artifactId>canal-spring-boot-starter</artifactId>
  5. <version>1.2.1-RELEASE</version>
  6. </dependency>

image-20220506112529693

3.2.2 编写配置

  1. canal:
  2. destination: heima # canal集群名/实例名,要与安装canal时设置的名称一致(跟canal-server运行时设置的destination一致)
  3. server: 10.193.193.141:11111 # canal服务地址

image-20220506112557894

3.2.3 修改Item实体类

image-20220506112851953

通过@Id、@Column、等注解完成Item与数据库表字段的映射:

  1. package com.heima.item.pojo;
  2. import com.baomidou.mybatisplus.annotation.IdType;
  3. import com.baomidou.mybatisplus.annotation.TableField;
  4. import com.baomidou.mybatisplus.annotation.TableId;
  5. import com.baomidou.mybatisplus.annotation.TableName;
  6. import lombok.Data;
  7. import org.springframework.data.annotation.Id;
  8. import org.springframework.data.annotation.Transient;
  9. import java.util.Date;
  10. @Data
  11. @TableName("tb_item")
  12. public class Item {
  13. @TableId(type = IdType.AUTO)
  14. @Id
  15. private Long id;//商品id
  16. private String name;//商品名称
  17. private String title;//商品标题
  18. private Long price;//价格(分)
  19. private String image;//商品图片
  20. private String category;//分类名称
  21. private String brand;//品牌名称
  22. private String spec;//规格
  23. private Integer status;//商品状态 1-正常,2-下架
  24. private Date createTime;//创建时间
  25. private Date updateTime;//更新时间
  26. @TableField(exist = false)
  27. @Transient
  28. private Integer stock;
  29. @TableField(exist = false)
  30. @Transient
  31. private Integer sold;
  32. }

image-20220506150024834

3.2.4 编写监听器

image-20220506112744159

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类

image-20220506150643429

image-20220506150724079

  1. package com.heima.item.config;
  2. import com.github.benmanes.caffeine.cache.Cache;
  3. import com.heima.item.pojo.Item;
  4. import org.checkerframework.checker.units.qual.A;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import top.javatool.canal.client.annotation.CanalTable;
  8. import top.javatool.canal.client.handler.EntryHandler;
  9. /**
  10. * 每当数据库发生增删改,会将对应的数据传递给以下三个方法
  11. */
  12. @CanalTable("tb_item")
  13. @Component
  14. public class ItemHandler implements EntryHandler<Item> {
  15. @Autowired
  16. private RedisHandler redisHandler;
  17. @Autowired
  18. private Cache<Long, Item> itemCache;
  19. @Override
  20. public void insert(Item item) {
  21. // PS:本地缓存效率更高,建议放到redis缓存前处理
  22. // 写数据到本地JVM进程缓存
  23. itemCache.put(item.getId(), item);
  24. // 写数据到redis缓存
  25. redisHandler.saveItem(item);
  26. }
  27. @Override
  28. public void update(Item before, Item after) {
  29. // 更新本地JVM进程缓存数据
  30. itemCache.put(after.getId(), after);
  31. // 更新redis缓存数据
  32. redisHandler.saveItem(after);
  33. }
  34. @Override
  35. public void delete(Item item) {
  36. // 删除本地JVM进程缓存数据
  37. itemCache.invalidate(item.getId());
  38. // 删除redis缓存数据
  39. redisHandler.deleteItemById(item.getId());
  40. }
  41. }

注意:上面这里对Redis的操作都封装到了RedisHandler这个对象中,用到时直接调用即可

是之前做缓存预热时编写的一个配置类,内容如下:

image-20220506165137703

  1. package com.heima.item.config;
  2. import com.fasterxml.jackson.core.JsonProcessingException;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.heima.item.pojo.Item;
  5. import com.heima.item.pojo.ItemStock;
  6. import com.heima.item.service.IItemService;
  7. import com.heima.item.service.IItemStockService;
  8. import org.springframework.beans.factory.InitializingBean;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.context.annotation.Bean;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.data.redis.serializer.RedisSerializer;
  13. import org.springframework.data.redis.serializer.StringRedisSerializer;
  14. import org.springframework.stereotype.Component;
  15. import javax.annotation.Resource;
  16. import java.util.List;
  17. /**
  18. * 实现redis缓存预热
  19. * 项目启动那一刻,就会创建RedisHandler这个Bean,并注入redisTemplate对象,然后执行afterPropertiesSet()
  20. */
  21. @Component
  22. public class RedisHandler implements InitializingBean {
  23. @Autowired
  24. private RedisTemplate redisTemplate;
  25. @Autowired
  26. private IItemService itemService;
  27. @Autowired
  28. private IItemStockService stockService;
  29. /**
  30. * from jackson
  31. * spring里一个默认的json处理工具ObjectMapper
  32. * 静态常量,工具
  33. */
  34. private static final ObjectMapper MAPPER = new ObjectMapper();
  35. /**
  36. * 初始化缓存
  37. * @throws Exception
  38. */
  39. @Override
  40. public void afterPropertiesSet() throws Exception {
  41. // 1. 查询商品信息(此处理应仅查询热点数据,实际因为此次演示数据不多,所以全查全放到缓存中)
  42. List<Item> itemList = itemService.list();
  43. // 2. 放入缓存
  44. for (Item item : itemList) {
  45. // 2.1 item序列化为json
  46. String json = MAPPER.writeValueAsString(item);
  47. // 2.2 存入redis
  48. redisTemplate.opsForValue().set("item:id:"+item.getId(), json); //key=前缀"item:id:"+id,value=json
  49. }
  50. // 3. 查询商品库存信息(同上)
  51. List<ItemStock> stockList = stockService.list();
  52. // 4. 放入缓存
  53. for (ItemStock stock : stockList) {
  54. // 4.1 stock序列化为json
  55. String json = MAPPER.writeValueAsString(stock);
  56. // 4.2 存入redis
  57. redisTemplate.opsForValue().set("item:stock:id:"+stock.getId(), json);
  58. }
  59. }
  60. public void saveItem(Item item) {
  61. try {
  62. String json = MAPPER.writeValueAsString(item);
  63. redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
  64. } catch (JsonProcessingException e) {
  65. throw new RuntimeException(e);
  66. }
  67. }
  68. public void deleteItemById(Long id) {
  69. redisTemplate.delete("item:id:" + id);
  70. }
  71. }

3.2.5 测试

重启ItemApplication和ItemApplication2这两个微服务:

image-20220506165951114

image-20220506165936065

从控制台中可以看到在服务启动成功后会一直收到消息,证明java代码已经与Canal建立起连接了

访问http://localhost:8081/item/10001

image-20220506170417745

初始项目中老师已经给准备好了管理数据操作界面,可以直接使用

image-20220506170558792

访问http://localhost:8081

image-20220506170757750

修改一下商品信息

image-20220506170856292

image-20220506170919592

查看一下idea控制台日志,发现刚才操作数据库时,有日志,说明数据修改了

image-20220506171100206

image-20220506171126698

image-20220506171152177

然后,再来访问http://localhost:8081/item/10001,查看数据是否已发生变化

image-20220506171656227

最后,再通过RDM来看一下redis缓存数据

image-20220506171749409

至此实现了:当数据库数据发生变化时,缓存都会跟着一起变

4. 多级缓存架构总结

image-20220506173433403


  1. 红框是本节11.4实现的内容

image-20220506173015693

  1. 蓝框时本次课程未实现的内容(可以参考绿框实现)

image-20220506173138915

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/85678
推荐阅读
相关标签
  

闽ICP备14008679号