赞
踩
我们都知道一个系统最重要的是数据,数据是保存在数据库里。但是很多时候不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis等等。
这时我注意到阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用。
在很多业务情况下,我们都会在系统中引入ElasticSearch搜索引擎作为做全文检索的优化方案,引入redis缓存作为缓存优化查询显示。
如果数据库数据发生更新,这时候就需要在业务代码中写一段同步更新ElasticSearch的代码,同步更新缓存的代码。
这种数据同步的代码跟业务代码耦合性非常高,并且使得代码的可读性降低,我们能不能把这些数据同步的代码抽出来形成一个独立的模块呢?肯定是可以的。
下面我会以一个CMS文章管理为例来演示canal+RocketMQ实现MySQL与ElasticSearch,redis数据同步。
SpringBoot、canal、RocketMQ、MySQL、ElasticSearch,redis
介绍一下canal,其他的自行学习。
2.1 canal定义
canal
[kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费.。
2.2 canal工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
canal能做什么
以下参考canal官网。
与其问canal能做什么,不如说数据同步有什么作用。
但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:
数据库镜像
数据库实时备份
索引构建和实时维护
业务cache(缓存)刷新
带业务逻辑的增量数据处理
2.3 架构
说明:
server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列 (1个server对应1…n个instance)
instance模块:
eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
eventStore (数据存储)
metaManager (增量订阅&消费信息管理器)
到这里我们对canal有了一个初步的认识,接下我们就进入实战环节。
3.环境准备
3.1 MySQL 配置
对于自建 MySQL , 需要先开启 Binlog写入功能,配置binlog-format为ROW 模式,my.cnf 中配置如下
[mysqld]
- log-bin=mysql-bin # 开启 binlog
- binlog-format=ROW # 选择 ROW 模式
- server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
**注意:**针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
授权canal 连接 MySQL 账号具有作为 MySQL slave的权限, 如果已有账户可直接 使用grant 命令授权。
#创建用户名和密码都为canal
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES;
3.2 canal的安装和配置
3.2.1 canal.admin安装和配置
canal提供web ui 进行Server管理、Instance管理。
3.2.1.1 下载 canal.admin, 访问 https://github.com/alibaba/canal/releases 页面 , 选择需要的包下载,
wget https://github.com/alibaba/canal/releases/download/canal-XXX/canal.admin-XXX.tar.gz
3.2.1.2 解压完成
我们先配置canal.admin之后。通过web ui来配置 cancal server,这样使用界面操作非常的方便。
3.2.1.3 配置修改
vi conf/application.yml
- server:
- port: 8089
- spring:
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
-
- spring.datasource:
- address: 127.0.0.1:3306
- database: canal_manager
- username: canal
- password: canal
- driver-class-name: com.mysql.jdbc.Driver
- url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
- hikari:
- maximum-pool-size: 30
- minimum-idle: 1
-
- canal:
- adminUser: admin
- adminPasswd: admin
3.2.1.4 初始化元数据库
初始化元数据库
# 导入初始化SQL
> source conf/canal_manager.sql
初始化SQL脚本里会默认创建canal_manager的数据库,建议使用root等有超级权限的账号进行初始化
canal_manager.sql默认会在conf目录下
3.2.1.5 启动
sh bin/startup.sh
3.2.1.6 启动成功,使用浏览器输入http://ip:8089/ 会跳转到登录界面
使用用户名:admin 密码为:123456 登录
这时候我们的canal.admin就搭建成功了。
3.2.2 下载 canal.deployer, 访问 https://github.com/alibaba/canal/releases页面 , 选择需要的包下载
解压完成
进入conf 目录。
我们先对canal.properties 不做任何修改。
使用canal_local.properties的配置覆盖canal.properties
- # register ip
- canal.register.ip =
-
- # canal admin config
- canal.admin.manager = 127.0.0.1:8089
- canal.admin.port = 11110
- canal.admin.user = admin
- canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
- # admin auto register
- canal.admin.register.auto = true
- canal.admin.register.cluster =
使用如下命令启动canal server
sh bin/startup.sh local
启动成功。同时我们在canal.admin web ui中刷新 server 管理,可以到canal server 已经启动成功。
这时候我们的canal.server 搭建已经成功。
3.2.3 在canal admin ui 中配置Instance管理
3.2.3.1 新建 Instance
选择Instance 管理-> 新建Instance
填写 Instance名称:cms_article
选择 选择所属主机集群
选择 载入模板
修改默认信息
#mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#改成自己的数据库信息(需要监听的数据库)
canal.instance.defaultDatabaseName = cms-manage
canal.instance.connectionCharset = UTF-8
#table regex 需要过滤的表 这里数据库的中所有表
canal.instance.filter.regex = .\*\\..\*
# MQ 配置 日志数据会发送到cms_article这个topic上
canal.mq.topic=cms_article
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#单分区处理消息
canal.mq.partition=0
我们这里为了演示之创建一张cms_articla表。
配置好之后,我需要点击保存。此时在Instances 管理中就可以看到此时的实例信息。
3.2.4 修改canal server 的配置文件,选择消息队列处理binlog
canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
kafka: https://github.com/apache/kafka
RocketMQ : https://github.com/apache/rocketmq
本案例以RocketMQ为例
我们仍然使用web ui 界面操作。点击 server 管理 - > 点击配置
修改配置文件
- # ...
- # 可选项: tcp(默认), kafka, RocketMQ
- canal.serverMode = RocketMQ
- # ...
- # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
- canal.mq.servers = 192.168.0.200:9078
- canal.mq.retries = 0
- # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
- canal.mq.batchSize = 16384
- canal.mq.maxRequestSize = 1048576
- # flatMessage模式下请将该值改大, 建议50-200
- canal.mq.lingerMs = 1
- canal.mq.bufferMemory = 33554432
- # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
- canal.mq.canalBatchSize = 50
- # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
- canal.mq.canalGetTimeout = 100
- # 是否为flat json格式对象
- canal.mq.flatMessage = false
- canal.mq.compressionType = none
- canal.mq.acks = all
- # kafka消息投递是否使用事务
- canal.mq.transaction = false
修改好之后保存。会自动重启。
此时我们就可以在rocketmq的控制台看到一个cms_article topic已经自动创建了。
3.2.5 配置ElasticSearch启动
我们使用 elasticsearch-head 连接是可以看到节点信息。一会我们就使用 elasticsearch-head 查询es中数据。
4.代码实战
4.1 创建一个springboot 项目
4.2 pom.xml文件
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>rockmq-samples</artifactId>
- <groupId>com.lidong.rocketmq</groupId>
- <version>1.0.0</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>springboot-canal-rocketmq-es</artifactId>
- <properties>
- <java.version>1.8</java.version>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <spring-boot.version>2.2.5.RELEASE</spring-boot.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>2.0.4</version>
- </dependency>
- </dependencies>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-dependencies</artifactId>
- <version>${spring-boot.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
-
- </dependencies>
- </dependencyManagement>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>2.3.0.RELEASE</version>
- <configuration>
- <mainClass>com.lidong.RocketmqSyncSamplesApplication</mainClass>
- </configuration>
- <executions>
- <execution>
- <id>repackage</id>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- </project>
-
spring-boot-starter-data-elasticsearch:操作es依赖库
rocketmq-spring-boot-starter:操作rocketmq依赖库
其他就不过多介绍了。大家一看就明白了。
4.3 application的配置
- server:
- port: 8085
- rocketmq:
- name-server: localhost:9876
- spring:
- data:
- elasticsearch:
- cluster-nodes: localhost:9300
- cluster-name: my-application
- repositories:
- enabled: true
4.4 创建es操作的实体类和仓库类
4.4.1 EsCmsArticle实体类
-
- import org.springframework.data.annotation.Id;
- import org.springframework.data.elasticsearch.annotations.Document;
- import java.io.Serializable;
- import java.util.Date;
-
- /**
- * 文章详情
- *
- * String indexName();//索引库的名称,个人建议以项目的名称命名
- * String type() default "";//类型,个人建议以实体的名称命名
- * short shards() default 5;//默认分区数
- * short replicas() default 1;//每个分区默认的备份数
- * String refreshInterval() default "1s";//刷新间隔
- * String indexStoreType() default "fs";//索引文件存储类型
- *
- **/
- @Document(indexName = "canal-rocketmq-es", type = "cms-article")
- public class EsCmsArticle implements Serializable {
-
- @Id
- private Long courseId;
-
- /** 标题 */
- private String title;
-
- /** 摘要 */
- private String abstractX;
-
- /** 内容 */
- private String content;
-
- /** 年龄段 */
- private String ageRange;
-
- /** 图片 */
- private String image;
-
- /** 查看次数 */
- private Long viewNumber;
-
- /** 作者 */
- private String author;
-
- /** 来源 */
- private String source;
-
- /** 所属分类 */
- private Long classId;
-
- /** 关键字 */
- private String keyWords;
-
- /** 描述 */
- private String description;
-
- /** 文章url */
- private String url;
-
- /**
- * 文章状态
- */
- private Integer status;
-
- /**
- * 创建时间
- */
- private Date createTime;
-
- /**
- * 修改时间
- */
- private Date updateTime;
-
- public void setCourseId(Long courseId)
- {
- this.courseId = courseId;
- }
-
- public Long getCourseId()
- {
- return courseId;
- }
- public void setTitle(String title)
- {
- this.title = title;
- }
-
- public String getTitle()
- {
- return title;
- }
- public void setAbstractX(String abstractX)
- {
- this.abstractX = abstractX;
- }
-
- public String getAbstractX()
- {
- return abstractX;
- }
- public void setContent(String content)
- {
- this.content = content;
- }
-
- public String getContent()
- {
- return content;
- }
- public void setAgeRange(String ageRange)
- {
- this.ageRange = ageRange;
- }
-
- public String getAgeRange()
- {
- return ageRange;
- }
- public void setImage(String image)
- {
- this.image = image;
- }
-
- public String getImage()
- {
- return image;
- }
- public void setViewNumber(Long viewNumber)
- {
- this.viewNumber = viewNumber;
- }
-
- public Long getViewNumber()
- {
- return viewNumber;
- }
- public void setAuthor(String author)
- {
- this.author = author;
- }
-
- public String getAuthor()
- {
- return author;
- }
- public void setSource(String source)
- {
- this.source = source;
- }
-
- public String getSource()
- {
- return source;
- }
- public void setClassId(Long classId)
- {
- this.classId = classId;
- }
-
- public Long getClassId()
- {
- return classId;
- }
- public void setKeyWords(String keyWords)
- {
- this.keyWords = keyWords;
- }
-
- public String getKeyWords()
- {
- return keyWords;
- }
- public void setDescription(String description)
- {
- this.description = description;
- }
-
- public String getDescription()
- {
- return description;
- }
- public void setUrl(String url)
- {
- this.url = url;
- }
-
- public String getUrl()
- {
- return url;
- }
-
-
- public Integer getStatus() {
- return status;
- }
-
- public void setStatus(Integer status) {
- this.status = status;
- }
-
- public Date getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(Date createTime) {
- this.createTime = createTime;
- }
-
- public Date getUpdateTime() {
- return updateTime;
- }
-
- public void setUpdateTime(Date updateTime) {
- this.updateTime = updateTime;
- }
-
- @Override
- public String toString() {
- return "CmsArticle{" +
- "courseId=" + courseId +
- ", title='" + title + '\'' +
- ", abstractX='" + abstractX + '\'' +
- ", content='" + content + '\'' +
- ", ageRange='" + ageRange + '\'' +
- ", image='" + image + '\'' +
- ", viewNumber=" + viewNumber +
- ", author='" + author + '\'' +
- ", source='" + source + '\'' +
- ", classId=" + classId +
- ", keyWords='" + keyWords + '\'' +
- ", description='" + description + '\'' +
- ", url='" + url + '\'' +
- ", status=" + status +
- ", createTime=" + createTime +
- ", updateTime=" + updateTime +
- '}';
- }
- }
4.4.2 CmsArticleRepository 仓库类
-
-
- import com.alibaba.fastjson.JSON;
- import com.lidong.canal.bean.CanalBean;
- import com.lidong.canal.bean.CmsArticle;
- import com.lidong.canal.es.entity.EsCmsArticle;
- import com.lidong.canal.es.repository.CmsArticleRepository;
- import org.apache.rocketmq.spring.annotation.ConsumeMode;
- import org.apache.rocketmq.spring.annotation.MessageModel;
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.List;
- import java.util.Optional;
-
- @Component
- @RocketMQMessageListener(
- topic = "cms_article",
- consumerGroup = "cms-article",
- selectorExpression = "*",
- consumeMode = ConsumeMode.ORDERLY,
- messageModel = MessageModel.CLUSTERING,
- consumeThreadMax = 1
- )
- public class SpringConsumer implements RocketMQListener<String> {
-
- private Logger logger = LoggerFactory.getLogger(SpringConsumer.class.getSimpleName());
-
- @Autowired
- CmsArticleRepository cmsArticleRepository;
-
-
-
- /**
- * 实现方式很简单吧,但是你也看见了代码中就没有消息能够消费是否成功后的确认方式,因为实现的onMessage()方法是个void的,还好看过原始的rocketmq的消费者实现方式,也就是rocketmq-client.jar的实现,它是MessageListener.java类来实现消息监听接收的,而它有2个继承接口类MessageListenerConcurrently.java和MessageListenerOrderly.java,这样就好找了,直接收一下这2个接口的实现类,乖乖,果然找到了在rocket-spring-boot的jar里面,就是DefaultRocketMQListenerContainer.java这个类,看下其中一个实现
- *
- *
- * @param msg
- */
- @Override
- public void onMessage(String msg) {
- System.out.println("接收到消息 -> " + msg);
- CanalBean canalBean = JSON.parseObject(msg, CanalBean.class);
- String table = canalBean.getTable();
- System.out.println(table.toString());
- String type = canalBean.getType();
- System.out.println(type);
- List<CmsArticle> data = canalBean.getData();
- data.stream().forEach(tbTest -> {
- EsCmsArticle esCmsArticle = new EsCmsArticle();
- System.out.println(tbTest.toString());
- if ("UPDATE".equals(type) && "cms_article".equals(table)) {
- Optional<EsCmsArticle> article = cmsArticleRepository.findById(tbTest.getCourseId());
- //删除缓存
- //操作es
-
- if (article.isPresent()) {
- EsCmsArticle cmsArticle = article.get();
- BeanUtils.copyProperties(tbTest, cmsArticle);
- cmsArticleRepository.save(cmsArticle);
- logger.info("id = {} 编辑es成功", cmsArticle.getCourseId());
- } else {
- BeanUtils.copyProperties(tbTest, esCmsArticle);
- cmsArticleRepository.save(esCmsArticle);
- logger.info("id = {} 添加es成功", esCmsArticle.getCourseId());
- }
- } else if ("INSERT".equals(type) && "cms_article".equals(table)) {
- BeanUtils.copyProperties(tbTest, esCmsArticle);
- //添加缓存
- //操作es
- cmsArticleRepository.save(esCmsArticle);
- logger.info("id = {} 添加es成功", esCmsArticle.getCourseId());
- }
- });
-
-
- }
- }
-
-
4.6 SpringBootApplication启动类
-
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class RocketmqToEsSamplesApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(RocketmqToEsSamplesApplication.class, args);
- }
-
- }
4.7 CanalBean类 接收mq的数据实体
- public class CanalBean implements Serializable {
- //数据
- private List<CmsArticle> data;
- //数据库名称
- private String database;
- private long es;
- //递增,从1开始
- private int id;
- //是否是DDL语句
- private boolean isDdl;
- //表结构的字段类型
- private MysqlType mysqlType;
- //UPDATE语句,旧数据
- private List<CmsArticle> old;
- //主键名称
- private List<String> pkNames;
- //sql语句
- private String sql;
- private SqlType sqlType;
- //表名
- private String table;
-
- private long ts;
- //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
- private String type;
-
- //get set ...
-
- public class MysqlType implements Serializable {
- private String id;
- private String commodity_name;
- private String commodity_price;
- private String number;
- private String description;
- //get set..
- public class SqlType implements Serializable {
- private int id;
- private int commodity_name;
- private int commodity_price;
- private int number;
- private int description;
- //get set..
- }
-
-
-
- import java.io.Serializable;
- import java.util.Date;
-
- public class CmsArticle implements Serializable {
-
- /** $column.columnComment */
- private Long courseId;
-
- /** 标题 */
- private String title;
-
- /** 摘要 */
- private String abstractX;
-
- /** 内容 */
- private String content;
-
- /** 年龄段 */
- private String ageRange;
-
- /** 图片 */
- private String image;
-
- /** 查看次数 */
- private Long viewNumber;
-
- /** 作者 */
- private String author;
-
- /** 来源 */
- private String source;
-
- /** 所属分类 */
- private Long classId;
-
- /** 关键字 */
- private String keyWords;
-
- /** 描述 */
- private String description;
-
- /** 文章url */
- private String url;
-
- /**
- * 文章状态
- */
- private Integer status;
-
- /**
- * 创建时间
- */
- private Date createTime;
-
- /**
- * 修改时间
- */
- private Date updateTime;
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。