当前位置:   article > 正文

docker安装mysql和canal开启监听测试(傻瓜式配置)_docker怎么启动监听端口

docker怎么启动监听端口

安装mysql镜像

1 拉取mysql镜像容器

docker pull mysql:5.7.36

2 创建容器运行并拷贝配置文件

  1. docker run --name mysql5736 \
  2. -p 3306:3306 \
  3. -e MYSQL_ROOT_PASSWORD=root \
  4. -d mysql:5.7.36

3 创建配置文件

  1. mkdir /mydata
  2. mkdir mysql
  3. mkdir conf

4 拷贝配置文件

  1. docker cp mysql5736:/etc/mysql/mysql.conf.d/mysqld.cnf /mydata/mysql/conf
  2. 拷贝后配置文件的修改就不用连接容器后进行修改了,直接进入拷贝位置来修改配置文件内容

5 进入拷贝的配置文件加上如下内容

  1. cd /mydata
  2. cd mysql
  3. cd conf
  4. ls -a #可以看到 mysqld.cnf 这个配置文件
  5. vi mysqld.cnf # 进入配置文件加上如下内容
  6. log-bin=mysql-bin # 开启 binlog
  7. binlog-format=ROW # 选择 ROW 模式
  8. server-id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

6 删除容器重新启动

  1. # 删除之前启动的docker容器
  2. docker rm -f mysql5736
  3. # 运行docker容器命令
  4. # --name mysql5736:容器名为mysql5736
  5. # /etc/localtime:时间同步
  6. # /mydata/mysql/conf:同步配置文件(这个是关键,上面配置的内容就会覆盖容器中的配置文件)
  7. # /mydata/mysql/log:同步日志目录
  8. # /mydata/mysql/data:同步mysql的一些文件内容(对数据进行备份)
  9. # MYSQL_ROOT_PASSWORD=root:默认root的密码是root
  10. docker run --name mysql5736 \
  11. -p 3306:3306 \
  12. -v /etc/localtime:/etc/localtime \
  13. -v /mydata/mysql/conf:/etc/mysql/mysql.conf.d \
  14. -v /mydata/mysql/log:/var/log/mysql \
  15. -v /mydata/mysql/data:/var/lib/mysql \
  16. -e MYSQL_ROOT_PASSWORD=root \
  17. -d mysql:5.7.36

7 进入mysql容器并连接客户端配置账号

  1. # 使用mysql容器中的命令行
  2. docker exec -it mysql5736 /bin/bash
  3. # 使用MySQL命令打开客户端:
  4. mysql -uroot -proot --default-character-set=utf8
  5. # 接着创建一个账户,该账号所有ip都能够访问
  6. grant all privileges on *.* to 'root' @'%' identified by 'root';
  7. # 刷新生效
  8. FLUSH PRIVILEGES;
  9. -- 创建用户名密码都是canal的账号
  10. CREATE USER canal IDENTIFIED BY 'canal';
  11. -- 为canal进行授权读取、client从、客户端权限
  12. grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
  13. -- 刷新生效
  14. FLUSH PRIVILEGES;

8 查看binlog日志是否成功开启

show variables like 'log_%';

安装canal镜像

1 拉取canal镜像

docker pull canal/canal-server:v1.1.5

2 启动容器并拷贝配置文件

  1. # 启动容器(目的是拷贝配置文件)
  2. docker run --name canal115 \
  3. -p 11111:11111 \
  4. -id canal/canal-server:v1.1.5
  5. # 提前创建 /mydata/canal/conf目录,接着来执行拷贝配置文件命令
  6. docker cp canal115:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf/

3 进入拷贝配置文件位置后修改配置文件

  1. cd #退出
  2. cd /mydata
  3. cd canal
  4. cd conf
  5. ls -a #可以看到instance.properties配置文件
  6. vi instance.properties #进入配置文件做如下修改
  7. # 从机id(与之前mysql配置的id不同)
  8. canal.instance.mysql.slaveId=2
  9. # 指定ip地址也是可以的,查看方式docker inspect --format '{{ .NetworkSettings.IPAddress }}' mysql
  10. # 容器进行了--link,这里mysql5736就是对应的服务地址
  11. canal.instance.master.address=mysql5736:3306
  12. # 用户名与密码
  13. canal.instance.dbUsername=canal
  14. canal.instance.dbPassword=canal
  15. # 匹配全部数据库
  16. canal.instance.filter.regex = .\*\\\\..\*
  17. canal.mq.topic=example # 目的地队列

