当前位置:   article > 正文

使用redis-shake进行数据迁移(rdb->cluster;cluster->cluster 两种方式)_redis-shake迁移后数据不一致

redis-shake迁移后数据不一致

redis-shake 的中文文档

Sync Reader | RedisShake (tair-opensource.github.io)

redis-shake 数据恢复

1环境(工具)部署

redis 数据迁移 master 对应dump.rdb文件通过redis-shake单个导入

安装redis-shake v4 ( redis-shake-linux-amd64.tar.gz)

//停止旧集群数据写入,这个可以自己找一下

2 新集群数据清除

登录新集群 执行flushall

关闭集群删除对应的rdb 文件和aof文件(好像 直接flushall就可以了,以防万一 直接删了)

启动集群

3 redis数据迁移方式(redis-shake v4  )

3.0 可能出现的问题

[rdb_reader]+[redis_writer]:

测试集群问题: key缺失 前后节点 dbsize 数量应该是一致的(旧集群停止写入)

这种情况是正常的 因为旧集群没关在不停写数据 ,再一个key有过期时间 ;

要测试最好把旧集群写入数据停止 (生产环境 目前没有试过...谨慎)

3.1 [rdb_reader]+[redis_writer]

备份redis数据 //.rdb

1 登录旧集群  在每个节点的 master 上执行 bgsave(3个节点都要执行以下4个端口的数据备份)

./bin/redis-cli -p 7000 -a password

./bin/redis-cli -p 7001 -a password

./bin/redis-cli -p 7002 -a password

./bin/redis-cli -p 7003 -a password

2 查看备份状态 为0 将对应的 rdb文件下载

./bin/redis-cli -p 7000 -a password info Persistence | grep 'rdb_bgsave_in_progress'

./bin/redis-cli -p 7001 -a password info Persistence | grep 'rdb_bgsave_in_progress'

./bin/redis-cli -p 7002 -a password info Persistence | grep 'rdb_bgsave_in_progress'

./bin/redis-cli -p 7003 -a password info Persistence | grep 'rdb_bgsave_in_progress'

编写脚本

vim shake.toml

function = ""

[rdb_reader]

# 分别执行即可 可以自己优化一下

filepath = "/usr/local/redis/data/data/dump01.rdb"

#filepath = "/usr/local/redis/data/data/dump02.rdb"

#filepath = "/usr/local/redis/data/data/dump03.rdb"

[redis_writer]

cluster = true            # set to true if target is a redis cluster

#address = "192.168.202.128:6379;192.168.202.129:6379;192.168.202.130:6379"

#集群模式只用 选择其中一个master即可

address = "192.168.202.129:6379"

username = ""              # keep empty if not using ACL

password = 123456              # keep empty if no authentication is required

tls = false

off_reply = false       # ture off the server reply

# log

log_file = "/opt/module/shake.log"

log_level = "info"     # debug, info or warn

log_interval = 5       # in seconds

rdb_restore_command_behavior = "rewrite"

pipeline_count_limit = 1024

#客户端查询缓冲区的最大长度,默认为 1GB。

target_redis_client_max_querybuf_len = 1024_000_000

#单字符串元素的最大长度,默认为 512MB。

target_redis_proto_max_bulk_len = 512_000_000

# If the source is Elasticache or MemoryDB, you can set this item.

aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync"

empty_db_before_sync = false

[module]

# The data format for BF.LOADCHUNK is not compatible in different versions. v2.6.3 <=> 20603

#target_mbbloom_version = 20603

//修改完保存,每次导入一个rbd文件

执行 ./redis-shake shake.toml

可 另外开一个会话 提前 进入data目录 tail -f log 查看迁移日志

3.2 sync_reader + redis_writer

同样  vim shake.toml 修改为自己集群的配置即可 然后  ./redis-shake shake.toml 就行

3.3 可视化工具分享

推荐一个工具 RedisInsight  一个可视化工具(自己添加 数据源就行 )

链接:https://pan.baidu.com/s/14kBydxkaR_IMxixZgIu-1Q?pwd=48xd 
提取码:48xd 

自己写的脚本 从rdb数据恢复// 这里用 gpt 写了一个脚本 (保存所有的key//包括过期的  该脚本进行同步的时候 未保留 ttl (数据同步之后 过期时间为无限制.....不建议使用!!! 可以自己优化 期待分享)) 

1 工具安装:

 pip3 install redis-py-cluster 

2 编写脚本://修改一下自己的集群配置  以及rdb文件保存地址 已标红 

vim rdb_to_redis.py

import redis
from rediscluster import RedisCluster
from rdbtools import RdbParser, RdbCallback
import logging

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 配置Redis集群信息 集群只用写一个master节点信息即可 执行脚本会自己导入
NEW_REDIS_HOST = "192.168.202.128"
NEW_REDIS_PORT = 6379
NEW_REDIS_PASSWORD = "123456"  # 添加密码配置

# 连接到新的Redis集群
startup_nodes = [{"host": NEW_REDIS_HOST, "port": NEW_REDIS_PORT}]
new_redis = RedisCluster(startup_nodes=startup_nodes, password=NEW_REDIS_PASSWORD, decode_responses=True, skip_full_coverage_check=True)

class RedisRdbCallback(RdbCallback):
    def __init__(self, redis_client):
        self.redis_client = redis_client
        self.keys_imported = 0
        self.errors = []

    def set(self, key, value, expiry, info):
        try:
            logging.info(f"Setting key: {key} without considering expiry")
            self.redis_client.set(key, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error setting key: {key}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def hset(self, key, field, value):
        try:
            logging.info(f"HSET key: {key}, field: {field}")
            self.redis_client.hset(key, field, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error hsetting key: {key}, field: {field}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def sadd(self, key, member):
        try:
            logging.info(f"SADD key: {key}, member: {member}")
            self.redis_client.sadd(key, member)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error sadd key: {key}, member: {member}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def rpush(self, key, value):
        try:
            logging.info(f"RPUSH key: {key}, value: {value}")
            self.redis_client.rpush(key, value)
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error rpush key: {key}, value: {value}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def zadd(self, key, score, member):
        try:
            logging.info(f"ZADD key: {key}, score: {score}, member: {member}")
            self.redis_client.zadd(key, {member: score})
            self.keys_imported += 1
        except Exception as e:
            logging.error(f"Error zadd key: {key}, score: {score}, member: {member}, Error: {str(e)}")
            self.errors.append((key, str(e)))

    def start_hash(self, key, length, expiry, info):
        logging.info(f"Starting hash key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'hash'

    def start_set(self, key, cardinality, expiry, info):
        logging.info(f"Starting set key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'set'

    def start_list(self, key, length, expiry, info):
        logging.info(f"Starting list key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'list'

    def start_sorted_set(self, key, length, expiry, info):
        logging.info(f"Starting sorted set key: {key} without considering expiry")
        self.current_key = key
        self.current_type = 'zset'

def migrate_rdb_to_redis(rdb_file_path, redis_client):
    callback = RedisRdbCallback(redis_client)
    parser = RdbParser(callback)
    parser.parse(rdb_file_path)
    logging.info(f"Total keys imported: {callback.keys_imported}")
    if callback.errors:
        logging.error(f"Errors encountered during migration: {callback.errors}")

# 迁移RDB文件中的数据到新的Redis集群
migrate_rdb_to_redis('/usr/local/redis/data/data/dump01.rdb', new_redis)

期待大家点赞评论 互帮互助 一起进步!!
 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/784548
推荐阅读
相关标签
  

闽ICP备14008679号