当前位置:   article > 正文

canal 增量数据同步es 自定义客户端(1)_linux canal

linux canal

1、准备材料

1.1  centos7 

1.2、openJDK 11下载

ES7 依赖 jdk11

  1. 镜像地址
  2. https://mirrors.tuna.tsinghua.edu.cn/Adoptium/
  3. https://mirrors.tuna.tsinghua.edu.cn/Adoptium/11/jdk/x64/linux/

1.3、elasticsearch-no-jdk 下载

  1. https://www.elastic.co/cn/downloads/past-releases/elasticsearch-no-jdk-7-17-16
  2. 不需要自带JDK 版本,因为要安装openJDK 11来支持

1.3.1、 修改配置文件elasticsearch.yml

  1. 修改说明:
  2. https://blog.csdn.net/weixin_46085718/article/details/130302400
  3. https://blog.csdn.net/qq_45939223/article/details/126622699
  4. vim elasticsearch.yml
  5. cluster.name: my-es
  6. node.name: node-1
  7. path.data: /data/midware/es/es_kibana/elasticsearch-7.17.16/data
  8. path.logs: /data/midware/es/es_kibana/elasticsearch-7.17.16/logs
  9. network.host: 0.0.0.0
  10. http.port: 9200
  11. transport.tcp.port: 9300
  12. discovery.seed_hosts: ["127.0.0.1:9200"]
  13. cluster.initial_master_nodes: ["node-1"]
  14. http.cors.enabled: true
  15. http.cors.allow-origin: /.*/
  16. vim jvm.options
  17. -Xms512m
  18. -Xmx512m

1.3.2、指定jdk home

  1. pwd
  2. /data/midware/es/es_kibana/elasticsearch-7.17.16/bin
  3. vi elasticsearch-env
  4. 添加:
  5. JAVA=/data/midware/es/jdk/jdk-11.0.21+9/bin/java

1.3.3、创建es运行用户

  1. useradd es
  2. passwd es

1.3.4、运行ES

  1. pwd
  2. /data/midware/es/es_kibana/elasticsearch-7.17.16/bin
  3. 切换用户
  4. su es
  5. # Starts Elasticsearch in the background 后台启动
  6. ./elasticsearch -d
  7. ps-ef|grep elasticsearch
  8. kill -9 pid

1.3.5、如果运行报错

  1. 3] bootstrap checks failed. You must address the points described in the following [3] lines before starting Elasticsearch.
  2. bootstrap check failure [1] of [3]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65535]
  3. bootstrap check failure [2] of [3]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
  4. bootstrap check failure [3] of [3]: the default discovery settings are unsuitable for production use; at least one of [discovery.seed_hosts, discovery.seed_providers, cluster.initial_master_nodes] must be configured

1.3.6 解决方式

  1. su root
  2. sudo vi /etc/security/limits.conf
  3. es soft nofile 65536
  4. es hard nofile 65536
  5. sudo vi /etc/security/limits.d/20-nproc.conf
  6. es soft nofile 65536
  7. es hard nofile 65536
  8. sysctl -p # 配置生效
  9. exist 退出当前用户 [退出,再进去,则可以正常读取配置]
  10. su es

1.3.7、访问-安装成功后,输入以下网址:

  1. http://192.168.5.180:9200/?pretty
  2. {
  3. "name" : "hismk-1",
  4. "cluster_name" : "hismk-backend",
  5. "cluster_uuid" : "hMPu08v6QguP0Wc59Pr8pQ",
  6. "version" : {
  7. "number" : "7.17.16",
  8. "build_flavor" : "default",
  9. "build_type" : "tar",
  10. "build_hash" : "2b23fa076334f8d4651aeebe458a955a2ae23218",
  11. "build_date" : "2023-12-08T10:06:54.672540567Z",
  12. "build_snapshot" : false,
  13. "lucene_version" : "8.11.1",
  14. "minimum_wire_compatibility_version" : "6.8.0",
  15. "minimum_index_compatibility_version" : "6.0.0-beta1"
  16. },
  17. "tagline" : "You Know, for Search"
  18. }

