
Spring Boot: Keeping MySQL and Redis Data Consistent with canal

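Installing MySQL and Redis is not covered here. If you have not installed them yet, look up an installation guide first.

1. Install canal

Download it from GitHub: https://bgithub.xyz/alibaba/canal/releases/tag/canal-1.1.7

2. Edit the canal configuration file

Configuration file path: \canal.deployer-1.1.7\conf\example\instance.properties

Update the settings canal uses to connect to the MySQL database.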
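As a rough sketch of what this edit usually involves, the fragment below shows the connection-related keys in instance.properties. The address 127.0.0.1:3306 is an assumption (a local MySQL on the default port), and the user name and password assume the canal account created in step 4; adjust all three to your environment.

```properties
# MySQL instance canal reads the binlog from (assumed: local MySQL, default port)
canal.instance.master.address=127.0.0.1:3306
# Account canal logs in with (assumed: the canal user created in step 4)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
```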


3. Check whether the MySQL binlog is enabled

Run the statement below. A Value of ON for log_bin means the binlog is enabled.

show variables like '%log_bin%';

If it is not enabled, edit MySQL's my.ini configuration file.

Add the settings that enable the binlog.
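A typical my.ini fragment for this looks roughly like the sketch below. canal parses row-level changes, so the binlog format is set to ROW. The file prefix mysql-bin and the value server_id=1 are assumptions; any prefix works, and server_id only has to differ from the slaveId canal itself uses.

```ini
[mysqld]
# Enable the binary log (the file-name prefix is an assumed example)
log-bin=mysql-bin
# canal needs row-based events
binlog-format=ROW
# Must be unique in the replication topology (assumed value)
server_id=1
```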

After adding the settings, restart the MySQL service.

Run the same SQL again to confirm that the binlog is now enabled.

show variables like '%log_bin%';

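4. Create a canal user in MySQL

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Broader alternative; normally unnecessary because the grant above covers what canal needs
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;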

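5. Start canal

Double-click \canal.deployer-1.1.7\bin\startup.bat.

If the console window flashes and closes immediately, edit the launch command in startup.bat and delete the PermSize=128m option. Recent JDKs no longer support this option, which is the likely reason the JVM exits at once.

In the screenshot the option has already been removed.

[Screenshot: startup.bat with the PermSize=128m option removed]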

6. Code implementation

Add the dependency to pom.xml

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>

A simple implementation

package com.ljt;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Reads MySQL row changes from canal and mirrors them into Redis.
 *
 * @author ljt
 * @date 2024/1/23
 */
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class CanalClientTest {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Test
    public void test1() {
        // Create a connector for the canal server (port 11111, instance "example")
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        int batchSize = 1000;
        try {
            // Open the connection
            connector.connect();
            // Subscribe once before fetching; with no argument the server-side filter applies
            connector.subscribe();
            // Go back to the last acknowledged position in case an earlier run stopped mid-batch
            connector.rollback();
            while (true) {
                // Fetch up to batchSize entries without acknowledging them yet
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    log.info("No data changes detected.");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt flag before giving up
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                } else {
                    for (CanalEntry.Entry entry : entries) {
                        // Transaction markers carry no row data
                        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                            continue;
                        }
                        ByteString storeValue = entry.getStoreValue();
                        String tableName = entry.getHeader().getTableName();
                        CanalEntry.RowChange rowChange;
                        try {
                            rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        } catch (InvalidProtocolBufferException e) {
                            throw new RuntimeException(e);
                        }
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                entry.getHeader().getSchemaName(), tableName,
                                eventType));
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            if (eventType == CanalEntry.EventType.DELETE) {
                                // A deleted row only has a before-image; its after-image is empty
                                redisDel(rowData.getBeforeColumnsList(), tableName);
                            } else if (eventType == CanalEntry.EventType.INSERT) {
                                redisSet(rowData.getAfterColumnsList(), tableName);
                            } else {
                                System.out.println("-------> before");
                                printColumn(rowData.getBeforeColumnsList());
                                redisSet(rowData.getAfterColumnsList(), tableName);
                            }
                        }
                    }
                    // Acknowledge the batch so canal can advance its position and free the buffer
                    connector.ack(batchId);
                }
            }
        } finally {
            connector.disconnect();
        }
    }

    // Removes the cached row. The key is "table:firstColumnValue", which assumes
    // the first column of the table is the primary key.
    private void redisDel(List<CanalEntry.Column> columns, String tableName) {
        printColumn(columns);
        if (columns.size() > 0) {
            redisTemplate.delete(tableName + ":" + columns.get(0).getValue());
        }
    }

    // Stores the row as a JSON string under "table:firstColumnValue" (same key assumption as above).
    private void redisSet(List<CanalEntry.Column> columns, String tableName) {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            redisTemplate.opsForValue().set(tableName + ":" + columns.get(0).getValue(), jsonObject.toJSONString());
        }
    }

    // Prints every column with its value and whether this event changed it.
    private void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}
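The listener builds its Redis key from the first column, which only works when the primary key happens to come first. For a DELETE event the after-image is also empty, so the key has to come from the before-image. The sketch below illustrates both points without a canal or Redis server. Col, Event, the in-memory cache map and the class name CacheSyncSketch are hypothetical stand-ins introduced here, not part of the canal API; in real code the key flag would come from CanalEntry.Column#getIsKey().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CacheSyncSketch {

    /** Hypothetical stand-in for CanalEntry.Column: name, value and primary-key flag. */
    static final class Col {
        final String name;
        final String value;
        final boolean isKey;

        Col(String name, String value, boolean isKey) {
            this.name = name;
            this.value = value;
            this.isKey = isKey;
        }
    }

    /** Hypothetical stand-in for the three row events handled by the listener. */
    enum Event { INSERT, UPDATE, DELETE }

    /** In-memory stand-in for Redis so the sketch runs on its own. */
    static final Map<String, Map<String, String>> cache = new HashMap<>();

    /** Builds "table:pk" from the column flagged as key; returns null if none is flagged. */
    static String cacheKey(String table, List<Col> columns) {
        for (Col c : columns) {
            if (c.isKey) {
                return table + ":" + c.value;
            }
        }
        return null;
    }

    /** Applies one row change: DELETE reads the before-image, other events the after-image. */
    static void apply(Event event, String table, List<Col> before, List<Col> after) {
        List<Col> image = (event == Event.DELETE) ? before : after;
        String key = cacheKey(table, image);
        if (key == null) {
            return; // no key column in this row image, so there is nothing to address
        }
        if (event == Event.DELETE) {
            cache.remove(key);
        } else {
            Map<String, String> row = new HashMap<>();
            for (Col c : image) {
                row.put(c.name, c.value);
            }
            cache.put(key, row);
        }
    }

    public static void main(String[] args) {
        List<Col> empty = new ArrayList<>();
        // The key column is deliberately not the first column
        List<Col> row = Arrays.asList(new Col("name", "Ann", false), new Col("id", "7", true));

        apply(Event.INSERT, "user", empty, row);
        System.out.println(cache.containsKey("user:7")); // true

        apply(Event.DELETE, "user", row, empty);
        System.out.println(cache.containsKey("user:7")); // false
    }
}
```

Looking up the key by flag rather than by position keeps the cache key stable even if the table's column order changes.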