当前位置:   article > 正文

Redis缓存与数据库双写一致性解决方案_缓存双写一致性 真实解决方案

缓存双写一致性 真实解决方案

目录

 

1、冤孽的诞生

1.1 需求起因

1.2 策略之争

2、标准解决方案

2.1 延时双删策略

2.2 异步更新缓存(基于订阅binlog的同步机制)

3 、基于binlog订阅实现步骤

3.1 准备材料

3.2 代码实现


1、冤孽的诞生

1.1 需求起因

在高并发的业务场景下,数据库大多数情况都是用户并发访问最薄弱的环节。所以,就需要使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问MySQL等数据库 !

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

好了,我们现在引入缓存的概念,那么访问路程变成了如下:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

上面这个经典的读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容易出现缓存(Redis)和数据库间的数据一致性问题

有以下这些不一致的场景:

  • 当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后更新缓存,发现更新缓存失败了,这意味着数据库存的是99,而缓存还是100,这导致数据库和缓存不一致

  • 如果删除了缓存Redis记录,还没有来得及删除对应的数据库记录,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中为脏数据。

因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。

1.2 策略之争

我们讨论三种更新策略:

  • 先更新数据库,再更新缓存

    这套方案,大家是普遍反对的。为什么呢?有如下两点原因。

    原因一(线程安全角度) 同时有请求A和请求B进行更新操作,那么会出现

    (1)、线程A更新了数据库

    (2)、线程B更新了数据库

    (3)、线程B更新了缓存

    (4)、线程A更新了缓存

    这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。

    原因二(业务场景角度) 有如下两点:

    (1)、如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。

    (2)、如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。

  • 先删除缓存,再更新数据库

    该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:

    (1)、请求A进行写操作,删除缓存

    (2)、请求B查询发现缓存不存在

    (3)、请求B去数据库查询得到旧值

    (4)、请求B将旧值写入缓存

    (5)、请求A将新值写入数据库 上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。

  • 先更新数据库,再删除缓存

    知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。

    脸书很牛B,但是代表他用这套方案就没问题吗?不是的!

    假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生

    (1)请求A查询数据库,得一个旧值

    (2)请求B将新值写入数据库

    (3)请求B删除缓存

    (4)请求A将查到的旧值写入缓存,如果发生上述情况,确实是会发生脏数据。

    然而,发生这种情况的概率又有多少呢?

    发生上述情况有一个先天性条件,就是步骤(2)的写数据库操作比步骤(1)的读数据库操作耗时更短,才有可能使得步骤(3)先于步骤(4)。

    可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(2)耗时比步骤(1)更短,这一情形很难出现。

    但是难出现不代表不出现!因为我们的数据库写跟表数据量无关,而读会随着数据量的增加而变慢!

    但是从业务层面讲,对于海量数据一般都不会放入缓存,毕竟内存相对于机械硬盘都要贵很多!并且就算你不缺钱,当数据量达到1千万左右时,由于内存中不能存储如此大量数目的数据,频繁同磁盘进行数据交换(持久化),导致数据查询、存储性能的急剧下降,将导致服务不可用。当前还没有好的产品可以实现key-value保证数据完整性,千万级条数量级的,高效存储和查询支持产品。

  • 总结

    对于上述三种方案,我们抛弃第一种,但是第二种和第三种还有一个共同的毛病,那就是更新数据库和删除缓存不是一个原子性操作!所以引入了以下两种业界普遍使用的标准方案

2、标准解决方案

2.1 延时双删策略

2.1.1 基本流程图

 

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

