当前位置:   article > 正文

使用canal同步数据到es_canal 同步es

canal 同步es

一.环境准备


mysql 5.7
es7
docker任意版本
jdk 1.8

本文canal安装时,要安装如上软件,es和mysql可以使用docker的镜像解决。

二.配置mysql

        canal是根据binlog来进行同步数据,所以需要开启该功能,设置binlog_format格式为row。对于docker安装的mysql镜像,进入容器内部修改:

docker exec -it mysql /bin/bash

修改配置文件:

vim /etc/mysql/mysql.conf.d/mysqld.cnf

若提示没有vim,安装即可:

apt-get install vim

参照下方配置修改自己的配置文件为:

  1. [mysqld]
  2. # 开启 binlog
  3. log-bin=mysql-bin
  4. # 选择 ROW 模式
  5. binlog-format=ROW
  6. # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
  7. server_id=1

重启mysql,在navicat中,查询是否修改成功:

  1. SHOW VARIABLES LIKE 'log_bin%';
  2. SHOW VARIABLES LIKE 'binlog_format%';

创建canal用户,并授权(或者直接用自己的root账户):

  1. CREATE USER canal IDENTIFIED BY '123456';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  3. FLUSH PRIVILEGES;

最后,创建test数据库。 

三.安装canal-admin

 安装canal客户端为了方便管理,不然设置canal-server时很头痛。

运行如下命令:

docker run -d -p 8089:8089 -e server.port=8089 --restart=always --name canal-admin canal/canal-admin

在浏览器输入:http://ip:8089检查是否安装成功。

默认用户名:admin
默认密码:123456

三.安装canal-server

安装命令:

  1. docker run -d -p 9100:9100 -p 11110:11110 -p 11111:11111 -p 11112:11112 \
  2. -v /home/docker/canal-server/logs:/home/admin/canal-server/logs \
  3. -e canal.admin.manager=127.0.0.1:8089 \
  4. -e canal.admin.port=11110 \
  5. -e canal.admin.user=admin \
  6. -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 \
  7. --name=canal-server \
  8. --restart=always \
  9. canal/canal-server

进入canal-admin控制台,会自动识别刚刚安装的canal-server,配置canal-server:

点击Instance管理新建一个实例example:

参照如下配置,对example进行配置:

  1. ....
  2. #配置连接的数据库
  3. canal.instance.master.address=你的ip:3306
  4. .....
  5. #配置数据库连接的密码:
  6. canal.instance.dbUsername=canal
  7. canal.instance.dbPassword=123456
  8. .....
  9. #配置要扫描的数据库
  10. canal.instance.filter.regex=数据库名\\..*
  11. #我自己配置的是 canal.instance.filter.regex=.*\\..*
  12. #用官方的配置,不知道为什么增量同步会失败,这里我采用了匹配所有的数据库

四. 安装canal-adapter

官方并没有提供canal-adapter的镜像,可能是不容易配置,当初这个配置我修改了两天(脏话),可能官方也知道这个adapter容易出错,从官网下载对应的文件,解压,修改conf文件夹下面的application.yml文件:

  1. server:
  2. port: 8081
  3. spring:
  4. jackson:
  5. date-format: yyyy-MM-dd HH:mm:ss
  6. time-zone: GMT+8
  7. default-property-inclusion: non_null
  8. canal.conf:
  9. mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  10. flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  11. zookeeperHosts: # 对应集群模式下的zk地址
  12. syncBatchSize: 1000 # 每次同步的批数量
  13. retries: 0 # 重试次数, -1为无限重试
  14. timeout: # 同步超时时间, 单位毫秒
  15. accessKey:
  16. secretKey:
  17. consumerProperties:
  18. # canal tcp consumer
  19. canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
  20. canal.tcp.zookeeper.hosts:
  21. canal.tcp.batch.size: 500
  22. canal.tcp.username:
  23. canal.tcp.password:
  24. srcDataSources: # 源数据库配置
  25. defaultDS:
  26. url: jdbc:mysql://ip:3306/test?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf8&useSSL=false&allowMultiQueries=true #测试数据库连接
  27. username: canal #数据库账号
  28. password: 123456 #数据库密码
  29. canalAdapters: # 适配器列表
  30. - instance: example # canal实例名或者MQ topic名
  31. groups: # 分组列表
  32. - groupId: g1 # 分组id, 如果是MQ模式将用到该值
  33. outerAdapters:
  34. - name: logger # 日志打印适配器
  35. - name: es7 # ES同步适配器
  36. hosts: http://ip:9200 # ES连接地址
  37. properties:
  38. mode: rest # 模式可选transport(9300) 或者 rest(9200)
  39. #security.auth: elastic:123456 # 连接es的用户和密码,仅rest模式使用
  40. cluster.name: elasticsearch # ES集群名称, 与es目录下 elasticsearch.yml文件cluster.name对应

