当前位置:   article > 正文

缓存同步策略

缓存同步策略

缓存同步策略

缓存数据同步的常见方式有三种:

设置有效期

给缓存设置有效期,到期后自动删除。再次查询时更新。
- 优点:简单,方便。
- 缺点:时效性差,缓存过期之前可能不一致。
- 场景:更新频率较低,时效性要求低的业务。

同步双写

在修改数据库的同时,直接修改缓存。
- 优点:时效性强,缓存与数据库强一致。
- 缺点:有代码侵入,耦合度高。
- 场景:对一致性,时效性要求较高的缓存数据。

异步通知

修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据。
- 优点:低耦合,可以同时通知多个缓存服务。
- 缺点:时效性一般,可能存在中间不一致状态。
- 状态:时效性一般,有多个服务需要同步。

基于Canal的异步通知

在这里插入图片描述

Canal

Canal: 阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅和消费。

Canal是基于MySQL主从同步来实现的,MySQL主从同步的原理如下:

  • Mysql master 将数据变更写入二进制日志(binary log),其中记录的数据叫做binary log events
  • MySql slave将master中的binary log日志拷贝到它的中继日志(relay log)
  • MySql slave重放relay log中事件,将数据变更,反映它自己的数据
    在这里插入图片描述
    Canal就是把自己伪装成MySQL的一个slave从节点,从而监听master的binary log变化。在把得到的变化信息通知给Canal的客户端,进而完成对其他数据库的同步。
    在这里插入图片描述

操作

开启MySQL主从

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

开启binlog

修改my.cnf文件
如果修改不起作用则修改/etc/mysql/mysql.conf.d/mysqld.cnf 文件

# 设置binary log文件的存放地址和文件名,叫做mysql-bin
log-bin=/var/lib/mysql/mysql-bin
# 指定对哪个database记录binary log events,这里记录item这个库
binlog-do-db=item
  • 1
  • 2
  • 3
  • 4
设置用户权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
  • 1
  • 2
  • 3

重启mysql容器即可

docker restart mysql
  • 1

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      154 | item         |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

安装Canal

我们需要创建一个网络,将MySQL、Canal、MQ放到同一个Docker网络中:

docker network create binlog-network
  • 1

让mysql加入这个网络:

docker network connect binlog-network mysql
  • 1

下载canal

docker pull canal/canal-server:v1.1.5
  • 1

创建canal容器:

docker run -p 11111:11111 --name canal \
-e canal.destinations=item\
-e canal.instance.master.address=mysql5.7:3306  \
-e canal.instance.dbUsername=canal  \
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex=item\\..* \
--network binlog-network \
-d canal/canal-server:v1.1.5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

说明:

- `-p 11111:11111`:这是canal的默认监听端口
- `-e canal.instance.master.address=mysql:3306`:数据库地址和端口,如果不知道mysql容器地址,可以通过`docker inspect 容器id`来查看
- `-e canal.instance.dbUsername=canal`:数据库用户名
- `-e canal.instance.dbPassword=canal` :数据库密码
- `-e canal.instance.filter.regex=`:要监听的表名称
  • 1
  • 2
  • 3
  • 4
  • 5

表名称监听支持的语法:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

查看启动结果

[root@localhost home]# docker logs -f canal 
DOCKER_DEPLOY_TYPE=VM
==> INIT /alidata/init/02init-sshd.sh
==> EXIT CODE: 0
==> INIT /alidata/init/fix-hosts.py
==> EXIT CODE: 0
==> INIT DEFAULT
Generating SSH1 RSA host key: [  OK  ]
Starting sshd: [  OK  ]
Starting crond: [  OK  ]
==> INIT DONE
==> RUN /home/admin/app.sh
==> START ...
start canal ...
start canal successful
==> START SUCCESSFUL ...

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

查看canal连接数据库状态:

[root@f1c7640d4486 admin]# tail  canal-server/logs/item/item.log 
2021-12-05 11:07:26.012 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-item 
2021-12-05 11:07:26.067 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^item\..*$
2021-12-05 11:07:26.067 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2021-12-05 11:07:26.083 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2021-12-05 11:07:26.431 [destination = item , address = mysql5.7/172.18.0.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2021-12-05 11:07:26.431 [destination = item , address = mysql5.7/172.18.0.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2021-12-05 11:07:27.510 [destination = item , address = mysql5.7/172.18.0.2:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1000,gtid=<null>,timestamp=1638669831000] cost : 1055ms , the next step is binlog dump
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

java 代码中引入

增加依赖:

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

application.yml中增加canal的连接配置

canal:
  destination: item
  server: 192.168.25.129:11111
  • 1
  • 2
  • 3

redis的Handler

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }

    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

增加canal的Handler

import com.github.benmanes.caffeine.cache.Cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

在这里插入图片描述
视频资料点击:https://www.bilibili.com/video/BV1LQ4y127n4?p=157

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/581316
推荐阅读
相关标签
  

闽ICP备14008679号