2.1.2 伪代码

  1. public void write(String key,Object data){
  2.        redis.delKey(key);
  3.        db.updateData(data);
  4.        Thread.sleep(1000);
  5.        redis.delKey(key);
  6.   }
  7. /*转化为中文描述就是
  8. (1)先淘汰缓存
  9. (2)再写数据库
  10. (3)休眠1秒(根据具体的读操作业务时间来定)
  11. (4)再次淘汰缓存
  12. 这么做,可以将1秒内所造成的缓存脏数据,再次删除。
  13. 那么,这个1秒怎么确定的,具体该休眠多久呢?
  14. 针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。
  15. 如果你用了mysql的读写分离架构怎么办?
  16. ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。
  17. (1)请求A进行写操作,删除缓存
  18. (2)请求A将数据写入数据库了,
  19. (3)请求B查询缓存发现,缓存没有值
  20. (4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值
  21. (5)请求B将旧值写入缓存
  22. (6)数据库完成主从同步,从库变为新值
  23. (7)请求A将缓存中B写入的旧值数据删除
  24. 上述情形,如果休眠时间没有考虑数据同步时间消耗,那么第七步先于第五步执行了会造成数据不一致。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。
  25. 采用这种同步淘汰策略,吞吐量降低怎么办?
  26. ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。
  27. 第二次删除,如果删除失败怎么办?
  28. 这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库:
  29. (1)请求A进行写操作,删除缓存
  30. (2)请求B查询发现缓存不存在
  31. (3)请求B去数据库查询得到旧值
  32. (4)请求B将旧值写入缓存
  33. (5)请求A将新值写入数据库
  34. (6)请求A试图去删除请求B写入的缓存值,结果失败了。
  35. ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。咋办?
  36. 们需要提供一个保障重试的方案:
  37. 定时任务,这样会压力太大,并且一直阻塞会影响性能
  38. 消息队列,异步处理,可以让性能提升,但是对业务代码造成大量的侵入
  39. 最后的最后,这个延时的时间,你真的好把握吗?而且就算你能把握好,更新数据库和删除缓存的操作不是原子性的,你怎么解决掉?
  40. 1、你可以使用重试删除,但是这样做的结果是以牺牲响应时间为代价
  41. 2、另起一个异步线程处理,这又是以系统资源消耗作为代价并且凭空造成代码的复杂度

2.1.3 模拟代码

  1. /*
  2.        延时双删
  3.     */
  4.    @RequestMapping("update2")
  5.    public void update2() throws InterruptedException {
  6.        //将redis中该缓存删除
  7.        redisTemplate.delete("20200101010101");
  8.        //这个位置另外一个读操作进程进来了
  9.        Store oldStore = storeMapper.getStore("20200101010101");
  10.        //写入数据库
  11.        Store newStore = new Store("20200101010101", 97);
  12.        storeMapper.update(newStore);
  13.    try {
  14.            Thread.sleep(3000);//这个地方需要自己去评估项目的读数据业务逻辑的耗时,然后加几百ms,如果是主从同步,还应该加上同步时间
  15.       } catch (InterruptedException e) {
  16.            e.printStackTrace();
  17.       }
  18.        //读操作在这个位置进行数据缓存
  19.        redisTemplate.opsForValue().append(oldStore.getCode(),JSON.toJSONString(oldStore));
  20.        //再次删除缓存数据
  21.        //问题,如果该处删除失败,则缓存里面还是旧数据(脏数据)
  22.        Boolean isDelete = false;
  23.        while (!isDelete){
  24.            isDelete = redisTemplate.delete("20200101010101");
  25.       }
  26.        System.out.println("缓存删除成功");
  27.   }

 

2.2 异步更新缓存(基于订阅binlog的同步机制)

2.2.1 基本流程图

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

这个也是我们今天的主要讲解,也是业界用得最多的方案,该方案的核心在于使用队列对读写操作进行排队操作,保证了数据的最终一致性,当然,性能相对上一种要差,但是还是那句话,数据的准确性才是最重要的!

2.2.2 初识Canal

阿里Canal主要是听过伪装成mysql从节点来向主节点拉取binlog日志解析成消息推送到MQ消息队列

Canal在双写一致性中所处的位置:

watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1eGlhbmcxOTg1MTExNA==,size_16,color_FFFFFF,t_70

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)

  • canal解析binary log对象(原始为byte流)

  • canal将解析后的对象数据推送给监听的消息中间件(实时主动推送,rabbitmq需要是在线状态)

 

3 、基于binlog订阅实现步骤

3.1 准备材料

