
Data Collection Tools: Canal

This article describes how to use Canal to capture MySQL data in TCP mode and in DataHub (Kafka) mode.

1. Download Canal

https://aliyun-datahub.oss-cn-hangzhou.aliyuncs.com/tools/canal.deployer-1.1.5-SNAPSHOT.tar.gz

Canal works on the same principle as MySQL master-slave replication: it poses as a replica and pulls the binlog from the master.
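For this to work, the source MySQL server must write a binlog in ROW format and the account Canal connects with needs replication privileges. The snippet below is only a sketch of a typical setup (the server_id value and the flink/flink account are placeholders chosen to match the instance.properties used later):

# my.cnf on the source MySQL server
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

-- grant replication privileges to the account Canal will use (run in MySQL)
CREATE USER 'flink'@'%' IDENTIFIED BY 'flink';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
FLUSH PRIVILEGES;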

2. TCP Mode

a. canal.properties

Just have a look at the defaults; nothing needs to be changed for TCP mode.

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example   ## multiple destinations can be listed here, separated by commas
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.mq.flat.message = true
canal.mq.database.hash = true
canal.mq.parallel.thread.size = 8
canal.mq.canal.batch.size = 50
canal.mq.canal.fetch.timeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.access.channel = local
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =

b. example/instance.properties

canal.instance.master.address=192.168.140.1:3306   ### change this to your own MySQL address

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.140.1:3306   ### change to your own MySQL address
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=flink
canal.instance.dbPassword=flink
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

Start the server: bin/startup.sh

Check with jps; a CanalLauncher process should be listed.

At this point the Canal server side is configured.

c. Canal client development

Dependency (Maven):

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

Client code (the example also uses fastjson to assemble the before/after images; add that dependency if it is not already pulled in transitively):

package com.tbea;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {
    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // create the connector (canal server address, destination, username, password)
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.140.129", 11111), "example", "", "");
        // keep polling for new data
        while (true) {
            // connect
            canalConnector.connect();
            // subscribe to the database/table filter
            canalConnector.subscribe("flinkcdc.*");
            // fetch a batch of up to 100 entries (get() acknowledges automatically)
            Message message = canalConnector.get(100);
            // get the entries
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() <= 0) {
                System.out.println("No data fetched this round, sleeping for a while...");
                Thread.sleep(1000);
            } else {
                // parse each entry
                for (CanalEntry.Entry entry : entries) {
                    // 1. table name
                    String tableName = entry.getHeader().getTableName();
                    // 2. entry type
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3. serialized payload
                    ByteString storeValue = entry.getStoreValue();
                    // 4. only ROWDATA entries carry row changes
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5. deserialize
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6. event type (INSERT / UPDATE / DELETE ...)
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7. changed rows
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        // 8. print the before/after image of every row
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            JSONObject beforeData = new JSONObject();
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("Entry type is not ROWDATA, skipping");
                    }
                }
            }
        }
    }
}

Run the client, then perform some inserts, updates, or deletes on the subscribed database; the corresponding change events are printed to the console.

At this point we can capture every kind of MySQL operation in real time; where the data is written next can be implemented as needed.

3. Kafka Mode

a. canal.properties

Change the server mode: canal.serverMode = kafka

Point Canal at the Kafka broker(s): kafka.bootstrap.servers = 192.168.140.128:9092

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
canal.mq.flat.message = true
canal.mq.database.hash = true
canal.mq.parallel.thread.size = 8
canal.mq.canal.batch.size = 50
canal.mq.canal.fetch.timeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.access.channel = local
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 192.168.140.128:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =

b. example/instance.properties

Change the following:

canal.instance.master.address=192.168.140.1:3306

canal.instance.dbUsername=flink

canal.instance.dbPassword=flink

canal.mq.topic=mysql_binlogs

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.140.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=flink
canal.instance.dbPassword=flink
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=mysql_binlogs
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

c. Start the server: bin/startup.sh

d. Check that data is being produced by consuming the topic:

kafka-console-consumer.sh --bootstrap-server 192.168.140.128:9092 --topic mysql_binlogs --from-beginning
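Since canal.mq.flat.message = true in canal.properties, each change arrives as one flat JSON message. The record below is only an illustration of the general shape of such a message (database, table, columns, and values are made up), not actual output:

{"data":[{"id":"1","name":"zhangsan"}],"database":"flinkcdc","es":1640000000000,"id":1,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"user","ts":1640000000123,"type":"INSERT"}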

4. DataHub via Kafka Compatibility

How is this configured? See the Alibaba Cloud help page: 什么是Canal插件,如何使用Canal插件_数据总线 DataHub(DataHub)-阿里云帮助中心 (What is the Canal plugin and how to use it, in the DataHub documentation).

It appears you need DataHub's Kafka-compatible endpoint (ip:port); on a private cloud deployment, ask the operations team for it.

5. TCP Mode + DataHub SDK

The TCP configuration and the basic client are the same as in section 2.

a. Dependency (Maven):

<dependency>
    <groupId>com.aliyun.datahub</groupId>
    <artifactId>aliyun-sdk-datahub</artifactId>
    <version>2.25.1</version>
</dependency>

b. Implementation:

package com.tbea;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.*;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.*;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class CanalClient {

    // DataHub client (endpoint + AccessKey); in practice reuse one client instead of building a new one per write
    static DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(
            // Protocol can be left unset; PROTOBUF is used by default
            new DatahubConfig("https://datahub.cn-beijing-tbdg-d01.dh.res.bigdata.tbea.com",
                    new AliyunAccount("2Z8tAOpDPBm5LEkA", "Tlupsw2G0PdKGCRyPLucHjeESqoCla"))
    ).setHttpConfig(new HttpConfig().setCompressType(CompressType.LZ4)).build();

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // create the connector (canal server address, destination, username, password)
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.140.129", 11111), "example", "", "");
        // keep polling for new data
        while (true) {
            // connect
            canalConnector.connect();
            // subscribe to the database/table filter
            canalConnector.subscribe("flinkcdc.*");
            // fetch a batch of up to 100 entries
            Message message = canalConnector.get(100);
            // get the entries
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size() <= 0) {
                System.out.println("No data fetched this round, sleeping for a while...");
                Thread.sleep(1000);
            } else {
                // parse each entry
                for (CanalEntry.Entry entry : entries) {
                    // 1. table and schema name
                    String tableName = entry.getHeader().getTableName();
                    String schemaName = entry.getHeader().getSchemaName();
                    // 2. entry type
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3. serialized payload
                    ByteString storeValue = entry.getStoreValue();
                    // 4. only ROWDATA entries carry row changes
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5. deserialize
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6. event type (INSERT / UPDATE / DELETE ...)
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7. changed rows
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        JSONObject beforeData = new JSONObject();
                        JSONObject afterData = new JSONObject();
                        // 8. collect and print the before/after image of every row
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                        // wrap the change in a Binlog POJO and write it to the DataHub tuple topic
                        Binlog binlog = new Binlog(eventType.toString(), tableName, schemaName,
                                beforeData.toJSONString(), afterData.toJSONString());
                        tupleExample("bigdata", "tcp_canal", 3, binlog);
                    } else {
                        System.out.println("Entry type is not ROWDATA, skipping");
                    }
                }
            }
        }
    }

    // write one tuple record to DataHub
    public static void tupleExample(String project, String topic, int retryTimes, Binlog binlog) {
        DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(
                // Protocol can be left unset; PROTOBUF is used by default
                new DatahubConfig("https://datahub.cn-*****************.com",
                        new AliyunAccount("************", "*************"))
        ).setHttpConfig(new HttpConfig().setCompressType(CompressType.LZ4)).build();
        // fetch the topic schema
        RecordSchema recordSchema = datahubClient.getTopic(project, topic).getRecordSchema();
        // build one record
        List<RecordEntry> recordEntries = new ArrayList<>();
        RecordEntry recordEntry = new RecordEntry();
        // optional per-record attributes (e.g. ip, hostname); writing works without them
        recordEntry.addAttribute("key1", "value1");
        TupleRecordData data = new TupleRecordData(recordSchema);
        data.setField("operate", binlog.getOperater());
        data.setField("tablename", binlog.getTableName());
        data.setField("databasename", binlog.getDatabaseName());
        data.setField("before", binlog.getBefore());
        data.setField("after", binlog.getAfter());
        recordEntry.setRecordData(data);
        recordEntries.add(recordEntry);
        try {
            PutRecordsResult result = datahubClient.putRecords(project, topic, recordEntries);
            int i = result.getFailedRecordCount();
            if (i > 0) {
                retry(datahubClient, result.getFailedRecords(), retryTimes, project, topic);
            }
        } catch (DatahubClientException e) {
            System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());
        }
    }

    // retry failed records a limited number of times
    public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        while (retryTimes != 0) {
            retryTimes = retryTimes - 1;
            PutRecordsResult recordsResult = client.putRecords(project, topic, records);
            if (recordsResult.getFailedRecordCount() > 0) {
                retry(client, recordsResult.getFailedRecords(), retryTimes, project, topic);
            }
            suc = true;
            break;
        }
        if (!suc) {
            System.out.println("retryFailure");
        }
    }
}

Note how the DataHub topic schema maps to the fields set in the code: the tuple topic (project bigdata, topic tcp_canal in this example) must have fields named operate, tablename, databasename, before, and after matching the setField calls above.
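The Binlog class used in the code above is not shown in the original post. A minimal sketch that matches the constructor and getters the code calls (including the getOperater spelling) could look like this:

package com.tbea;

// Simple carrier for one parsed binlog event; the fields mirror the DataHub topic schema
// (operate, tablename, databasename, before, after).
public class Binlog {
    private final String operater;      // event type, e.g. INSERT / UPDATE / DELETE
    private final String tableName;
    private final String databaseName;
    private final String before;        // row image before the change, as a JSON string
    private final String after;         // row image after the change, as a JSON string

    public Binlog(String operater, String tableName, String databaseName, String before, String after) {
        this.operater = operater;
        this.tableName = tableName;
        this.databaseName = databaseName;
        this.before = before;
        this.after = after;
    }

    public String getOperater() { return operater; }
    public String getTableName() { return tableName; }
    public String getDatabaseName() { return databaseName; }
    public String getBefore() { return before; }
    public String getAfter() { return after; }
}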
