赞
踩
之前写过一个用python迁移redis的脚本,但是对于小批量的数据、单节点的,还可以用。对于大量数据,集群模式的就不太适合了。所以写了下面的脚本,而且做了一定的优化,使用的pipeline和多线程,保证了迁移数据的速度,本人测试,大概100s复制了110万键值对的数据,差不多是每秒一万键值对的复制速度。
最新进展20240429:
代码变化不大的情况下,用了multiprocessing模块,这个效率要高一些,大概80-90s复制了110万键值对的数据
而且加大线程数之后,能做到60s左右复制了110万键值对;140s约300万键值对;50线程,1051万键值对,用了460s;大约2万多键值对每秒。
再同时用多进程和多线程的情况下,对比只用多线程,提升的并不明显。因为此时redis导入新库只能用一个个键值设置的方式,不能用redis pipe提交了,提交速度变慢了。
注意:
pip install redis redis-py-cluster
下面是一些需要注意的:
暂时就这些了。
内容如下,根据实际情况进行调整
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024/4/24
from datetime import datetime
import time
import threading

import redis
from rediscluster import RedisCluster


def split_list(big_list, num=1):
    """Split a flat list into at most ``num`` roughly equal sub-lists.

    E.g. [1, 2, 3, 4, 5, 6] with num=3 -> [[1, 2], [3, 4], [5, 6]].
    Each sub-list is handed to one worker thread (i.e. one Redis
    connection), so ``num`` must stay small enough to avoid
    "Too many connections" errors.

    :param big_list: full list of keys to migrate
    :param num: number of sub-lists (== number of worker threads)
    :return: list of sub-lists; a single-element list when there are
        fewer keys than ``num``
    """
    list_len = len(big_list)
    new_list = []
    if list_len > num:
        # Ceiling division so the last chunk absorbs the remainder.
        if list_len % num == 0:
            small_list_len = list_len // num
        else:
            small_list_len = list_len // num + 1
        start = 0
        for _ in range(num):
            new_list.append(big_list[start: start + small_list_len])
            start += small_list_len
    else:
        # Fewer keys than requested chunks: keep everything in one chunk.
        new_list.append(big_list)
    return new_list


def redis_get_set(redis_source, redis_target, redis_list, batch_size=100):
    """Read each key from the source Redis and replay it into the target
    through a pipeline.

    Handles string/list/set/hash/zset values; the stream type is not
    supported (add it later if needed).

    :param redis_source: source Redis client
    :param redis_target: target Redis client
    :param redis_list: keys this worker is responsible for
    :param batch_size: number of queued commands per pipeline flush
    :return: None
    """
    count = 0
    with redis_target.pipeline() as pipe:
        for k in redis_list:
            # Dispatch on the value's type; clients are created with
            # decode_responses=True, so type() returns a str.
            data_type = redis_source.type(k)
            if data_type == 'string':
                v = redis_source.get(k)
                pipe.set(k, v)
            elif data_type == 'list':
                v = redis_source.lrange(k, 0, -1)
                # BUGFIX: the original used lpush, which writes the
                # elements in reverse order; rpush preserves the order
                # returned by lrange.
                pipe.rpush(k, *v)
            elif data_type == 'set':
                v = redis_source.smembers(k)
                pipe.sadd(k, *v)
            elif data_type == 'hash':
                fields = redis_source.hgetall(k)
                pipe.hset(k, mapping=fields)
            elif data_type == 'zset':
                v = redis_source.zrange(k, 0, -1, withscores=True)
                # zadd expects a {member: score} mapping, not tuples.
                pipe.zadd(k, dict(v))
            else:
                print('not known type')
            count += 1
            # Flush every batch_size keys to bound pipeline memory.
            # (execute() already clears the command stack, so the
            # original's extra pipe.reset() calls were redundant.)
            if count % batch_size == 0:
                # BUGFIX: label corrected from 进程 (process) to 线程
                # (thread) — the value printed is the current thread.
                print(f'\n当前时间:{datetime.now()},线程:{threading.current_thread()},已完成{count}对读写操作')
                pipe.execute()
        # Flush whatever is queued after the loop ends.
        pipe.execute()
    print(f'\n当前时间:{datetime.now()},线程:{threading.current_thread()},已完成所有读写操作!')