1、需要部署一个阿里巴巴的Canal服务端,用于订阅Mysql binlog日志并推送到MQ消息队列

  1. #下载解压canal server
  2. wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz  #下载canal部署包
  3. mkdir canal
  4. tar -zxvf canal.deployer-1.1.1.tar.gz -C canal #解压
  5. #编辑conf/canal.properties,修改MQ配置
  6. canal.ip = 1 #canal服务器标识
  7. canal.serverMode = rabbitmq # 指定rabbitmq
  8. canal.mq.servers = 192.168.223.128 ## 注意不要加端口号,不然会报IPV6错误。
  9. canal.mq.vhost=canal  #MQ虚拟机名称
  10. canal.mq.exchange=exchange.trade #交换机名称,用于将消息发送到绑定的队列
  11. canal.mq.username=guest #MQ登录账号,注意要有上面vhost的权限
  12. canal.mq.password=guest #MQ密码
  13. ---------------------------------------------------------------------------------
  14.    
  15. #编辑conf/example/instance.properties实例配置,配置数据库信息
  16. canal.instance.dbUsername=root
  17. canal.instance.dbPassword=root
  18. canal.instance.mysql.slaveId=1234 #不要与my.cnf中server_id相同,因为我要伪装为mysql的slave
  19. canal.instance.master.address=192.168.223.128:3306 ## 数据库地址
  20. canal.instance.defaultDatabaseName=test ## 数据库名
  21. canal.mq.topic=example # 路由键,需要跟MQ中交换机队列的绑定路由key保持一致

2、安装RabbitMQ

  1. mkdir /usr/local/rabbitmq;
  2. cd /usr/local/rabbitmq;
  3. yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers;
  4. wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm;
  5. wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm;
  6. wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm;
  7. rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm;
  8. rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm;
  9. rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm;
  10. vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
  11. #找到loopback_users 修改后台登录用户为[guest]
  12. #启动rabbitmq:service rabbitmq-server start
  13. #启动监控管理器:service rabbitmq-plugins enable rabbitmq_management
  14. #开启端口:firewall-cmd --zone=public --add-port=15672/tcp --permanent
  15. #重启防火墙:firewall-cmd --reload

更多精彩请移步《RabbitMQ工作模型及Java编程》

3、安装mysql并开启binlog

  1. rpm -Uvh http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm #下载
  2. yum -y install mysql-community-server #rpm安装
  3. #编辑my.cnf配置文件,开启binlog
  4. vim /etc/my.cnf
  5. #增加以下配置
  6. log-bin=mysql-bin # 开启 binlog
  7. binlog-format=ROW # 选择 ROW 模式
  8. server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复(没有数据库主从不配也行)
  9. #加入开机启动
  10. systemctl enable mysqld
  11. #启动mysql服务进程
  12. systemctl start mysqld
  13. #初始化,执行命令,重置密码
  14. mysql_secure_installation
  15. #会依次出现以下问题。
  16. Set root password? [Y/n]
  17.   是否设置root用户的密码 (y后【设置登录密码】)
  18.   Remove anonymous users? [Y/n]
  19.   是否删除匿名用户 (y)
  20.   Disallow root login remotely? [Y/n]
  21.   是否禁止root远程登录 (n)
  22.   Remove test database and access to it? [Y/n]
  23.   是否删除test数据库(y)
  24.   Reload privilege tables now? [Y/n]
  25.   是否重新加载授权信息 (y)
  26. # 先进入mysql
  27. mysql -u root -p
  28. # 授权(root用户)远程连接权限(不建议)
  29. GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
  30. FLUSH PRIVILEGES;
  31. # 使用单独的远程登录用户(推荐)
  32. GRANT ALL PRIVILEGES ON *.* TO '新用户名'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION;
  33. FLUSH PRIVILEGES;
  34. #查看是否已经开启了binlog日志
  35. #登录mysql后输入如下命令:
  36. show variables like '%log_bin%';
  37. | Variable_name                   | Value                         |
  38. +---------------------------------+--------------------------------+
  39. | log_bin                         | ON                             |
  40. | log_bin_basename               | /var/lib/mysql/mysql-bin       |
  41. | log_bin_index                   | /var/lib/mysql/mysql-bin.index |
  42. | log_bin_trust_function_creators | OFF                           |
  43. | log_bin_use_v1_row_events       | OFF                           |
  44. | sql_log_bin                     | ON  
  45. #查看binlog日志:
  46. #1、查看第一个binlog文件的内容
  47. mysql> show binlog events;
  48. +------------------+-----+-------------+-----------+-------------+---------------------------------------+
  49. | Log_name         | Pos | Event_type | Server_id | End_log_pos | Info                                 |
  50. +------------------+-----+-------------+-----------+-------------+---------------------------------------+
  51. | mysql-bin.000001 |   4 | Format_desc |         1 |         120 | Server ver: 5.6.49-log, Binlog ver: 4 |
  52. | mysql-bin.000001 | 120 | Query       |         1 |         192 | BEGIN                                 |
  53. | mysql-bin.000001 | 192 | Table_map   |         1 |         249 | table_id: 70 (test.goods_store)       |
  54. | mysql-bin.000001 | 249 | Delete_rows |         1 |         294 | table_id: 70 flags: STMT_END_F       |
  55. | mysql-bin.000001 | 294 | Xid         |         1 |         325 | COMMIT /* xid=11 */                   |
  56. +------------------+-----+-------------+-----------+-------------+---------------------------------------+