4 删除原容器重新配置

  1. # 删除原先的canal服务
  2. docker rm -f canal115
  3. # 启动canal服务
  4. # -i:让容器的标准输入保持打开(特别特别重要,注意不要是-d,一定要加上i)
  5. docker run --name canal115 \
  6. -p 11111:11111 \
  7. -v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
  8. --link mysql5736:mysql5736 \
  9. -id canal/canal-server:v1.1.5

5 进入容器查看日志,如果出现一下内容则开启监控成功

  1. # 进入到docker容器
  2. docker exec -it canal115 /bin/bash
  3. # 打开日志文件
  4. cd canal-server/logs/example/
  5. # 查看日志文件的最后100行内容
  6. tail -100 example.log

 代码测试

  1. package com.tang.webServiceClient.canal;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.alibaba.otter.canal.client.CanalConnector;
  4. import com.alibaba.otter.canal.client.CanalConnectors;
  5. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import java.net.InetSocketAddress;
  8. import java.util.List;
  9. public class CanalClientTest{
  10. public static void main(String[] args) {
  11. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.25.158",
  12. 11111), "example", "", "");
  13. int batchSize = 1000;
  14. try {
  15. connector.connect();
  16. connector.subscribe(".*\\..*");
  17. connector.rollback();
  18. while (true) {
  19. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  20. long batchId = message.getId();
  21. int size = message.getEntries().size();
  22. if (batchId == -1 || size == 0) {
  23. try {
  24. Thread.sleep(1000);
  25. } catch (InterruptedException e) {
  26. e.printStackTrace();
  27. }
  28. } else {
  29. printEntry(message.getEntries());
  30. }
  31. connector.ack(batchId); // 提交确认
  32. // connector.rollback(batchId); // 处理失败, 回滚数据
  33. }
  34. } finally {
  35. connector.disconnect();
  36. }
  37. }
  38. private static void printEntry( List<Entry> entrys) {
  39. for (Entry entry : entrys) {
  40. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  41. continue;
  42. }
  43. RowChange rowChage = null;
  44. try {
  45. rowChage = RowChange.parseFrom(entry.getStoreValue());
  46. } catch (Exception e) {
  47. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  48. e);
  49. }
  50. EventType eventType = rowChage.getEventType();
  51. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  52. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  53. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  54. eventType));
  55. for (RowData rowData : rowChage.getRowDatasList()) {
  56. if (eventType == EventType.DELETE) {
  57. redisDelete(rowData.getBeforeColumnsList());
  58. } else if (eventType == EventType.INSERT) {
  59. redisInsert(rowData.getAfterColumnsList());
  60. } else {
  61. System.out.println("-------> before");
  62. printColumn(rowData.getBeforeColumnsList());
  63. System.out.println("-------> after");
  64. printColumn(rowData.getAfterColumnsList());
  65. redisUpdate(rowData.getAfterColumnsList());
  66. }
  67. }
  68. }
  69. }
  70. private static void printColumn( List<Column> columns) {
  71. for (Column column : columns) {
  72. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  73. }
  74. }
  75. private static void redisInsert( List<Column> columns){
  76. JSONObject json=new JSONObject();
  77. for (Column column : columns) {
  78. json.put(column.getName(), column.getValue());
  79. }
  80. if(columns.size()>0){
  81. RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
  82. }
  83. }
  84. private static void redisUpdate( List<Column> columns){
  85. JSONObject json=new JSONObject();
  86. for (Column column : columns) {
  87. json.put(column.getName(), column.getValue());
  88. }
  89. if(columns.size()>0){
  90. RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
  91. }
  92. }
  93. private static void redisDelete( List<Column> columns){
  94. JSONObject json=new JSONObject();
  95. for (Column column : columns) {
  96. json.put(column.getName(), column.getValue());
  97. }
  98. if(columns.size()>0){
  99. RedisUtil.delKey("user:"+ columns.get(0).getValue());
  100. }
  101. }
  102. }