def redis_copy(redis_source, redis_target, thread_num=5, batch_size=100):
    """Split the source's full key list into chunks and migrate each
    chunk on its own thread.

    :param redis_source: source Redis client
    :param redis_target: target Redis client
    :param thread_num: number of worker threads; keep modest (about 10)
        or the connection pools can run out of connections
    :param batch_size: pipeline flush size, forwarded to redis_get_set
    :return: None
    """
    # Verify both ends are reachable before spawning any workers.
    try:
        redis_source.ping()
        redis_target.ping()
        print("Redis节点可连接")
    except Exception as e:
        print(f"连接Redis失败: {e}")
        return

    threads = []
    # NOTE(review): keys('*') loads every key name into memory at once;
    # for very large datasets a SCAN-based iterator would be safer.
    for chunk in split_list(redis_source.keys('*'), thread_num):
        t = threading.Thread(
            target=redis_get_set,
            args=(redis_source, redis_target, chunk, batch_size),
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print("所有线程执行完毕")


def single_to_single(thread_num, batch_size):
    """Migrate a standalone Redis to another standalone Redis."""
    # Source: standalone node.
    source_pool = redis.ConnectionPool(
        host='192.168.10.1', port=6379, db=0, password='123456',
        encoding='utf-8', decode_responses=True,
        socket_timeout=10, max_connections=100,
    )
    redis_source = redis.Redis(connection_pool=source_pool)
    # Target: standalone node.
    target_pool = redis.ConnectionPool(
        host='192.168.10.2', port=6369, db=0, password='123456',
        encoding='utf-8', decode_responses=True,
        socket_timeout=10, max_connections=100,
    )
    redis_target = redis.Redis(connection_pool=target_pool)
    # BUGFIX: forward the caller's tuning knobs instead of the
    # hard-coded thread_num=10 / batch_size=10000 the original used.
    redis_copy(redis_source, redis_target,
               thread_num=thread_num, batch_size=batch_size)


def single_to_cluster(thread_num, batch_size):
    """Migrate a standalone Redis to a Redis cluster."""
    # Source: standalone node.
    source_pool = redis.ConnectionPool(
        host='192.168.10.1', port=6379, db=0, password='123456',
        encoding='utf-8', decode_responses=True,
        socket_timeout=10, max_connections=100,
    )
    redis_source = redis.Redis(connection_pool=source_pool)
    # Target: cluster. decode_responses=True auto-decodes replies to
    # utf-8 strings.
    target_node_list = [
        {"host": "192.168.11.1", "port": "6379"},
        {"host": "192.168.11.2", "port": "6379"},
        {"host": "192.168.11.3", "port": "6379"},
        {"host": "192.168.11.4", "port": "6379"},
        {"host": "192.168.11.5", "port": "6379"},
        {"host": "192.168.11.6", "port": "6379"},
    ]
    redis_cluster_target = RedisCluster(
        startup_nodes=target_node_list,
        decode_responses=True,
        password='123456',
    )
    # BUGFIX: forward the caller's tuning knobs (originally hard-coded).
    redis_copy(redis_source, redis_cluster_target,
               thread_num=thread_num, batch_size=batch_size)


def cluster_to_single(thread_num, batch_size):
    """Migrate a Redis cluster to a standalone Redis."""
    # Source: cluster.
    source_node_list = [
        {"host": "192.168.0.1", "port": "6379"},
        {"host": "192.168.0.2", "port": "6379"},
        {"host": "192.168.0.3", "port": "6379"},
        {"host": "192.168.0.4", "port": "6379"},
        {"host": "192.168.0.5", "port": "6379"},
        {"host": "192.168.0.6", "port": "6379"},
    ]
    redis_cluster_source = RedisCluster(
        startup_nodes=source_node_list,
        decode_responses=True,
        password='123456',
    )
    # Target: standalone node.
    target_pool = redis.ConnectionPool(
        host='192.168.10.2', port=6369, db=0, password='123456',
        encoding='utf-8', decode_responses=True,
        socket_timeout=10, max_connections=100,
    )
    redis_target = redis.Redis(connection_pool=target_pool)
    # BUGFIX: forward the caller's tuning knobs (originally hard-coded).
    redis_copy(redis_cluster_source, redis_target,
               thread_num=thread_num, batch_size=batch_size)


def cluster_to_cluster(thread_num, batch_size):
    """Migrate a Redis cluster to another Redis cluster."""
    # Source: cluster.
    source_node_list = [
        {"host": "192.168.0.1", "port": "6379"},
        {"host": "192.168.0.2", "port": "6379"},
        {"host": "192.168.0.3", "port": "6379"},
        {"host": "192.168.0.4", "port": "6379"},
        {"host": "192.168.0.5", "port": "6379"},
        {"host": "192.168.0.6", "port": "6379"},
    ]
    redis_cluster_source = RedisCluster(
        startup_nodes=source_node_list,
        decode_responses=True,
        password='123456',
    )
    # Target: cluster.
    target_node_list = [
        {"host": "192.168.11.1", "port": "6379"},
        {"host": "192.168.11.2", "port": "6379"},
        {"host": "192.168.11.3", "port": "6379"},
        {"host": "192.168.11.4", "port": "6379"},
        {"host": "192.168.11.5", "port": "6379"},
        {"host": "192.168.11.6", "port": "6379"},
    ]
    redis_cluster_target = RedisCluster(
        startup_nodes=target_node_list,
        decode_responses=True,
        password='123456',
    )
    # BUGFIX: forward the caller's tuning knobs (originally hard-coded).
    redis_copy(redis_cluster_source, redis_cluster_target,
               thread_num=thread_num, batch_size=batch_size)


if __name__ == '__main__':
    # Performance / efficiency knobs.
    thread_num = 10      # worker threads
    batch_size = 10000   # pipeline commands per flush
    start_time = time.perf_counter()
    # Standalone -> standalone
    single_to_single(thread_num, batch_size)
    # Standalone -> cluster
    # single_to_cluster(thread_num, batch_size)
    # Cluster -> standalone
    # cluster_to_single(thread_num, batch_size)
    # Cluster -> cluster
    # cluster_to_cluster(thread_num, batch_size)
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"代码执行时间: {execution_time} 秒")
上面的代码,为了优化性能,改了好几次。刚开始的时候,50万键值对数据(5个数据类型各10万左右),迁移复制大概需要300s-400s左右,平均每秒钟大约复制1300-1700的键值对,经过多次优化,平均每秒钟大约复制9000的键值对,提升了6-7倍左右。
优化思路:
其它思考:
1 还能进行哪些优化呢?我看有些商业软件能做到每秒钟10万级别KV的复制,想不出来怎么做的。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。