当前位置:   article > 正文

docker部署canal,并在springboot中使用_docker安装canal

docker安装canal

canal不了解的可以先去了解下https://github.com/alibaba/canal/wiki,这里直接进入正题

一、配置canal-server

拉取镜像,默认拉取latest,可以选择对应版本,这里使用v1.1.5版本演示

docker pull canal/canal-server:v1.1.5

查看镜像是否拉取成功

docker images

在这里插入图片描述

宿主机新增配置文件、日志文件目录,后面会将canal.properties及instance.properties配置出来

mkdir /home/canal/conf
mkdir /home/canal/logs
mkdir /home/canal/logs/canal
mkdir /home/canal/logs/example

拉取完成后,先启动下canal,主要是为了从里面copy出配置文件

#启动镜像
docker run -d --name canal canal/canal-server:v1.1.5

#进入容器 查看配置文件路径
docker exec -it canal bash

#找到文件位置后 exit退出容器 将容器内部文件copy到上面新建的目录中
docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal/conf/example/

容器内配置文件位置
在这里插入图片描述

修改instance.properties:
第一个红框是你需要监听数据库的地址和端口;
第二个红框是你数据库的用户和密码,这个用户信息一定是要有全部权限的用户,非root用户;
第三个是匹配数据表的规则,我这里默认为全部表

在这里插入图片描述
在这里插入图片描述

修改完成后,将之前的canal容器关闭并删除后,重新起一个新的容器

#关闭容器
docker stop canal

#移除容器
docker rm canal

#启动新容器,并挂载上面配置的相关目录
docker run --name canal -p 11111:11111 -d
-v /home/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
-v /home/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties
-v /home/canal/logs/:/home/admin/canal-server/logs/ canal/canal-server:v1.1.5

查看是否启动成功

在这里插入图片描述

查看example中是否存在异常
若出现类似异常show master status的异常,说明当前配置的canal账号存在数据库权限问题,可在sql执行工具中查看用户权限:

在这里插入图片描述

其中:执行上述SQL(show master status)需要有SUPER或REPLICATION CLIENT权限

二、MySQL配置

查找mysql配置文件

find / -name mysqld.cnf

进入容器编辑

vi /etc/mysql/mysql.conf.d/mysqld.cnf

修改 MySQL 配置文件 my.cnf,开启 binlog 写入功能,并配置模式为 ROW

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=66 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

重启数据库,查看配置是否生效

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.19 sec)
mysql>
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
mysql>
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000003 |     4230 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

创建用户并授权

mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal'@'%%';
+----------------------------------------------------------------------------+
| Grants for canal@%%                                                        |
+----------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%%` |
+----------------------------------------------------------------------------+
1 row in set (0.00 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

三、springboot中使用

package com.rn.test.demo01.cannal;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.rn.test.demo01.config.CanalConfig;
import com.rn.test.demo01.vo.CanalVo;
import io.micrometer.common.util.StringUtils;
import jakarta.annotation.Resource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


@Component
public class CanalClient implements InitializingBean {
    @Resource
    CanalConfig canalConfig;

    public static void main(String[] args) {
        // 地址、端口、example默认就好(就是instance.properties文件所在文件夹名称)
        CanalConnector connector = CanalConnectors.newSingleConnector(new
                InetSocketAddress("192.168.116.128", 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
//            connector.subscribe("management\t_stop_heating,t_recover_heating"); // 监听的库表Perl正则表达式(我这里是test库下所有表)
            connector.subscribe("management\\..*"); // 监听的库表Perl正则表达式
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        dataHandle(message.getEntries()); //数据处理
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
//            catch (InvalidProtocolBufferException e) {
//                connector.connect();
//                e.printStackTrace();
//            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 数据处理
     *
     * @param entrys
     */
    private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            if (CanalEntry.EntryType.ROWDATA == entry.getEntryType()) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == CanalEntry.EventType.DELETE) {
                    saveDeleteSql(entry);
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    saveUpdateSql(entry);
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    saveInsertSql(entry);
                }
            }
        }
    }

    /**
     * 保存更新语句
     *
     * @param entry
     */
    private static void saveUpdateSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> newColumnList = rowData.getAfterColumnsList();
                Map<String, String> data = new HashMap<>();
                newColumnList.stream().forEach(item -> {
                    if (StringUtils.isNotEmpty(item.getValue())) {
                        data.put(lineToHump(item.getName()), item.getValue());
                    }
                });
                List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                CanalVo update = new CanalVo(entry.getHeader().getTableName(), "UPDATE", data, oldColumnList.get(0).getValue());
                System.out.println("更新返回 : " + JSON.toJSONString(update));
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存删除语句
     *
     * @param entry
     */
    private static void saveDeleteSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> oldColumnList = rowData.getBeforeColumnsList();
                CanalVo delete = new CanalVo(entry.getHeader().getTableName(), "DELETE", null, oldColumnList.get(0).getValue());
                System.out.println("删除返回 : " + JSON.toJSONString(delete));
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    /**
     * 保存插入语句
     *
     * @param entry
     */
    private static void saveInsertSql(CanalEntry.Entry entry) {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
            for (CanalEntry.RowData rowData : rowDatasList) {
                List<CanalEntry.Column> columnList = rowData.getAfterColumnsList();
                Map<String, String> data = new HashMap<>();
                columnList.stream().forEach(item -> {
                    if (StringUtils.isNotEmpty(item.getValue())) {
                        data.put(lineToHump(item.getName()), item.getValue());
                    }
                });
                CanalVo insert = new  CanalVo(entry.getHeader().getTableName(), "INSERT", data, null);
                System.out.println("插入返回 : " + JSON.toJSONString(insert));
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }

    public static String lineToHump(String str) {
        str = str.toLowerCase();
        Matcher matcher = Pattern.compile("_(\\w)").matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 地址、端口、example默认就好(就是instance.properties文件所在文件夹名称)
        CanalConnector connector
                = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.url, canalConfig.port),
                canalConfig.destination, "", "");
        try {
            connector.connect();
            connector.subscribe("management\\management.t_stop_heating,management.t_recover_heating"); // 监听的库表Perl正则表达式(我这里是test库下所有表)
            connector.rollback();
            try {
                while (true) {
                    //尝试从master那边拉去数据batchSize条记录,有多少取多少
                    Message message = connector.getWithoutAck(canalConfig.batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000); //没有数据则等待1s
                    } else {
                        dataHandle(message.getEntries()); //数据处理
                    }
                    connector.ack(batchId);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (InvalidProtocolBufferException e) {
                connector.connect();
                e.printStackTrace();
            }
        } finally {
            connector.disconnect();
        }
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209

其他说明

如果存在无法同步,记得看example目录下的日志进行分析

在这里插入图片描述

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号