当前位置:   article > 正文

基于python的MySQL和redis数据同步实现(redis做缓存)_python redis 映射mysql

python redis 映射mysql

一、背景原理

1、MySQL数据库

MySQL是一种关系型数据库,主要用于存放持久化数据,将数据存储在硬盘中,读取速度较慢。每次请求访问数据库时,都存在着I/O操作,如果反复频繁的访问数据库:会在反复链接数据库上花费大量时间,从而导致运行效率过慢;反复的访问数据库也会导致数据库的负载过高。所以,针对MySQL的缺点,衍生出了缓存的概念。

2、redis数据库

redis是一款非关系型数据库,是一种缓存数据库,数据存放在内存中,用于存储使用频繁的数据,这样减少访问数据库的次数,提高运行效率。所以redis数据库读取速度比较快,运行效率高。

3、二者区别与联系

(1) 类型:MySQL是关系型数据库,redis是缓存数据库;

(2) 作用:MySQL用于持久化的存储数据到硬盘,功能强大,速度较慢,基于磁盘,读写redis快,但是不受空间容量限制,性价比高;redis用于存储使用较为频繁的数据到缓存中,读取速度快,基于内存,读写速度快,也可做持久化,但是内存空间有限,当数据量超过内存空间时,需扩充内存,但内存价格贵;

(3) 需求:mysql和redis因为需求的不同,一般都是配合使用。需要高性能的地方使用Redis,不需要高性能的地方使用MySQL。存储数据在MySQL和Redis之间做同步。所以一般情形下,使用MySQL作为持久化存储数据库存储数据,使用redis作为缓存提升读取速度。

二、数据同步实现方案

本文实验,设计一个学生信息表,持久化数据存储在MySQL数据库中,然后利用redis作为缓存数据库,实现数据的快速读取。这样就需要保持redis和MySQL数据库的数据一致性,接下来,主要讲解查询和数据更新过程的数据库一致性实现。

1、查询一致性

查询数据时,由于redis作为缓存实现快速读取数据,所以首先查询redis中是否存在数据,若存在则返回查询结果,若不存在,则向MySQL数据库请求查询数据,然后由MySQL数据库返回结果。查询流程如下如所示。而且,由于本文中redis作为缓存使用,所以需要添加过期时间,也就是为redis的每条数据记录添加过期时间,若过期时间数据没有被查询则清除,若此时间内,数据被查询,则过期时间重置,这样可以定时清除查询不频繁的数据存在redis中,增加数据读取速度。

 2、数据更新一致性

更新数据时,如果先更新MySQL数据库,在尚未更新redis时,如果此时有查询进行,则redis返回尚未更新的数据,返回结果有误。所以为了避免这种情况,采用先更新redis,再更新MySQL的方案。也就是,首先查询redis是否存在要更新的数据,存在则清除redis的这条数据,进行重新添加更新,然后再更新MySQL数据库数据,当MySQL更新成功后,再次更新redis,防止MySQL更新失败,而redis更新成功的情况发生;若不存在,则更新MySQL数据库数据,MySQL更新成功后,然后再更新redis。

三、数据库设计

1、MySQL设计

设计一张学生信息表,存储学生学号、姓名、出生日期、电话号码等信息,主键为学生学号。SQL语句如下。

  1. CREATE DATABASE python_mysql_test01;
  2. CREATE TABLE tb_student(
  3. stu_id INT PRIMARY KEY NOT NULL,
  4. stu_name VARCHAR(20) NOT NULL,
  5. stu_birth DATE,
  6. stu_phone VARCHAR(100)
  7. );

 2、redis设计

这里redis表的设计采用hash类型的数据,这样可以存在多个key-value对,以用户ID作为hash表的名称,stu_name、stu_birth等作为hash表的键值对,即一个记录:

hmset stu_id:1001 stu_name 'Alice' stu_birth '1990-12-15' stu_phone '15522222141'

同时设置过期时间:expire stu_id:1001 600,即过期时间为10分钟。

四、程序编写

1、查询数据

