赞
踩
目录
在高并发的业务场景下,数据库大多数情况都是用户并发访问最薄弱的环节。所以,就需要使用redis做一个缓冲操作,让请求先访问到redis,而不是直接访问MySQL等数据库 !
好了,我们现在引入缓存的概念,那么访问路程变成了如下:
上面这个经典的读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容易出现缓存(Redis)和数据库间的数据一致性问题。
有以下这些不一致的场景:
当更新数据时,如更新某商品的库存,当前商品的库存是100,现在要更新为99,先更新数据库更改成99,然后更新缓存,发现更新缓存失败了,这意味着数据库存的是99,而缓存还是100,这导致数据库和缓存不一致
如果删除了缓存Redis记录,还没有来得及删除对应的数据库记录,另一个线程就来读取,发现缓存为空,则去数据库中读取数据写入缓存,此时缓存中为脏数据。
因为写和读是并发的,没法保证顺序,就会出现缓存和数据库的数据不一致的问题。
我们讨论三种更新策略:
先更新数据库,再更新缓存
这套方案,大家是普遍反对的。为什么呢?有如下两点原因。
原因一(线程安全角度) 同时有请求A和请求B进行更新操作,那么会出现
(1)、线程A更新了数据库
(2)、线程B更新了数据库
(3)、线程B更新了缓存
(4)、线程A更新了缓存
这就出现请求A更新缓存应该比请求B更新缓存早才对,但是因为网络等原因,B却比A更早更新了缓存。这就导致了脏数据,因此不考虑。
原因二(业务场景角度) 有如下两点:
(1)、如果你是一个写数据库场景比较多,而读数据场景比较少的业务需求,采用这种方案就会导致,数据压根还没读到,缓存就被频繁的更新,浪费性能。
(2)、如果你写入数据库的值,并不是直接写入缓存的,而是要经过一系列复杂的计算再写入缓存。那么,每次写入数据库后,都再次计算写入缓存的值,无疑是浪费性能的。显然,删除缓存更为适合。
先删除缓存,再更新数据库
该方案会导致不一致的原因是。同时有一个请求A进行更新操作,另一个请求B进行查询操作。那么会出现如下情形:
(1)、请求A进行写操作,删除缓存
(2)、请求B查询发现缓存不存在
(3)、请求B去数据库查询得到旧值
(4)、请求B将旧值写入缓存
(5)、请求A将新值写入数据库 上述情况就会导致不一致的情形出现。而且,如果不采用给缓存设置过期时间策略,该数据永远都是脏数据。
先更新数据库,再删除缓存
知名社交网站facebook也在论文《Scaling Memcache at Facebook》中提出,他们用的也是先更新数据库,再删缓存的策略。
脸书很牛B,但是代表他用这套方案就没问题吗?不是的!
假设这会有两个请求,一个请求A做查询操作,一个请求B做更新操作,那么会有如下情形产生
(1)请求A查询数据库,得一个旧值
(2)请求B将新值写入数据库
(3)请求B删除缓存
(4)请求A将查到的旧值写入缓存,如果发生上述情况,确实是会发生脏数据。
然而,发生这种情况的概率又有多少呢?
发生上述情况有一个先天性条件,就是步骤(2)的写数据库操作比步骤(1)的读数据库操作耗时更短,才有可能使得步骤(3)先于步骤(4)。
可是,大家想想,数据库的读操作的速度远快于写操作的(不然做读写分离干嘛,做读写分离的意义就是因为读操作比较快,耗资源少),因此步骤(2)耗时比步骤(1)更短,这一情形很难出现。
但是难出现不代表不出现!因为我们的数据库写跟表数据量无关,而读会随着数据量的增加而变慢!
但是从业务层面讲,对于海量数据一般都不会放入缓存,毕竟内存相对于机械硬盘都要贵很多!并且就算你不缺钱,当数据量达到1千万左右时,由于内存中不能存储如此大量数目的数据,频繁同磁盘进行数据交换(持久化),导致数据查询、存储性能的急剧下降,将导致服务不可用。当前还没有好的产品可以实现key-value保证数据完整性,千万级条数量级的,高效存储和查询支持产品。
总结
对于上述三种方案,我们抛弃第一种,但是第二种和第三种还有一个共同的毛病,那就是更新数据库和删除缓存不是一个原子性操作!所以引入了以下两种业界普遍使用的标准方案
2.1.1 基本流程图
2.1.2 伪代码
public void write(String key,Object data){ redis.delKey(key); db.updateData(data); Thread.sleep(1000); redis.delKey(key); } /*转化为中文描述就是 (1)先淘汰缓存 (2)再写数据库 (3)休眠1秒(根据具体的读操作业务时间来定) (4)再次淘汰缓存 这么做,可以将1秒内所造成的缓存脏数据,再次删除。 那么,这个1秒怎么确定的,具体该休眠多久呢? 针对上面的情形,读者应该自行评估自己的项目的读数据业务逻辑的耗时。然后写数据的休眠时间则在读数据业务逻辑的耗时基础上,加几百ms即可。这么做的目的,就是确保读请求结束,写请求可以删除读请求造成的缓存脏数据。 如果你用了mysql的读写分离架构怎么办? ok,在这种情况下,造成数据不一致的原因如下,还是两个请求,一个请求A进行更新操作,另一个请求B进行查询操作。 (1)请求A进行写操作,删除缓存 (2)请求A将数据写入数据库了, (3)请求B查询缓存发现,缓存没有值 (4)请求B去从库查询,这时,还没有完成主从同步,因此查询到的是旧值 (5)请求B将旧值写入缓存 (6)数据库完成主从同步,从库变为新值 (7)请求A将缓存中B写入的旧值数据删除 上述情形,如果休眠时间没有考虑数据同步时间消耗,那么第七步先于第五步执行了会造成数据不一致。还是使用双删延时策略。只是,睡眠时间修改为在主从同步的延时时间基础上,加几百ms。 采用这种同步淘汰策略,吞吐量降低怎么办? ok,那就将第二次删除作为异步的。自己起一个线程,异步删除。这样,写的请求就不用沉睡一段时间后了,再返回。这么做,加大吞吐量。 第二次删除,如果删除失败怎么办? 这是个非常好的问题,因为第二次删除失败,就会出现如下情形。还是有两个请求,一个请求A进行更新操作,另一个请求B进行查询操作,为了方便,假设是单库: (1)请求A进行写操作,删除缓存 (2)请求B查询发现缓存不存在 (3)请求B去数据库查询得到旧值 (4)请求B将旧值写入缓存 (5)请求A将新值写入数据库 (6)请求A试图去删除请求B写入的缓存值,结果失败了。 ok,这也就是说。如果第二次删除缓存失败,会再次出现缓存和数据库不一致的问题。咋办? 们需要提供一个保障重试的方案: 定时任务,这样会压力太大,并且一直阻塞会影响性能 消息队列,异步处理,可以让性能提升,但是对业务代码造成大量的侵入 最后的最后,这个延时的时间,你真的好把握吗?而且就算你能把握好,更新数据库和删除缓存的操作不是原子性的,你怎么解决掉? 1、你可以使用重试删除,但是这样做的结果是以牺牲响应时间为代价 2、另起一个异步线程处理,这又是以系统资源消耗作为代价并且凭空造成代码的复杂度
2.1.3 模拟代码
/* 延时双删 */ @RequestMapping("update2") public void update2() throws InterruptedException { //将redis中该缓存删除 redisTemplate.delete("20200101010101"); //这个位置另外一个读操作进程进来了 Store oldStore = storeMapper.getStore("20200101010101"); //写入数据库 Store newStore = new Store("20200101010101", 97); storeMapper.update(newStore); try { Thread.sleep(3000);//这个地方需要自己去评估项目的读数据业务逻辑的耗时,然后加几百ms,如果是主从同步,还应该加上同步时间 } catch (InterruptedException e) { e.printStackTrace(); } //读操作在这个位置进行数据缓存 redisTemplate.opsForValue().append(oldStore.getCode(),JSON.toJSONString(oldStore)); //再次删除缓存数据 //问题,如果该处删除失败,则缓存里面还是旧数据(脏数据) Boolean isDelete = false; while (!isDelete){ isDelete = redisTemplate.delete("20200101010101"); } System.out.println("缓存删除成功"); }
2.2.1 基本流程图
这个也是我们今天的主要讲解,也是业界用得最多的方案,该方案的核心在于使用队列对读写操作进行排队操作,保证了数据的最终一致性,当然,性能相对上一种要差,但是还是那句话,数据的准确性才是最重要的!
2.2.2 初识Canal
阿里Canal主要是听过伪装成mysql从节点来向主节点拉取binlog日志解析成消息推送到MQ消息队列。
Canal在双写一致性中所处的位置:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)
canal将解析后的对象数据推送给监听的消息中间件(实时主动推送,rabbitmq需要是在线状态)
1、需要部署一个阿里巴巴的Canal服务端,用于订阅Mysql binlog日志并推送到MQ消息队列
#下载解压canal server wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz #下载canal部署包 mkdir canal tar -zxvf canal.deployer-1.1.1.tar.gz -C canal #解压 #编辑conf/canal.properties,修改MQ配置 canal.ip = 1 #canal服务器标识 canal.serverMode = rabbitmq # 指定rabbitmq canal.mq.servers = 192.168.223.128 ## 注意不要加端口号,不然会报IPV6错误。 canal.mq.vhost=canal #MQ虚拟机名称 canal.mq.exchange=exchange.trade #交换机名称,用于将消息发送到绑定的队列 canal.mq.username=guest #MQ登录账号,注意要有上面vhost的权限 canal.mq.password=guest #MQ密码 --------------------------------------------------------------------------------- #编辑conf/example/instance.properties实例配置,配置数据库信息 canal.instance.dbUsername=root canal.instance.dbPassword=root canal.instance.mysql.slaveId=1234 #不要与my.cnf中server_id相同,因为我要伪装为mysql的slave canal.instance.master.address=192.168.223.128:3306 ## 数据库地址 canal.instance.defaultDatabaseName=test ## 数据库名 canal.mq.topic=example # 路由键,需要跟MQ中交换机队列的绑定路由key保持一致
2、安装RabbitMQ
mkdir /usr/local/rabbitmq; cd /usr/local/rabbitmq; yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers; wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm; wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm; wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm; rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm; rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm; rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm; vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app #找到loopback_users 修改后台登录用户为[guest] #启动rabbitmq:service rabbitmq-server start #启动监控管理器:service rabbitmq-plugins enable rabbitmq_management #开启端口:firewall-cmd --zone=public --add-port=15672/tcp --permanent #重启防火墙:firewall-cmd --reload
更多精彩请移步《RabbitMQ工作模型及Java编程》
3、安装mysql并开启binlog
rpm -Uvh http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm #下载 yum -y install mysql-community-server #rpm安装 #编辑my.cnf配置文件,开启binlog vim /etc/my.cnf #增加以下配置 log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复(没有数据库主从不配也行) #加入开机启动 systemctl enable mysqld #启动mysql服务进程 systemctl start mysqld #初始化,执行命令,重置密码 mysql_secure_installation #会依次出现以下问题。 Set root password? [Y/n] 是否设置root用户的密码 (y后【设置登录密码】) Remove anonymous users? [Y/n] 是否删除匿名用户 (y) Disallow root login remotely? [Y/n] 是否禁止root远程登录 (n) Remove test database and access to it? [Y/n] 是否删除test数据库(y) Reload privilege tables now? [Y/n] 是否重新加载授权信息 (y) # 先进入mysql mysql -u root -p # 授权(root用户)远程连接权限(不建议) GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION; FLUSH PRIVILEGES; # 使用单独的远程登录用户(推荐) GRANT ALL PRIVILEGES ON *.* TO '新用户名'@'%' IDENTIFIED BY '远程登录密码' WITH GRANT OPTION; FLUSH PRIVILEGES; #查看是否已经开启了binlog日志 #登录mysql后输入如下命令: show variables like '%log_bin%'; | Variable_name | Value | +---------------------------------+--------------------------------+ | log_bin | ON | | log_bin_basename | /var/lib/mysql/mysql-bin | | log_bin_index | /var/lib/mysql/mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | sql_log_bin | ON #查看binlog日志: #1、查看第一个binlog文件的内容 mysql> show binlog events; +------------------+-----+-------------+-----------+-------------+---------------------------------------+ | Log_name | Pos | Event_type | Server_id | End_log_pos | Info | +------------------+-----+-------------+-----------+-------------+---------------------------------------+ | mysql-bin.000001 | 4 | Format_desc | 1 | 120 | Server ver: 5.6.49-log, Binlog ver: 4 | | mysql-bin.000001 | 120 | Query | 1 | 192 | BEGIN | | mysql-bin.000001 | 192 | Table_map | 1 | 249 | table_id: 70 (test.goods_store) | | mysql-bin.000001 | 249 | Delete_rows | 1 | 294 | table_id: 70 flags: STMT_END_F | | mysql-bin.000001 | 294 | Xid | 1 | 325 | COMMIT /* xid=11 */ | +------------------+-----+-------------+-----------+-------------+---------------------------------------+
分别启动mysql,rabbitmq,canal
- service mysql start #启动mysql
- service rabbitmq-server start #启动rabbitmq
- canal目录/bin/startup.sh #启动canal服务
问题:
- {"identity":{"slaveId":-1,"sourceAddress":{"address":"ydt1","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000026","position":551,"serverId":1,"timestamp":1594283137000}}
- 2020-07-31 17:27:24.973 [destination = example , address = /192.168.223.128:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000026,position=551,serverId=1,gtid=,timestamp=1594283137000] cost : 617ms , the next step is binlog dump
- 2020-07-31 17:27:25.106 [destination = example , address = /192.168.223.128:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
-
- #如果出现了以上问题,可能是mysql数据库的binlog日志位置不对,重新设置一下
- #先找出当前mysql的binlog日志position,进入mysql客户端,输入如下命令:
- show master status;
- #找到当前binlog以及position,编辑canal目录/conf/example/meta.dat元数据脚本
- vim /usr/local/canal/conf/example/meta.dat
- 将----》"journalName":"mysql-bin.000003","position":499改为自己查到的或者比查到的小即可
- #或者直接将该文件删除,重新生成当前数据库执行命令的position位置对应的元数据脚本
-
3.2.1 pom.xml
- <dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>2.1.2</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.16</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-redis</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.3</version>
- </dependency>
3.2.2 application.yml
- spring:
- rabbitmq:
- virtual-host: canal
- host: 192.168.223.128
- publisher-confirms: true
- #数据源
- datasource:
- url: jdbc:mysql://192.168.223.128:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=UTC
- username: root
- password: root
- driver-class-name: com.mysql.jdbc.Driver
- redis:
- host: 192.168.223.128
-
3.2.3 消息队列配置类
package com.ydt.test.message; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:exchange.trade.canal @Bean public Queue TestDirectQueue() { return new Queue("exchange.trade.canal",true); } //Direct交换机 起名:exchange.trade @Bean DirectExchange TestDirectExchange() { return new DirectExchange("exchange.trade"); } //绑定 将队列和交换机绑定, 并设置用于匹配键:example @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("example"); } }
3.2.4 消息监听者类
package com.ydt.test.message; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.Map; @Component public class DirectReceiver { @Autowired private StringRedisTemplate redisTemplate; @RabbitListener(queues = "exchange.trade.canal") public void process(Message message){ String json = new String(message.getBody()); System.out.println("消费的消息:" + json); Map map = JSON.parseObject(json,Map.class); JSONArray array = null; String sqlType = (String) map.get("type"); if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){ array = JSONArray.parseArray((String)map.get("data")); }else{ array = (JSONArray)map.get("data"); } if(array == null){ return; } JSONObject object = array.getJSONObject(0); /* if(StringUtils.endsWithIgnoreCase("UPDATE",sqlType) || StringUtils.endsWithIgnoreCase("INSERT",sqlType) || StringUtils.endsWithIgnoreCase("SELECT",sqlType)){ redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString()); }else if(StringUtils.endsWithIgnoreCase("DELETE",sqlType)){ redisTemplate.delete(object.get("code").toString()); }*/ if(StringUtils.endsWithIgnoreCase("SELECT",sqlType)){ redisTemplate.boundValueOps(object.get("code").toString()).set(object.toString()); }else{ redisTemplate.delete(object.get("code").toString()); } } }
3.2.5 Controller调用
package com.ydt.test.controller; import com.alibaba.fastjson.JSON; import com.ydt.test.domain.Store; import com.ydt.test.mapper.StoreMapper; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; @RestController public class MessageController { @Autowired private StoreMapper storeMapper; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; /** * 案例1 */ @RequestMapping("update1") public void update1(){ Store store = new Store("20200101010101", 98); storeMapper.update(store); int i = 1/0; redisTemplate.delete("20200101010101"); } @RequestMapping("get") public String getMessage(){ //查询操作 Store store = storeMapper.getStore("20200101010101"); System.out.println("-----------我进行了查询,现在我要开始进行redis缓存了-----------"); //同一数据源 Map map = new HashMap(); map.put("type", "SELECT"); map.put("data", "[{'code':'20200101010101','store':"+store.getStore()+"}]"); rabbitTemplate.convertAndSend("exchange.trade", "example", JSON.toJSONString(map)); return ""; } }
3.2.6 启动测试
1、先将库存表中库存修改为111,会通过canal伪slave拿到binlog日志,然后推送到rabbitmq
2、然后调用get方法,拿到数据库中数据,同时将数据推送到rabbitmq
3、重复1操作,将库存改为321
4、重复2操作
打开监听,启动服务可以看到我们的消费者会按照顺序消费队列中的数据!
- DirectReceiver消费者收到消息 : {"data":[{"code":"20200101010101","store":"111"}],"database":"test","es":1596198062000,"id":13,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"123"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198062527,"type":"UPDATE"}
- DirectReceiver消费者收到消息 : {"data":"[{'code':'20200101010101','store':111}]","type":"SELECT"}
- DirectReceiver消费者收到消息 : {"data":[{"code":"20200101010101","store":"321"}],"database":"test","es":1596198075000,"id":14,"isDdl":false,"mysqlType":{"code":"varchar(255)","store":"int(11)"},"old":[{"store":"111"}],"pkNames":["code"],"sql":"","sqlType":{"code":12,"store":4},"table":"goods_store","ts":1596198075520,"type":"UPDATE"}
- DirectReceiver消费者收到消息 : {"data":"[{'code':'20200101010101','store':321}]","type":"SELECT"}
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。