赞
踩
安装mysql镜像
1 拉取mysql镜像容器
docker pull mysql:5.7.36
2 创建容器运行并拷贝配置文件
- docker run --name mysql5736 \
- -p 3306:3306 \
- -e MYSQL_ROOT_PASSWORD=root \
- -d mysql:5.7.36
3 创建配置文件
- mkdir /mydata
- mkdir mysql
- mkdir conf
4 拷贝配置文件
- docker cp mysql5736:/etc/mysql/mysql.conf.d/mysqld.cnf /mydata/mysql/conf
- 拷贝后配置文件的修改就不用连接容器后进行修改了,直接进入拷贝位置来修改配置文件内容
5 进入拷贝的配置文件加上如下内容
- cd /mydata
- cd mysql
- cd conf
- ls -a #可以看到 mysqld.cnf 这个配置文件
- vi mysqld.cnf # 进入配置文件加上如下内容
-
- log-bin=mysql-bin # 开启 binlog
- binlog-format=ROW # 选择 ROW 模式
- server-id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
6 删除容器重新启动
- # 删除之前启动的docker容器
- docker rm -f mysql5736
-
- # 运行docker容器命令
- # --name mysql5736:容器名为mysql5736
- # /etc/localtime:时间同步
- # /mydata/mysql/conf:同步配置文件(这个是关键,上面配置的内容就会覆盖容器中的配置文件)
- # /mydata/mysql/log:同步日志目录
- # /mydata/mysql/data:同步mysql的一些文件内容(对数据进行备份)
- # MYSQL_ROOT_PASSWORD=root:默认root的密码是root
- docker run --name mysql5736 \
- -p 3306:3306 \
- -v /etc/localtime:/etc/localtime \
- -v /mydata/mysql/conf:/etc/mysql/mysql.conf.d \
- -v /mydata/mysql/log:/var/log/mysql \
- -v /mydata/mysql/data:/var/lib/mysql \
- -e MYSQL_ROOT_PASSWORD=root \
- -d mysql:5.7.36
7 进入mysql容器并连接客户端配置账号
- # 使用mysql容器中的命令行
- docker exec -it mysql5736 /bin/bash
-
- # 使用MySQL命令打开客户端:
- mysql -uroot -proot --default-character-set=utf8
-
- # 接着创建一个账户,该账号所有ip都能够访问
- grant all privileges on *.* to 'root' @'%' identified by 'root';
-
- # 刷新生效
- FLUSH PRIVILEGES;
-
-
- -- 创建用户名密码都是canal的账号
- CREATE USER canal IDENTIFIED BY 'canal';
-
- -- 为canal进行授权读取、client从、客户端权限
- grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
-
- -- 刷新生效
- FLUSH PRIVILEGES;
8 查看binlog日志是否成功开启
show variables like 'log_%';
安装canal镜像
1 拉取canal镜像
docker pull canal/canal-server:v1.1.5
2 启动容器并拷贝配置文件
- # 启动容器(目的是拷贝配置文件)
- docker run --name canal115 \
- -p 11111:11111 \
- -id canal/canal-server:v1.1.5
-
- # 提前创建 /mydata/canal/conf目录,接着来执行拷贝配置文件命令
- docker cp canal115:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/
3 进入拷贝配置文件位置后修改配置文件
- cd #退出
- cd /mydata
- cd canal
- cd conf
- ls -a #可以看到instance.properties配置文件
- vi instance.properties #进入配置文件做如下修改
-
-
- # 从机id(与之前mysql配置的id不同)
- canal.instance.mysql.slaveId=2
-
- # 指定ip地址也是可以的,查看方式docker inspect --format '{{ .NetworkSettings.IPAddress }}' mysql
- # 容器进行了--link,这里mysql5736就是对应的服务地址
- canal.instance.master.address=mysql5736:3306
-
- # 用户名与密码
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
-
- # 匹配全部数据库
- canal.instance.filter.regex = .\*\\\\..\*
-
- canal.mq.topic=example # 目的地队列
4 删除原容器重新配置
- # 删除原先的canal服务
- docker rm -f canal115
-
- # 启动canal服务
- # -i:让容器的标准输入保持打开(特别特别重要,注意不要是-d,一定要加上i)
- docker run --name canal115 \
- -p 11111:11111 \
- -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
- --link mysql5736:mysql5736 \
- -id canal/canal-server:v1.1.5
5 进入容器查看日志,如果出现一下内容则开启监控成功
- # 进入到docker容器
- docker exec -it canal115 /bin/bash
-
- # 打开日志文件
- cd canal-server/logs/example/
-
- # 查看日志文件的最后100行内容
- tail -100 example.log
代码测试
- package com.tang.webServiceClient.canal;
-
- import com.alibaba.fastjson.JSONObject;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry.*;
- import com.alibaba.otter.canal.protocol.Message;
- import java.net.InetSocketAddress;
- import java.util.List;
-
- public class CanalClientTest{
- public static void main(String[] args) {
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.25.158",
- 11111), "example", "", "");
- int batchSize = 1000;
- try {
- connector.connect();
- connector.subscribe(".*\\..*");
- connector.rollback();
- while (true) {
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } else {
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // 提交确认
- // connector.rollback(batchId); // 处理失败, 回滚数据
- }
- } finally {
- connector.disconnect();
- }
- }
-
- private static void printEntry( List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- redisDelete(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- redisInsert(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- redisUpdate(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
- private static void printColumn( List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- }
- }
-
- private static void redisInsert( List<Column> columns){
- JSONObject json=new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if(columns.size()>0){
- RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
- }
- }
-
- private static void redisUpdate( List<Column> columns){
- JSONObject json=new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if(columns.size()>0){
- RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
- }
- }
-
- private static void redisDelete( List<Column> columns){
- JSONObject json=new JSONObject();
- for (Column column : columns) {
- json.put(column.getName(), column.getValue());
- }
- if(columns.size()>0){
- RedisUtil.delKey("user:"+ columns.get(0).getValue());
- }
- }
- }
工具类
- package com.tang.webServiceClient.canal;
-
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
-
- public class RedisUtil {
-
- // Redis服务器IP
- private static String ADDR = "192.168.25.158";
-
- // Redis的端口号
- private static int PORT = 6379;
-
- // 访问密码
- private static String AUTH = "root";
-
- // 可用连接实例的最大数目,默认值为8;
- // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
- private static int MAX_ACTIVE = 1024;
-
- // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
- private static int MAX_IDLE = 200;
-
- // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
- private static int MAX_WAIT = 10000;
-
- // 过期时间
- protected static int expireTime = 60 * 60 *24;
-
- // 连接池
- protected static JedisPool pool;
-
- /**
- * 静态代码,只在初次调用一次
- */
- static {
- JedisPoolConfig config = new JedisPoolConfig();
- //最大连接数
- config.setMaxTotal(MAX_ACTIVE);
- //最多空闲实例
- config.setMaxIdle(MAX_IDLE);
- //超时时间
- config.setMaxWaitMillis(MAX_WAIT);
- //
- config.setTestOnBorrow(false);
- pool = new JedisPool(config, ADDR, PORT, 1000,AUTH);
- }
-
- /**
- * 获取jedis实例
- */
- protected static synchronized Jedis getJedis() {
- Jedis jedis = null;
- try {
- jedis = pool.getResource();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return jedis;
- }
-
- /**
- * 释放jedis资源
- * @param jedis
- * @param isBroken
- */
- protected static void closeResource(Jedis jedis, boolean isBroken) {
- try {
- if (isBroken) {
- pool.returnBrokenResource(jedis);
- } else {
- pool.returnResource(jedis);
- }
- } catch (Exception e) {
-
- }
- }
-
- /**
- * 是否存在key
- * @param key
- */
- public static boolean existKey(String key) {
- Jedis jedis = null;
- boolean isBroken = false;
- try {
- jedis = getJedis();
- jedis.select(2);
- return jedis.exists(key);
- } catch (Exception e) {
- isBroken = true;
- } finally {
- closeResource(jedis, isBroken);
- }
- return false;
- }
-
- /**
- * 删除key
- * @param key
- */
- public static void delKey(String key) {
- Jedis jedis = null;
- boolean isBroken = false;
- try {
- jedis = getJedis();
- jedis.select(2);
- jedis.del(key);
- } catch (Exception e) {
- isBroken = true;
- } finally {
- closeResource(jedis, isBroken);
- }
- }
-
- /**
- * 取得key的值
- * @param key
- */
- public static String stringGet(String key) {
- Jedis jedis = null;
- boolean isBroken = false;
- String lastVal = null;
- try {
- jedis = getJedis();
- jedis.select(2);
- lastVal = jedis.get(key);
- jedis.expire(key, expireTime);
- } catch (Exception e) {
- isBroken = true;
- } finally {
- closeResource(jedis, isBroken);
- }
- return lastVal;
- }
-
- /**
- * 添加string数据
- * @param key
- * @param value
- */
- public static String stringSet(String key, String value) {
- Jedis jedis = null;
- boolean isBroken = false;
- String lastVal = null;
- try {
- jedis = getJedis();
- jedis.select(2);
- lastVal = jedis.set(key, value);
- jedis.expire(key, expireTime);
- } catch (Exception e) {
- e.printStackTrace();
- isBroken = true;
- } finally {
- closeResource(jedis, isBroken);
- }
- return lastVal;
- }
-
- /**
- * 添加hash数据
- * @param key
- * @param field
- * @param value
- */
- public static void hashSet(String key, String field, String value) {
- boolean isBroken = false;
- Jedis jedis = null;
- try {
- jedis = getJedis();
- if (jedis != null) {
- jedis.select(2);
- jedis.hset(key, field, value);
- jedis.expire(key, expireTime);
- }
- } catch (Exception e) {
- isBroken = true;
- } finally {
- closeResource(jedis, isBroken);
- }
- }
- }
匹配规则
- connector.subscribe(".*\\..*") //全库全表
-
-
- connector.subscribe("test\\..*")//指定库全表
-
-
- connector.subscribe("test.user") //单表
-
-
- connector.subscribe("test\\..*,test2.user1,test3.user2")//多规则组合使用
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。