工具类

  1. package com.tang.webServiceClient.canal;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPool;
  4. import redis.clients.jedis.JedisPoolConfig;
  5. public class RedisUtil {
  6. // Redis服务器IP
  7. private static String ADDR = "192.168.25.158";
  8. // Redis的端口号
  9. private static int PORT = 6379;
  10. // 访问密码
  11. private static String AUTH = "root";
  12. // 可用连接实例的最大数目,默认值为8;
  13. // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
  14. private static int MAX_ACTIVE = 1024;
  15. // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
  16. private static int MAX_IDLE = 200;
  17. // 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException;
  18. private static int MAX_WAIT = 10000;
  19. // 过期时间
  20. protected static int expireTime = 60 * 60 *24;
  21. // 连接池
  22. protected static JedisPool pool;
  23. /**
  24. * 静态代码,只在初次调用一次
  25. */
  26. static {
  27. JedisPoolConfig config = new JedisPoolConfig();
  28. //最大连接数
  29. config.setMaxTotal(MAX_ACTIVE);
  30. //最多空闲实例
  31. config.setMaxIdle(MAX_IDLE);
  32. //超时时间
  33. config.setMaxWaitMillis(MAX_WAIT);
  34. //
  35. config.setTestOnBorrow(false);
  36. pool = new JedisPool(config, ADDR, PORT, 1000,AUTH);
  37. }
  38. /**
  39. * 获取jedis实例
  40. */
  41. protected static synchronized Jedis getJedis() {
  42. Jedis jedis = null;
  43. try {
  44. jedis = pool.getResource();
  45. } catch (Exception e) {
  46. e.printStackTrace();
  47. }
  48. return jedis;
  49. }
  50. /**
  51. * 释放jedis资源
  52. * @param jedis
  53. * @param isBroken
  54. */
  55. protected static void closeResource(Jedis jedis, boolean isBroken) {
  56. try {
  57. if (isBroken) {
  58. pool.returnBrokenResource(jedis);
  59. } else {
  60. pool.returnResource(jedis);
  61. }
  62. } catch (Exception e) {
  63. }
  64. }
  65. /**
  66. * 是否存在key
  67. * @param key
  68. */
  69. public static boolean existKey(String key) {
  70. Jedis jedis = null;
  71. boolean isBroken = false;
  72. try {
  73. jedis = getJedis();
  74. jedis.select(2);
  75. return jedis.exists(key);
  76. } catch (Exception e) {
  77. isBroken = true;
  78. } finally {
  79. closeResource(jedis, isBroken);
  80. }
  81. return false;
  82. }
  83. /**
  84. * 删除key
  85. * @param key
  86. */
  87. public static void delKey(String key) {
  88. Jedis jedis = null;
  89. boolean isBroken = false;
  90. try {
  91. jedis = getJedis();
  92. jedis.select(2);
  93. jedis.del(key);
  94. } catch (Exception e) {
  95. isBroken = true;
  96. } finally {
  97. closeResource(jedis, isBroken);
  98. }
  99. }
  100. /**
  101. * 取得key的值
  102. * @param key
  103. */
  104. public static String stringGet(String key) {
  105. Jedis jedis = null;
  106. boolean isBroken = false;
  107. String lastVal = null;
  108. try {
  109. jedis = getJedis();
  110. jedis.select(2);
  111. lastVal = jedis.get(key);
  112. jedis.expire(key, expireTime);
  113. } catch (Exception e) {
  114. isBroken = true;
  115. } finally {
  116. closeResource(jedis, isBroken);
  117. }
  118. return lastVal;
  119. }
  120. /**
  121. * 添加string数据
  122. * @param key
  123. * @param value
  124. */
  125. public static String stringSet(String key, String value) {
  126. Jedis jedis = null;
  127. boolean isBroken = false;
  128. String lastVal = null;
  129. try {
  130. jedis = getJedis();
  131. jedis.select(2);
  132. lastVal = jedis.set(key, value);
  133. jedis.expire(key, expireTime);
  134. } catch (Exception e) {
  135. e.printStackTrace();
  136. isBroken = true;
  137. } finally {
  138. closeResource(jedis, isBroken);
  139. }
  140. return lastVal;
  141. }
  142. /**
  143. * 添加hash数据
  144. * @param key
  145. * @param field
  146. * @param value
  147. */
  148. public static void hashSet(String key, String field, String value) {
  149. boolean isBroken = false;
  150. Jedis jedis = null;
  151. try {
  152. jedis = getJedis();
  153. if (jedis != null) {
  154. jedis.select(2);
  155. jedis.hset(key, field, value);
  156. jedis.expire(key, expireTime);
  157. }
  158. } catch (Exception e) {
  159. isBroken = true;
  160. } finally {
  161. closeResource(jedis, isBroken);
  162. }
  163. }
  164. }

匹配规则

  1. connector.subscribe(".*\\..*") //全库全表
  2. connector.subscribe("test\\..*")//指定库全表
  3. connector.subscribe("test.user") //单表
  4. connector.subscribe("test\\..*,test2.user1,test3.user2")//多规则组合使用

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/587328
推荐阅读
相关标签
  

闽ICP备14008679号