当前位置:   article > 正文

Redis学习笔记:Redis与MySQL数据双写一致性 Java Canal配置Redis,MySQL步骤_java 实现mysql和redis数据一致性 实战代码

java 实现mysql和redis数据一致性 实战代码

Redis学习笔记:Redis与MySQL数据双写一致性 Java Canal配置Redis,MySQL步骤

MySQL8配置

修改my.ini文件

文件位置在mysql的安装目录下,博主的目录如下:D:\Environments\Mysql\mysql-8.0.34-winx64
打开my.ini文件,在**[mysqld]**下添加:

log-bin=mysql-bin
binlog-format=ROW 
server_id=1 
  • 1
  • 2
  • 3

重启mysql

首先以管理员的身份打开命令行工具,WIN+R后,Ctrl+Shift+Enter即可
关闭mysql服务

net stop mysql80
  • 1

开启mysql服务

net start mysql80
  • 1

创建用户,授权

在mysql数据库中原生的mysql数据表中新建查询,输入以下命令:

drop user canal;
CREATE USER canal IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  • 1
  • 2
  • 3
  • 4

Canal服务端配置

修改配置

# 进入conf/example目录
cd conf/example
# 修改配置文件
vi instance.properties
  • 1
  • 2
  • 3
  • 4

修改如下:
在这里插入图片描述
第二个红框中个的ip地址即mysql所在的ip,博主的mysql是在windows上的
在windows查看本机ip地址的方式是:WIN+R cmd后输入:

ipconfig
  • 1

启动服务并验证

# 进入bin目录
cd bin
# 启动脚本
sh startup.sh
  • 1
  • 2
  • 3
  • 4

在这里插入图片描述

Java整合Canal

监听

@Component
public class RedisCanalClientExample {

    public static final Integer _60SECONDS = 60;
    public static final String REDIS_IP_ADDR = "192.168.81.128";

    private static void redisInsert(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            //获取变动类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("--------------init()--------------");

        // 创建链接canal服务器
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("--------------canal init ok, 开始监听mysql变化--------------");
        try {
            connector.connect();
            connector.subscribe("redis_learn.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
                //获取指定数量的数据
                Message mes = connector.getWithoutAck(batchSize);
                long batchId = mes.getId();
                int size = mes.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(mes.getEntries());
                }
                //提交确认
                connector.ack(batchId);
            }
            System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138

修改mysql指定的表的数据,如下图所示:
在这里插入图片描述

控制台输出监控日志,运行结果如下:
在这里插入图片描述
redis中的数据,如下图所示:
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/165182
推荐阅读
相关标签
  

闽ICP备14008679号