分别启动mysql,rabbitmq,canal

  1. service mysql start #启动mysql
  2. service rabbitmq-server start #启动rabbitmq
  3. canal目录/bin/startup.sh #启动canal服务

问题:

  1. {"identity":{"slaveId":-1,"sourceAddress":{"address":"ydt1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000026","position":551,"serverId":1,"timestamp":1594283137000}}
  2. 2020-07-31 17:27:24.973 [destination = example , address = /192.168.223.128:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000026,position=551,serverId=1,gtid=,timestamp=1594283137000] cost : 617ms , the next step is binlog dump
  3. 2020-07-31 17:27:25.106 [destination = example , address = /192.168.223.128:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
  4. #如果出现了以上问题,可能是mysql数据库的binlog日志位置不对,重新设置一下
  5. #先找出当前mysql的binlog日志position,进入mysql客户端,输入如下命令:
  6. show master status;
  7. #找到当前binlog以及position,编辑canal目录/conf/example/meta.dat元数据脚本
  8. vim /usr/local/canal/conf/example/meta.dat
  9. 将----》"journalName":"mysql-bin.000003","position":499改为自己查到的或者比查到的小即可
  10. #或者直接将该文件删除,重新生成当前数据库执行命令的position位置对应的元数据脚本

3.2 代码实现

3.2.1 pom.xml

  1. <dependency>
  2.            <groupId>org.mybatis.spring.boot</groupId>
  3.            <artifactId>mybatis-spring-boot-starter</artifactId>
  4.            <version>2.1.2</version>
  5.        </dependency>
  6.        <dependency>
  7.            <groupId>mysql</groupId>
  8.            <artifactId>mysql-connector-java</artifactId>
  9.            <version>8.0.16</version>
  10.        </dependency>
  11.        <dependency>
  12.            <groupId>org.springframework.boot</groupId>
  13.            <artifactId>spring-boot-starter-web</artifactId>
  14.        </dependency>
  15.        <dependency>
  16.            <groupId>org.springframework.boot</groupId>
  17.            <artifactId>spring-boot-starter-data-redis</artifactId>
  18.        </dependency>
  19.        <dependency>
  20.            <groupId>org.springframework.boot</groupId>
  21.            <artifactId>spring-boot-starter-amqp</artifactId>
  22.        </dependency>
  23.        <dependency>
  24.            <groupId>com.alibaba</groupId>
  25.            <artifactId>fastjson</artifactId>
  26.            <version>1.2.3</version>
  27.        </dependency>

3.2.2 application.yml

  1. spring:
  2. rabbitmq:
  3.   virtual-host: canal
  4.   host: 192.168.223.128
  5.   publisher-confirms: true
  6.  #数据源
  7. datasource:
  8.   url: jdbc:mysql://192.168.223.128:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
  9.   username: root
  10.   password: root
  11.   driver-class-name: com.mysql.jdbc.Driver
  12. redis:
  13.   host: 192.168.223.128

 

3.2.3 消息队列配置类

  1. package com.ydt.test.message;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class DirectRabbitConfig {
  10.    //队列 起名:exchange.trade.canal
  11.    @Bean
  12.    public Queue TestDirectQueue() {
  13.        return new Queue("exchange.trade.canal",true);
  14.   }
  15.    //Direct交换机 起名:exchange.trade
  16.    @Bean
  17.    DirectExchange TestDirectExchange() {
  18.        return new DirectExchange("exchange.trade");
  19.   }
  20.    //绑定 将队列和交换机绑定, 并设置用于匹配键:example
  21.    @Bean
  22.    Binding bindingDirect() {
  23.        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("example");
  24.   }
  25. }

3.2.4 消息监听者类

  1. package com.ydt.test.message;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import org.springframework.amqp.core.Message;
  6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.data.redis.core.RedisTemplate;
  9. import org.springframework.data.redis.core.StringRedisTemplate;
  10. import org.springframework.stereotype.Component;
  11. import org.springframework.util.StringUtils;
  12. import java.util.Map;
  13. @Component
  14. public class DirectReceiver {
  15.    @Autowired
  16.    private StringRedisTemplate redisTemplate;
  17.    @RabbitListener(queues = "exchange.trade.canal")
  18.    public void process(Message message){
  19.        String json = new String(message.getBody());
  20.        System.out.println("消费的消息:" + json);
  21.        Map map = JSON.parseObject(json,Map.class);
  22.        JSONArray array = null;
  23.        String sqlType = (String) map.get("type");
  24.        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
  25.            array = JSONArray.parseArray((String)map.get("data"));
  26.       }else{
  27.            array = (JSONArray)map.get("data");
  28.       }
  29.        if(array == null){
  30.            return;
  31.       }
  32.        JSONObject object = array.getJSONObject(0);
  33.       /* if(StringUtils.endsWithIgnoreCase("UPDATE",sqlType)
  34.                || StringUtils.endsWithIgnoreCase("INSERT",sqlType)
  35.                || StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
  36.            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
  37.        }else if(StringUtils.endsWithIgnoreCase("DELETE",sqlType)){
  38.            redisTemplate.delete(object.get("code").toString());
  39.        }*/
  40.        if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){
  41.            redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString());
  42.       }else{
  43.            redisTemplate.delete(object.get("code").toString());
  44.       }
  45.   }
  46. }

 