1.4、Kibana 下载安装

  1. # 下载
  2. wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.16-linux-x86_64.tar.gz
  3. pwd
  4. /data/midware/es/es_kibana/kibana-7.17.16
  5. vi config/kibana.yml
  6. #修改端口
  7. server.port: 5601
  8. #对外暴露服务的地址
  9. server.host: "0.0.0.0"
  10. #配置Elasticsearch所在的IP地址
  11. elasticsearch.hosts: ["http://127.0.0.1:9200"]
  12. #安装成功并访问
  13. http://192.168.5.180:5601/app/dev_tools#/console

1.5、IK分词器

  1. https://github.com/medcl/elasticsearch-analysis-ik
  2. release 版本 V7.17.16
  3. https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.17.16/elasticsearch-analysis-ik-7.17.16.zip
  4. 将下载的安装包在es安装目录下的plugins下新建一个ik文件夹、将文件解压。
  5. pwd
  6. /data/midware/es/es_kibana/elasticsearch-7.17.16/plugins
  7. unzip elasticsearch-analysis-ik-7.17.16.zip -d ik # 指定解压目录

1.5.1、重启ES,生效IK插件

1.5.2、分词效果

  1. GET /_analyze
  2. {
  3. "analyzer":"ik_max_word",
  4. "text":"曾舒琪董事长早上好收到刚发的world"
  5. }
  6. GET /_analyze
  7. {
  8. "analyzer":"ik_smart",
  9. "text":"曾舒琪董事长早上好"
  10. }

在kabana中查看

2、canal 安装

canal官网下载

https://github.com/alibaba/canal/releases

本次集成,只关注 canal.deployer-1.1.7

2.1、canal.deployer 服务端

  1. pwd
  2. /data/midware/es/canal/canal_deployer
  3. tar -zxvf canal.deployer-1.1.7.tar.gz -C canal_deployer

2.1.1、实例配置修改

  1. # 拷贝example配置
  2. pwd
  3. /data/midware/es/canal/canal_deployer/conf
  4. /data/midware/es/canal/canal_deployer/conf/example
  5. cp -R example es # 拷贝文件夹配置
  6. # 调整es配置
  7. pwd
  8. /data/midware/es/canal/canal_deployer/conf/es
  9. cd es
  10. vim instance.properties
  11. # position info
  12. # 源数据库地址及端口
  13. canal.instance.master.address=192.168.244.17:3306
  14. # 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
  15. canal.instance.master.journal.name=mysql-bin.000001
  16. # 开始同步的binlog文件位置
  17. canal.instance.master.position=0
  18. # 开始同步时间点 时间戳形式
  19. # https://xsgongju.com/xsgongju/timestamp 时间戳转换
  20. # 2019-01-01 00:00:00
  21. canal.instance.master.timestamp=1546272000000
  22. # 数据库账号密码
  23. canal.instance.dbUsername=canal
  24. canal.instance.dbPassword=canal
  25. # 配置不同步mysql库
  26. # 白名单 某数据库下所有表
  27. canal.instance.filter.regex=xxxxDB\\..*
  28. # 黑名单 mysql 数据不允许同步
  29. canal.instance.filter.black.regex=mysql\..*

2.1.2 、配置详解 -canal.instance.filter.regex

  1. mysql 数据解析关注的表,Perl正则
  2. 表达式.
  3. 多个正则之间以逗号(,)分隔,转义
  4. 符需要双斜杠(W
  5. 常见例子:
  6. 1.所有表: .* or ...*
  7. canal.instance.filter.regex
  8. 2.canal schema下所有表:
  9. canall.*
  10. 3.canal下的以canal打头的表:
  11. canalll.canal.*
  12. 4.canal schema下的一张表:
  13. canal.test1
  14. 5.多个规则组合使用:
  15. canalll..*,mysgl.test1,mysql.test2 (逗号分隔)

2.1.3、MySQL 数据同步起点说明:

  1. - canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个 binlog 位点,进行启动
  2. - canal.instance.master.timestamp : 指定一个时间戳,canal 会自动遍历 MySQL binlog,找到对应时间戳的 binlog 位点后,进行启动
  3. - 不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)

