赞
踩
目录
3. 监听 canal 存储在 Kafka Topic 中数据
1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)
修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini
- log-bin="HELONG-bin"
- binlog_format=ROW # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
- binlog-do-db=aicloud # 监控 AI Cloud 项目
如果要同步多个项目:
- binlog-do-db=aicloud
- binlog-do-db=aicloud2
- binlog-do-db=aicloud3
2. 重启MySQL服务
3. 赋值数据同步权限
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES;
4. 安装并配置 Canal
下载地址:https://github.com/alibaba/canal/releases
① 修改canal.properties
- canal.serverMode=kafka
- canal.mq.servers=127.0.0.1:9092
canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),
可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。
② 修改instance.properties
- canal.instance.mysql.slaveId=100 # 大于 1 即可
- canal.instance.master.address=127.0.0.1:3306
- canal.mq.topic=ai-cloud-canal-to-kafka
slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据
(主节点的 slaveId = 1)
address 配置连接本地的 MySQL
topic 配置数据发送到 Kafka 的某个主题下
5. 拷贝 Jar 包到 lib
将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。
6. 删除 bin 目录下 startup.bat 里的参数
如果启动时报错:
Unrecognized VM option 'PermSize=128m'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
删除 -XX:PermSize=128m 参数即可。
7. 启动 canal
打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车
此时我将这个查询列表接口的数据,存储在 Redis 中:
- /**
- * 获取历史聊天记录(对话/绘图)
- *
- * @param type
- * @return {@link ResponseEntity }
- */
- @RequestMapping("/list")
- public ResponseEntity getHistoryList(Integer type, Integer model) {
- String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);
- Object list = redisTemplate.opsForValue().get(listCacheKey);
- if (ObjectUtil.isNull(list)) {
- LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();
- queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());
- queryWrapper.eq(Answer::getType, type);
- queryWrapper.eq(Answer::getModel, model);
- queryWrapper.orderByDesc(Answer::getAid);
- List<Answer> answerList = answerService.list(queryWrapper);
- List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());
- Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));
- List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());
- // 缓存 1 天
- redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);
- return ResponseEntity.success(answerVoList);
- } else {
- return ResponseEntity.success(list);
- }
- }
- /**
- * 查询列表存储 Redis 缓存
- *
- * @param uid
- * @param model
- * @param type
- * @return {@link String }
- */
- public static String getListCacheKey(Long uid, Integer model, Integer type) {
- return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
- }
首先对数据库中需要缓存的数据进行一些修改操作:
此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:
执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”
执行删除操作:
由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的 topic 并针对 data 中的数据进行一个提取,得到一个 cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。
【代码示例】
- /**
- * canal 监控 binlog 日志,将修改的数据存储 kafka topic 中
- * 监听 kafka topic 中的数据
- *
- * @param data
- * @param ack
- * @throws JsonProcessingException
- */
- @KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
- public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {
- HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);
- if (map.isEmpty()) {
- ack.acknowledge();
- return;
- }
- // 匹配上对应的数据库和数据表
- if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&
- KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {
- // 更新缓存
- List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);
- if (!CollectionUtils.isEmpty(list)) {
- for (Map<String, Object> answerMap : list) {
- String answerListCacheKey = RedisUtil.getListCacheKey(
- Long.valueOf(answerMap.get("uid").toString()),
- Integer.parseInt(answerMap.get("model").toString()),
- Integer.parseInt(answerMap.get("type").toString()));
- // 删除缓存,让下一次查询走数据库,并同步缓存
- redisTemplate.delete(answerListCacheKey);
- }
- }
- }
- // 手动确认应答
- ack.acknowledge();
- }
- /**
- * canal 同步数据到 kafka
- */
- public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";
-
-
- /**
- * 数据库,缓存数据一致性的
- */
-
- public static final String DATABASE_KEY = "database";
-
- public static final String TABLE_KEY = "table";
-
- public static final String DATA_KEY = "data";
-
- public static final String TARGET_DATABASE = "aicloud";
-
- public static final String TARGET_TABLE = "answer";
【补充】
kafka ui 下载地址:https://github.com/provectus/kafka-ui/tags
修改配置
- kafka:
- clusters:
- - name: kafka3_cluster
- bootstrapServers: 127.0.0.1:9092
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。