进入bin目录,启动adapter,课进入log文件夹查看是否启动成功。

sh startup.sh

进入conf/es7,编辑配置文件,person.yml(必须是yml,文件名没有要求)

  1. dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
  2. destination: example # canal的instance或者MQ的topic
  3. groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
  4. esMapping:
  5. _index: book # es 的索引名称
  6. _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  7. sql: "SELECT
  8. tb.id AS _id,
  9. tb.title,
  10. tb.isbn,
  11. tb.author,
  12. tb.publisher_name as publisherName
  13. FROM
  14. book tb" # sql映射
  15. etlCondition: "where p.id>={}" #etl的条件参数
  16. commitBatch: 3000 # 提交批大小

创建book表:

  1. CREATE TABLE `book` (
  2. `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  3. `title` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '题名',
  4. `isbn` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'isbn',
  5. `author` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '作者',
  6. `publisher_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '出版社名',
  7. PRIMARY KEY (`id`) USING BTREE
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC

开启增量同步:

curl http://127.0.0.1:8081/syncSwitch/example/on -X PUT

 example是你的destination,可以根据自己的实际情况修改。

查询实例example的状态:

curl http://127.0.0.1:8081/syncSwitch/example

确保自己的实例是on的状态,不然就只能去log里面查:

{"stauts":"on"}

 插入数据进行测试:

INSERT INTO `book`( `title`, `isbn`, `author`, `publisher_name`) VALUES (  '大聪明', '996996', '1111', '996');

log日志李输出这样的话或者查询es已经同步则大功告成:

  1. 2023-12-15 13:51:22.877 [pool-2-thread-1] DEBUG
  2. c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML:
  3. {"data":[{"id":10,"title":"1","isbn":"1","author":"11","publisher_name":"1"}],
  4. "database":"test","destination":"example","es":1702619482000,"groupId":"g1","isDdl":false,"old":null,
  5. "pkNames":["id"],"sql":"","table":"book","ts":1702619482876,"type":"INSERT"}

五.常见错误

1. java.sql.SQLException: null, message from server: is blocked because of many connection errors; unblock with ‘mysqladmin flush-hosts’”

同一个ip在短时间内产生太多中断的数据库连接而导致的阻塞,执行:

flush hosts;

2. IllegalStateException: Extension instance(name: es7, class: interface com.alibaba.otter.canal.client.adapter.OuterAdapter) could not be instantiated: class could not be found


adapter的配置文件错误,修改配置文件中es的配置中的name为es7即可解决

3. IllegalArgumentException: Illegal character in scheme name at index 0: 

配置文件中使用的mode的方式为rest,要在hosts的ip前面加上http://,即:

hosts:http://ip:9200

4. com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource

官方代码中的包冲突,需要去官网下载canal-adapter的源文件,将escore/pom.xml的druid范围设置为provided,并重新打包,上传到服务器的adapter的plugin,替换对应的文件,并使用chmod 777对其设置权限:

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>druid</artifactId>
  4. <scope>provided</scope>
  5. </dependency>

我主页上传了打包好的1.15的jar包,下载不需要积分,各位读者可自行下载:

【免费】canal-adapter1.15德鲁伊连接池报错资源-CSDN文库

5.adapter出现something goes wrong when starting up the canal client adapters: java.lang.NullPointerException: null

配置文件某个地方出错,需要修改application.yml配置文件。可以复制本文上方的配置文件进行修改。

安装canal-adapter按了两天,真就踩了不少的坑。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/801339
推荐阅读
  

闽ICP备14008679号