赞
踩
Sync Reader | RedisShake (tair-opensource.github.io)
redis 数据迁移 master 对应dump.rdb文件通过redis-shake单个导入
安装redis-shake v4 ( redis-shake-linux-amd64.tar.gz)
//停止旧集群数据写入,这个可以自己找一下
登录新集群 执行flushall
关闭集群删除对应的rdb 文件和aof文件(好像 直接flushall就可以了,以防万一 直接删了)
启动集群
[rdb_reader]+[redis_writer]:
测试集群问题: key缺失 前后节点 dbsize 数量应该是一致的(旧集群停止写入)
这种情况是正常的 因为旧集群没关在不停写数据 ,再一个key有过期时间 ;
要测试最好把旧集群写入数据停止 (生产环境 目前没有试过...谨慎)
1 登录旧集群 在每个节点的 master 上执行 bgsave(3个节点都要执行以下4个端口的数据备份)
./bin/redis-cli -p 7000 -a password
./bin/redis-cli -p 7001 -a password
./bin/redis-cli -p 7002 -a password
./bin/redis-cli -p 7003 -a password
2 查看备份状态,rdb_bgsave_in_progress 为 0 表示 bgsave 已完成,然后将对应的 rdb 文件下载
./bin/redis-cli -p 7000 -a password info Persistence | grep 'rdb_bgsave_in_progress'
./bin/redis-cli -p 7001 -a password info Persistence | grep 'rdb_bgsave_in_progress'
./bin/redis-cli -p 7002 -a password info Persistence | grep 'rdb_bgsave_in_progress'
./bin/redis-cli -p 7003 -a password info Persistence | grep 'rdb_bgsave_in_progress'
vim shake.toml
function = ""

[rdb_reader]
# Run redis-shake once per RDB file: uncomment the next filepath for each pass.
filepath = "/usr/local/redis/data/data/dump01.rdb"
#filepath = "/usr/local/redis/data/data/dump02.rdb"
#filepath = "/usr/local/redis/data/data/dump03.rdb"

[redis_writer]
cluster = true # set to true if target is a redis cluster
#address = "192.168.202.128:6379;192.168.202.129:6379;192.168.202.130:6379"
# In cluster mode, the address of a single master node is sufficient;
# the writer discovers the rest of the topology by itself.
address = "192.168.202.129:6379"
username = "" # keep empty if not using ACL
# NOTE: must be a quoted TOML string — a bare 123456 is parsed as an integer
# and breaks the config / authentication.
password = "123456" # keep empty if no authentication is required
tls = false
off_reply = false # turn off the server reply
# log
log_file = "/opt/module/shake.log"
log_level = "info" # debug, info or warn
log_interval = 5 # in seconds
rdb_restore_command_behavior = "rewrite"
pipeline_count_limit = 1024
# Maximum length of the client query buffer; default 1 GB.
target_redis_client_max_querybuf_len = 1024_000_000
# Maximum length of a single string element; default 512 MB.
target_redis_proto_max_bulk_len = 512_000_000
# If the source is Elasticache or MemoryDB, you can set this item.
aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync"
empty_db_before_sync = false

[module]
# The data format for BF.LOADCHUNK is not compatible in different versions. v2.6.3 <=> 20603
#target_mbbloom_version = 20603
//修改完保存,每次导入一个rdb文件
执行 ./redis-shake shake.toml
可 另外开一个会话 提前 进入data目录 tail -f log 查看迁移日志
同样 vim shake.toml 修改为自己集群的配置即可 然后 ./redis-shake shake.toml 就行
推荐一个工具 RedisInsight 一个可视化工具(自己添加 数据源就行 )
链接:https://pan.baidu.com/s/14kBydxkaR_IMxixZgIu-1Q?pwd=48xd
提取码:48xd
pip3 install redis-py-cluster
vim rdb_to_redis.py
import logging

import redis
from rediscluster import RedisCluster
from rdbtools import RdbParser, RdbCallback

# Configure log output for the migration run.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Target Redis cluster connection info.  For a cluster, a single master
# node is enough: the client discovers the remaining topology itself.
NEW_REDIS_HOST = "192.168.202.128"
NEW_REDIS_PORT = 6379
NEW_REDIS_PASSWORD = "123456"

startup_nodes = [{"host": NEW_REDIS_HOST, "port": NEW_REDIS_PORT}]
# NOTE(review): decode_responses=True decodes all replies as UTF-8; RDB
# dumps may contain arbitrary binary values — confirm this is safe for
# your data set before a production run.
new_redis = RedisCluster(startup_nodes=startup_nodes,
                         password=NEW_REDIS_PASSWORD,
                         decode_responses=True,
                         skip_full_coverage_check=True)


class RedisRdbCallback(RdbCallback):
    """rdbtools parser callback that replays every parsed entry into the
    target cluster.

    Expiry times are intentionally ignored (keys are written without a
    TTL), and per-entry write errors are collected in ``self.errors``
    instead of aborting the whole migration.
    """

    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.keys_imported = 0  # count of successfully written entries
        self.errors = []        # list of (key, error message) tuples

    def set(self, key, value, expiry, info):
        """Replay a plain string key (expiry deliberately dropped)."""
        try:
            logging.info(f"Setting key: {key} without considering expiry")
            self.redis_client.set(key, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error setting key: {key}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def hset(self, key, field, value):
        """Replay one field of a hash key."""
        try:
            logging.info(f"HSET key: {key}, field: {field}")
            self.redis_client.hset(key, field, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error hsetting key: {key}, field: {field}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def sadd(self, key, member):
        """Replay one member of a set key."""
        try:
            logging.info(f"SADD key: {key}, member: {member}")
            self.redis_client.sadd(key, member)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error sadd key: {key}, member: {member}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def rpush(self, key, value):
        """Replay one element of a list key (appended in parse order)."""
        try:
            logging.info(f"RPUSH key: {key}, value: {value}")
            self.redis_client.rpush(key, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error rpush key: {key}, value: {value}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def zadd(self, key, score, member):
        """Replay one member of a sorted-set key."""
        try:
            logging.info(f"ZADD key: {key}, score: {score}, member: {member}")
            self.redis_client.zadd(key, {member: score})
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error zadd key: {key}, score: {score}, member: {member}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    # The start_* hooks only record which key/type is being parsed;
    # current_key/current_type are kept for debugging and are not read
    # by the write methods above.

    def start_hash(self, key, length, expiry, info):
        logging.info(f"Starting hash key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'hash'

    def start_set(self, key, cardinality, expiry, info):
        logging.info(f"Starting set key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'set'

    def start_list(self, key, length, expiry, info):
        logging.info(f"Starting list key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'list'

    def start_sorted_set(self, key, length, expiry, info):
        logging.info(f"Starting sorted set key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'zset'


def migrate_rdb_to_redis(rdb_file_path, redis_client):
    """Parse *rdb_file_path* and replay its contents into *redis_client*.

    Logs the total number of imported entries and any per-key errors
    collected during the run.
    """
    callback = RedisRdbCallback(redis_client)
    parser = RdbParser(callback)
    parser.parse(rdb_file_path)
    logging.info(f"Total keys imported: {callback.keys_imported}")
    if callback.errors:
        logging.error(f"Errors encountered during migration: {callback.errors}")


# Replay the RDB dump into the new Redis cluster.
migrate_rdb_to_redis('/usr/local/redis/data/data/dump01.rdb', new_redis)
期待大家点赞评论 互帮互助 一起进步!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。