2.1.4、mysql binlog日志管理

  1. 1、mysql show binlog events命令的格式
  2. show binlog events [IN 'log_name'] [FROM pos] [LIMIT [offset,] row_count];
  3. 说明:
  4. 1IN ‘log_name’:指定要查询的binlog文件名(如果省略此参数,则默认指定第一个binlog文件);
  5. 2FROM pos:指定从哪个pos起始点开始查起(如果省略此参数,则从整个文件的第一个pos点开始算);
  6. 3LIMIT【offset】:偏移量(默认为0);
  7. 4)row_count:查询总条数(如果省略,则显示所有行)。
  8. 1.1、查看binlog文件信息
  9. show binary logs; # 查看所有的binlog
  10. show master logs; # 查看当前master的log信息
  11. 1.2、查询第一个binlog日志
  12. show binlog events;
  13. 1.3查询第二个binlog日志
  14. show binlog events in 'mysql-bin.000002';
  15. 1.4、查询mysql-bin.000002文件,从pos点417开始查询
  16. show binlog events in 'mysql-bin.000002' from 417;
  17. 1.5、查询mysql-bin.000002文件,从pos点219开始查询,查询5条记录
  18. show binlog events in 'mysql-bin.000002' from 219 limit 5;
  19. 1.6、查询mysql-bin.000002文件,从pos点219开始查询,查询5条记录,偏移2
  20. show binlog events in 'mysql-bin.000002' from 219 limit 2,5;
  21. binlog管理事件 (默认是Row模式--SHOW VARIABLES LIKE '%binlog_format%';)
  22. 2、Row模式下的事件说明
  23. show binlog events in '/var/lib/mysql/binlog.000005';
  24. show binlog events;
  25. 说明:
  26. 1)每个binlog文件总是以Format Description Event作为开始,以Rotate Event(Stop Event)作为结束。在开始和结束之间,穿插着其他各种事件。
  27. 2TABLE_MAP EVENT:其作用是记录INSERT、DELETE、UPDATE操作的表结构。
  28. 3Write_rows:插入记录。
  29. 4)Update_rows:更新记录。
  30. 5)Rotate Event:表示日志文件的结束。

3、自定义客户端

客户端采用springboot 项目集成:

3.1 技术栈如下:

MyBatis-Plus

Easy-Es

JPA  : 数据字段映射

3.2、架构图如下

3.3 、开发自定义客户端,可以基于下面的demo去调整

easy-es-springboot-demo: springboot集成easy-es使用demo

多表同步集成

canal_client_es: 自定义canal客户端,实现mysql多表同步elasticsearch

application.yml配置

  1. easy-es:
  2. address: 192.168.5.180:49200
  3. canal:
  4. server: 192.168.5.180:11111
  5. destination: es
  6. logging:
  7. level:
  8. top.javatool.canal.client: info
  9. org.springframework.data.convert.CustomConversions: info