查询数据,先去缓存redis中查找,如存在数据则返回结果,若不存在,则去MySQL中查找。

  1. # 查询数据
  2. def get_data(self, stu_id):
  3. # redis hash表名称
  4. find_info = 'stu_id:' + str(stu_id)
  5. # 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
  6. result = self.r0.hgetall(find_info)
  7. # 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
  8. if len(result) > 0:
  9. """
  10. 每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
  11. 若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
  12. self.r0.expire(find_info, 600)
  13. print(result)
  14. return result
  15. else:
  16. with self.conn.cursor() as cursor:
  17. try:
  18. # 执行MySQL的查询操作
  19. cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
  20. 'where stu_id=%s', (stu_id,))
  21. result_sql = cursor.fetchall()
  22. print(result_sql)
  23. # 将查询结果更新写入redis数据库中
  24. stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
  25. data_info = {'stu_name': stu_name,
  26. 'stu_birth': str(stu_birth),
  27. 'stu_phone': stu_phone}
  28. self.r0.hmset(find_info, data_info)
  29. self.r0.expire(find_info, 600) # 设置过期时间
  30. return result_sql
  31. except Exception as error:
  32. print(error)
  33. finally:
  34. self.conn.close()

2、更新数据

更新数据,这里主要是以插入数据为例。

  1. """
  2. 更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
  3. 再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
  4. """
  5. def post_data(self):
  6. # 插入数据
  7. stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
  8. # redis hash表名称
  9. find_info = 'stu_id:' + str(stu_id)
  10. # 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
  11. result = self.r0.hgetall(find_info)
  12. # reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
  13. if len(result) > 0:
  14. # 清除数据
  15. all_keys = self.r0.hkeys(find_info)
  16. self.r0.hdel(find_info, *all_keys)
  17. data_info = {'stu_name': stu_name,
  18. 'stu_birth': stu_birth,
  19. 'stu_phone': stu_phone}
  20. self.r0.hmset(find_info, data_info)
  21. self.r0.expire(find_info, 600) # 设置过期时间
  22. with self.conn.cursor() as cursor:
  23. try:
  24. # 插入SQL语句,result为返回的结果
  25. res_info = cursor.execute(
  26. 'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
  27. )
  28. # 成功插入后需要提交才能同步在数据库中
  29. if isinstance(res_info, int):
  30. print('数据更新成功')
  31. self.conn.commit()
  32. all_keys = self.r0.hkeys(find_info)
  33. # 再次更新redis
  34. self.r0.hdel(find_info, *all_keys)
  35. self.r0.hmset(find_info, data_info)
  36. self.r0.expire(find_info, 600) # 设置过期时间
  37. except MySQLError as error:
  38. # 如果MySQL提交不成功,清除redis数据
  39. all_keys = self.r0.hkeys(find_info)
  40. self.r0.hdel(find_info, *all_keys)
  41. print(error)
  42. self.conn.rollback()
  43. finally:
  44. # 操作执行完成后,需要关闭连接
  45. self.conn.close()
  46. else:
  47. with self.conn.cursor() as cursor:
  48. try:
  49. # 插入SQL语句,result为返回的结果
  50. res_info = cursor.execute(
  51. 'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
  52. )
  53. # 成功插入后需要提交才能同步在数据库中
  54. if isinstance(res_info, int):
  55. print('数据更新成功')
  56. self.conn.commit()
  57. except MySQLError as error:
  58. print(error)
  59. self.conn.rollback()
  60. finally:
  61. # 操作执行完成后,需要关闭连接
  62. self.conn.close()

附录代码:

  1. import pymysql
  2. import redis
  3. from pymysql import MySQLError
  4. import time, datetime
  5. class DatabaseSync:
  6. def __init__(self):
  7. # 连接MySQL数据库
  8. try:
  9. self.conn = pymysql.connect(host='1.1.1.1', port=3306,
  10. user='root', password='111111',
  11. database='python_mysql_test01', charset='utf8')
  12. except Exception as error:
  13. print('连接MySQL出现问题!')
  14. print('失败原因:', error)
  15. exit()
  16. try:
  17. # 建立redis连接池
  18. self.conn_pool = redis.ConnectionPool(host='1.1.1.1', port=6379, db=0, decode_responses=True,
  19. password='111111')
  20. # 客户端0连接数据库
  21. self.r0 = redis.StrictRedis(connection_pool=self.conn_pool)
  22. except Exception as error:
  23. print('连接redis出现问题!')
  24. print('失败原因:', error)
  25. exit()
  26. # 查询数据
  27. def get_data(self, stu_id):
  28. # redis hash表名称
  29. find_info = 'stu_id:' + str(stu_id)
  30. # 先查询redis数据库是否存在数据,如果存在数据则返回输出,若不存在则去MySQL中查询,然后再将结果更新到redis中
  31. result = self.r0.hgetall(find_info)
  32. # 长度>0 即redis存在查询的信息,直接输出信息,否则redis中不存在,需要查询MySQL
  33. if len(result) > 0:
  34. """
  35. 每次在redis中更新或者写入数据都需要设置过期时间10分钟,然后每查询到一次就重置过期时间10分钟,
  36. 若10分钟没有查询到这个数据,就会被清除。这样设置过期时间主要防止redis缓存数据过多,清除不常用缓存数据"""
  37. self.r0.expire(find_info, 600)
  38. print(result)
  39. return result
  40. else:
  41. with self.conn.cursor() as cursor:
  42. try:
  43. # 执行MySQL的查询操作
  44. cursor.execute('select stu_name, stu_birth, stu_phone from tb_student '
  45. 'where stu_id=%s', (stu_id,))
  46. result_sql = cursor.fetchall()
  47. print(result_sql)
  48. # 将查询结果更新写入redis数据库中
  49. stu_name, stu_birth, stu_phone = result_sql[0][0], result_sql[0][1], result_sql[0][2]
  50. data_info = {'stu_name': stu_name,
  51. 'stu_birth': str(stu_birth),
  52. 'stu_phone': stu_phone}
  53. self.r0.hmset(find_info, data_info)
  54. self.r0.expire(find_info, 600) # 设置过期时间
  55. return result_sql
  56. except Exception as error:
  57. print(error)
  58. finally:
  59. self.conn.close()
  60. """
  61. 更新数据的操作,为了避免更新MySQL后,redis没更新的这一段空挡时间的查询,所以先更新redis,
  62. 再更新MySQL,然后MySQL成功提交后,再次对redis进行重新更新
  63. """
  64. def post_data(self):
  65. # 插入数据
  66. stu_id, stu_name, stu_birth, stu_phone = 1004, 'Tom', '1993-07-04', '19909092332'
  67. # redis hash表名称
  68. find_info = 'stu_id:' + str(stu_id)
  69. # 先查询redis数据库是否存在数据,如果存在数据则更新redis,再更新MySQL,若不存在则去MySQL中更新,提交成功再次更新redis
  70. result = self.r0.hgetall(find_info)
  71. # reids存在数据,则需要对数据进行更新,即先清除再写入; 写入redis后,再将数据写入MySQL
  72. if len(result) > 0:
  73. # 清除数据
  74. all_keys = self.r0.hkeys(find_info)
  75. self.r0.hdel(find_info, *all_keys)
  76. data_info = {'stu_name': stu_name,
  77. 'stu_birth': stu_birth,
  78. 'stu_phone': stu_phone}
  79. self.r0.hmset(find_info, data_info)
  80. self.r0.expire(find_info, 600) # 设置过期时间
  81. with self.conn.cursor() as cursor:
  82. try:
  83. # 插入SQL语句,result为返回的结果
  84. res_info = cursor.execute(
  85. 'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
  86. )
  87. # 成功插入后需要提交才能同步在数据库中
  88. if isinstance(res_info, int):
  89. print('数据更新成功')
  90. self.conn.commit()
  91. all_keys = self.r0.hkeys(find_info)
  92. # 再次更新redis
  93. self.r0.hdel(find_info, *all_keys)
  94. self.r0.hmset(find_info, data_info)
  95. self.r0.expire(find_info, 600) # 设置过期时间
  96. except MySQLError as error:
  97. # 如果MySQL提交不成功,清除redis数据
  98. all_keys = self.r0.hkeys(find_info)
  99. self.r0.hdel(find_info, *all_keys)
  100. print(error)
  101. self.conn.rollback()
  102. finally:
  103. # 操作执行完成后,需要关闭连接
  104. self.conn.close()
  105. else:
  106. with self.conn.cursor() as cursor:
  107. try:
  108. # 插入SQL语句,result为返回的结果
  109. res_info = cursor.execute(
  110. 'insert into tb_student values (%s, %s, %s, %s)', (stu_id, stu_name, stu_birth, stu_phone,)
  111. )
  112. # 成功插入后需要提交才能同步在数据库中
  113. if isinstance(res_info, int):
  114. print('数据更新成功')
  115. self.conn.commit()
  116. except MySQLError as error:
  117. print(error)
  118. self.conn.rollback()
  119. finally:
  120. # 操作执行完成后,需要关闭连接
  121. self.conn.close()
  122. if __name__ == '__main__':
  123. dbs = DatabaseSync()
  124. # dbs.get_data(1003)
  125. dbs.post_data()
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/670210
推荐阅读
相关标签
  

闽ICP备14008679号