赞
踩
问题 | 答案 |
如何保证mysql改动后,立即同步到Redis | canal |
https://github.com/alibaba/canal/wiki
基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
- 数据库镜像
- 数据库实时备份
- 多级索引 (卖家和买家各自分库索引)
- search build
- 业务cache刷新
- 价格变化等重要业务消息
官网 | https://github.com/alibaba/canal/releases/tag/canal-1.1.6 |
百度网盘 | 链接:https://pan.baidu.com/s/1Hs7JieAZA_q4lmvIdJZgFw?pwd=aqi2 提取码:aqi2 |
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
# Show the MySQL server version.
SELECT VERSION();

# Show the binary log file and position this server is currently writing to.
SHOW MASTER STATUS;

# Check whether binary logging is enabled.
SHOW VARIABLES LIKE 'log_bin';
- # Add the following settings to the [mysqld] section of the MySQL option file
- [mysqld]
- log-bin=mysql-bin # enable the binary log
- binlog-format=ROW # use the ROW binlog format
- server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
# Recreate the dedicated account that canal uses to connect to MySQL.
DROP USER IF EXISTS 'canal'@'%';
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

# canal only reads rows and consumes the binlog as a replication slave, so grant
# just the privileges that requires instead of ALL PRIVILEGES.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

# Verify that the account now exists.
SELECT * FROM mysql.`user`;
tar -zxvf canal.deployer-1.1.6.tar.gz
如果出现如下错误
Caused by: java.io.IOException: caching_sha2_password Auth failed
com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:260)
- # Change the authentication plugin of the canal account: the error above shows the
- # caching_sha2_password handshake failing, so list each account's plugin and then
- # switch canal to mysql_native_password.
- select host,user,plugin from mysql.user ;
- ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
-
- # Sample table for this walkthrough; the canal client further down subscribes to test.t_user.
- CREATE TABLE `t_user` (
- `id` int NOT NULL AUTO_INCREMENT,
- `userName` varchar(255) NOT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- package com.mco.utils;
-
- import cn.hutool.core.util.RandomUtil;
- import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import redis.clients.jedis.GeoCoordinate;
- import redis.clients.jedis.Jedis;
- import redis.clients.jedis.JedisPool;
- import redis.clients.jedis.JedisPoolConfig;
- import redis.clients.jedis.args.GeoUnit;
-
- import java.time.Duration;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Objects;
-
- /**
- * @author :liao.wei
- * @date :2023/9/18 21:15
- * @package : com.mco.utils
- */
- public class RedisUtils {
- private static Logger logger = LoggerFactory.getLogger(JedisPoolUtil.class);
-
- public static final String REDIS_IP_ADDR = "120.77.64.190";
- public static final String REDIS_PWD = "111111";
-
- public static JedisPool jedisPool;
-
- static {
- JedisPoolConfig poolConfig = new JedisPoolConfig();
- poolConfig.setMaxIdle(8);
- poolConfig.setMinIdle(2);
- poolConfig.setMaxWait(Duration.ofSeconds(30000));
- jedisPool = new JedisPool(poolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_PWD);
- }
-
- public static Jedis getJedis() throws Exception {
- if (null != jedisPool) {
- return jedisPool.getResource();
- }
- throw new Exception("Jedispool is not ok");
- }
- }
- public class RedisCanalClient {
- public static final Integer _60SECONDS = 60;
- public static final String CANAL_IP_ADDR = "192.168.1.11";
-
- private static void redisInsert(List<Column> columns)
- {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
-
- private static void redisDelete(List<Column> columns)
- {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.del(columns.get(0).getValue());
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- private static void redisUpdate(List<Column> columns)
- {
- JSONObject jsonObject = new JSONObject();
- for (Column column : columns)
- {
- System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
- jsonObject.put(column.getName(),column.getValue());
- }
- if(columns.size() > 0)
- {
- try(Jedis jedis = RedisUtils.getJedis())
- {
- jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
- System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- public static void printEntry(List<Entry> entrys)
- {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
-
- RowChange rowChage = null;
- try {
- //获取变更的row数据
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
- }
- //获取变动类型
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
-
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.INSERT) {
- redisInsert(rowData.getAfterColumnsList());
- } else if (eventType == EventType.DELETE) {
- redisDelete(rowData.getBeforeColumnsList());
- } else {//EventType.UPDATE
- redisUpdate(rowData.getAfterColumnsList());
- }
- }
- }
- }
-
-
- public static void main(String[] args)
- {
- System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
-
- //=================================
- // 创建链接canal服务端
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
- 11111), "example", "", ""); // 这里用户名和密码如果在这写了,会覆盖canal配置文件的账号密码,如果不填从配置文件中读
- int batchSize = 1000;
- //空闲空转计数器
- int emptyCount = 0;
- System.out.println("---------------------canal init OK,开始监听mysql变化------");
- try {
- connector.connect();
- //connector.subscribe(".*\\..*");
- connector.subscribe("test.t_user"); // 设置监听哪个表
- connector.rollback();
- int totalEmptyCount = 10 * _60SECONDS;
- while (emptyCount < totalEmptyCount) {
- System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
- } else {
- //计数器重新置零
- emptyCount = 0;
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // 提交确认
- // connector.rollback(batchId); // 处理失败, 回滚数据
- }
- System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
- } finally {
- connector.disconnect();
- }
- }
- }
说明:
CANAL_IP_ADDR:canal 服务部署所在机器的 IP 地址。
InetSocketAddress:端口可从 canal.log 中查看。
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <junit.version>4.12</junit.version>
    <log4j.version>1.2.17</log4j.version>
    <lombok.version>1.16.18</lombok.version>
    <mysql.version>5.1.47</mysql.version>
    <druid.version>1.1.16</druid.version>
    <mapper.version>4.1.5</mapper.version>
    <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
</properties>
<dependencies>
    <!-- Jedis Redis client -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.3.1</version>
    </dependency>
    <!-- canal client -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- Spring Boot common modules -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- Spring Boot / Redis integration -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Spring Boot AOP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
    </dependency>
    <!-- MySQL JDBC driver; version comes from the mysql.version property -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- Druid connection pool for Spring Boot -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>${druid.version}</version>
    </dependency>
    <!-- MyBatis / Spring Boot integration -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>${mybatis.spring.boot.version}</version>
    </dependency>
    <!-- hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.2.3</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
        <optional>true</optional>
    </dependency>
    <!-- persistence -->
    <dependency>
        <groupId>javax.persistence</groupId>
        <artifactId>persistence-api</artifactId>
        <version>1.0.2</version>
    </dependency>
    <!-- tk.mybatis common Mapper -->
    <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper</artifactId>
        <version>${mapper.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
    <!-- commons-pool2 is declared exactly once, with an explicit version; a second,
         unversioned declaration of the same artifact makes Maven warn that
         dependencies must be unique. -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.1</version>
    </dependency>
</dependencies>
源码地址:https://gitee.com/UniQue006/redis_example.git
- 声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/165197?site推荐阅读
相关标签
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。