目录结构

  1. ├─src
  2. │ ├─main
  3. │ │ ├─java
  4. │ │ │ └─com
  5. │ │ │ └─example
  6. │ │ │ └─eeuse
  7. │ │ │ │ EeUseApplication.java
  8. │ │ │ │
  9. │ │ │ ├─controller
  10. │ │ │ │ TestUseEeController.java
  11. │ │ │ │
  12. │ │ │ ├─domain
  13. │ │ │ │ ├─doc
  14. │ │ │ │ │ Activity.java
  15. │ │ │ │ │ Document.java
  16. │ │ │ │ │
  17. │ │ │ │ ├─entity
  18. │ │ │ │ │ ActivityPO.java
  19. │ │ │ │ │
  20. │ │ │ │ └─mapping
  21. │ │ │ │ ActivityMapping.java
  22. │ │ │ │
  23. │ │ │ ├─eemapper
  24. │ │ │ │ ActivityMapper.java
  25. │ │ │ │ DocumentMapper.java
  26. │ │ │ │
  27. │ │ │ └─handler
  28. │ │ │ ActivityHandler.java
  29. │ │ │
  30. │ │ └─resources
  31. │ │ application.yml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.7.13</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.example</groupId>
  12. <artifactId>ee-use</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>ee-use</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. <canal-starter.version>1.2.1-RELEASE</canal-starter.version>
  19. <hutool.version>5.8.23</hutool.version>
  20. <mapstruct.version>1.4.2.Final</mapstruct.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-web</artifactId>
  26. <exclusions>
  27. <exclusion>
  28. <groupId>org.elasticsearch.client</groupId>
  29. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  30. </exclusion>
  31. <exclusion>
  32. <groupId>org.elasticsearch.client</groupId>
  33. <artifactId>elasticsearch-rest-client</artifactId>
  34. </exclusion>
  35. <exclusion>
  36. <groupId>org.elasticsearch</groupId>
  37. <artifactId>elasticsearch</artifactId>
  38. </exclusion>
  39. </exclusions>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.elasticsearch.client</groupId>
  43. <artifactId>elasticsearch-rest-high-level-client</artifactId>
  44. <version>7.14.0</version>
  45. </dependency>
  46. <dependency>
  47. <groupId>org.elasticsearch.client</groupId>
  48. <artifactId>elasticsearch-rest-client</artifactId>
  49. <version>7.14.0</version>
  50. </dependency>
  51. <dependency>
  52. <groupId>org.elasticsearch</groupId>
  53. <artifactId>elasticsearch</artifactId>
  54. <version>7.14.0</version>
  55. </dependency>
  56. <dependency>
  57. <groupId>org.dromara.easy-es</groupId>
  58. <artifactId>easy-es-boot-starter</artifactId>
  59. <version>2.0.0-beta4</version>
  60. </dependency>
  61. <!-- 2019-07-03 https://mvnrepository.com/artifact/top.javatool/canal-client -->
  62. <dependency>
  63. <groupId>top.javatool</groupId>
  64. <artifactId>canal-spring-boot-starter</artifactId>
  65. <version>${canal-starter.version}</version>
  66. </dependency>
  67. <dependency>
  68. <groupId>cn.hutool</groupId>
  69. <artifactId>hutool-all</artifactId>
  70. <version>${hutool.version}</version>
  71. </dependency>
  72. <dependency>
  73. <groupId>org.mapstruct</groupId>
  74. <artifactId>mapstruct</artifactId>
  75. <version>${mapstruct.version}</version>
  76. </dependency>
  77. <dependency>
  78. <groupId>org.mapstruct</groupId>
  79. <artifactId>mapstruct-processor</artifactId>
  80. <version>${mapstruct.version}</version>
  81. </dependency>
  82. <dependency>
  83. <groupId>org.springframework.boot</groupId>
  84. <artifactId>spring-boot-starter-test</artifactId>
  85. <scope>test</scope>
  86. </dependency>
  87. </dependencies>
  88. <build>
  89. <finalName>${project.artifactId}</finalName>
  90. <plugins>
  91. <plugin>
  92. <groupId>org.springframework.boot</groupId>
  93. <artifactId>spring-boot-maven-plugin</artifactId>
  94. <executions>
  95. <execution>
  96. <goals>
  97. <goal>repackage</goal>
  98. </goals>
  99. </execution>
  100. </executions>
  101. </plugin>
  102. </plugins>
  103. </build>
  104. </project>

3.4、容易出现的问题

监听数据同步字段映射的实体,需要使用JPA的注解,否则字段无法进行映射和赋值,另外数据同步的ActivityHandler,很容易出现字段映射类型转换问题: 比如 mysql int 字段为空,同步binlog 报错, 空字符串无法转为int类型。 

  1. import lombok.Data;
  2. import javax.persistence.Column;
  3. import javax.persistence.Table;
  4. import java.io.Serializable;
  5. import java.util.Date;
  6. @Data
  7. @Table(name = "activity_t")
  8. public class ActivityPO implements Serializable {
  9. @Column(name = "activity_id")
  10. private Long activityId;
  11. /**
  12. * 活动名称
  13. */
  14. @Column(name = "activity_name")
  15. private String activityName;
  16. /**
  17. * 活动描述
  18. */
  19. @Column(name = "activity_desc")
  20. private String activityDesc;
  21. /**
  22. * 活动地址
  23. */
  24. @Column(name = "image_url")
  25. private String imageUrl;
  26. }

4、参考博客

  1. # 多表同步mysql 数据到 ES对应的索引
  2. https://zhuanlan.zhihu.com/p/649400398

5、 小结

选择canal的原因: 实时,增量同步。

数据同步,方案有很多,我们可以选择适合自己的。 canal、DataX等。了解自己同步的目的。

比如当前的mysql数据库不能满足分词检索功能,我们就需要把数据同步到ES,进行数据检索,提升系统性能,缺点就是 安装ES,Kabana,IK 繁琐。(可以采用docker安装)

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/801322
推荐阅读
相关标签
  

闽ICP备14008679号