3.2.5 Controller调用

  1. package com.ydt.test.controller;
  2. import com.alibaba.fastjson.JSON;
  3. import com.ydt.test.domain.Store;
  4. import com.ydt.test.mapper.StoreMapper;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.data.redis.core.RedisTemplate;
  8. import org.springframework.util.StringUtils;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. @RestController
  14. public class MessageController {
  15.    @Autowired
  16.    private StoreMapper storeMapper;
  17.    @Autowired
  18.    private RabbitTemplate rabbitTemplate;
  19.    @Autowired
  20.    private RedisTemplate redisTemplate;
  21.    /**
  22.     * 案例1
  23.     */
  24.    @RequestMapping("update1")
  25.    public void update1(){
  26.        Store store = new Store("20200101010101", 98);
  27.        storeMapper.update(store);
  28.        int i = 1/0;
  29.        redisTemplate.delete("20200101010101");
  30.   }
  31.    @RequestMapping("get")
  32.    public String getMessage(){
  33.        //查询操作
  34.        Store store = storeMapper.getStore("20200101010101");
  35.        System.out.println("-----------我进行了查询,现在我要开始进行redis缓存了-----------");
  36.        //同一数据源
  37.        Map map = new HashMap();
  38.        map.put("type", "SELECT");
  39.        map.put("data", "[{'code':'20200101010101','store':"+store.getStore()+"}]");
  40.        rabbitTemplate.convertAndSend("exchange.trade", "example", JSON.toJSONString(map));
  41.        return  "";
  42.   }
  43. }

3.2.6 启动测试

1、先将库存表中库存修改为111,会通过canal伪slave拿到binlog日志,然后推送到rabbitmq

2、然后调用get方法,拿到数据库中数据,同时将数据推送到rabbitmq

3、重复1操作,将库存改为321

4、重复2操作

打开监听,启动服务可以看到我们的消费者会按照顺序消费队列中的数据!

  1. DirectReceiver消费者收到消息 : {"data":[{"code":"20200101010101","store":"111"}],"database":"test","es":1596198062000,"id":13,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"123"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198062527,"type":"UPDATE"}
  2. DirectReceiver消费者收到消息 : {"data":"[{'code':'20200101010101','store':111}]","type":"SELECT"}
  3. DirectReceiver消费者收到消息 : {"data":[{"code":"20200101010101","store":"321"}],"database":"test","es":1596198075000,"id":14,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"111"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198075520,"type":"UPDATE"}
  4. DirectReceiver消费者收到消息 : {"data":"[{'code':'20200101010101','store':321}]","type":"SELECT"}

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/721937
推荐阅读
相关标签
  

闽ICP备14008679号