赞
踩
Canal服务端搭建:https://github.com/alibaba/canal/wiki/QuickStart
Canal配置文件说明:https://github.com/alibaba/canal/wiki/AdminGuide
ClientAPI:https://github.com/alibaba/canal/wiki/ClientAPI
微信使用weixin-java-tools:https://gitee.com/binary/weixin-java-tools
canal服务端配置文件canal.properties,默认配置即可
#################################################
######### common argument #############
#################################################
#每个canal server实例的唯一标识,默认为1
canal.id= 1
#canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.ip= 192.168.2.105
#canal server提供socket服务的端口默认11111
canal.port= 11111
#canal server链接zookeeper集群的链接信息例子:127.0.0.1:2181,127.0.0.1:2182
canal.zkServers=
# canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.zookeeper.flush.period = 1000
# canal持久化数据到file上的目录
canal.file.data.dir = ${canal.conf.dir}
#canal持久化数据到file上的更新频率,单位毫秒
canal.file.flush.period = 1000
#canal内存store中可缓存buffer记录数,需要为2的指数
canal.instance.memory.buffer.size = 16384
#内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小
canal.instance.memory.buffer.memunit = 1024
#canal内存store中数据缓存模式
#1.ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量
#2.MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的内存大小
canal.instance.memory.batch.mode = MEMSIZE
## 是否开启心跳检查
canal.instance.detecting.enable = false
#心跳检查sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
#心跳检查频率,单位秒
canal.instance.detecting.interval.time = 3
#心跳检查失败重试次数
canal.instance.detecting.retry.threshold = 3
#心跳检查失败后,是否开启mysql自动切换
#说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据
canal.instance.detecting.heartbeatHaEnable = false
# 最大事务完整解析的长度支持
#超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性
#个人建议,需要同步的数据最好是来自一张表;多张表时可以在client端实现一个buffer,凑足一个事务数据再逐个ack
#参考链接:https://github.com/alibaba/canal/issues/589
canal.instance.transaction.size = 1024
#canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒
#说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢
canal.instance.fallbackIntervalInSeconds = 60
# 网络链接参数,SocketOptions.SO_RCVBUF
canal.instance.network.receiveBufferSize = 16384
#网络链接参数,SocketOptions.SO_SNDBUF
canal.instance.network.sendBufferSize = 16384
#网络链接参数,SocketOptions.SO_TIMEOUT
canal.instance.network.soTimeout = 30
# v1.0.25版本新增,是否启用druid的DDL parse的过滤,基于sql的完整parser可以解决之前基于正则匹配补全的问题,默认为true
canal.instance.filter.druid.ddl = true
#是否忽略DCL的query语句,比如grant/create user等
canal.instance.filter.query.dcl = false
#是否忽略DML的query语句,比如insert/update/delete table.(mysql5.6的ROW模式可以包含statement模式的query记录)
canal.instance.filter.query.dml = false
#是否忽略DDL的query语句,由于1.0.25版本解析ddl会出错(已确认为bug),此值建议设置成功true
canal.instance.filter.query.ddl = true
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
#ddl语句是否隔离发送,开启隔离可保证每次只返回发送一条ddl数据,不和其他dml语句混合返回
canal.instance.get.ddl.isolation = false
#################################################
######### destinations #############
#################################################
canal.destinations= business
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/local-instance.xml
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

instance配置文件
#################################################
## mysql serverId
canal.instance.mysql.slaveId=0
#mysql主库链接地址
canal.instance.master.address=127.0.0.1:3306
#mysql主库链接时起始的binlog文件
canal.instance.master.journal.name=
#mysql主库链接时起始的binlog偏移量
canal.instance.master.position=
#mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
# table meta tsdb info true
canal.instance.tsdb.enable=true
canal.instance.tsdb.dir=${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#mysql数据库帐号
canal.instance.dbUsername=root
#mysql数据库密码
canal.instance.dbPassword=root
canal.instance.defaultDatabaseName=canal
canal.instance.connectionCharset=UTF-8
#mysql 数据解析关注的表,Perl正则表达式
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
#################################################

代码结构:
canal-client.xml配置文件
- <?xml version="1.0" encoding="utf-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
- default-autowire="byName">
-
- <bean id="canalConsumer_business" class="com.innofarm.canal.client.CanalConsumer" init-method="init" destroy-method="stop" >
- <!-- hostname对应canal.properies中的canal.ip值保持一致 -->
- <property name="hostname" value="192.168.2.105"/>
- <property name="port" value="11111" />
- <!-- destination对应canal.properies中的canal.destinations -->
- <property name="destination" value="business" />
- <property name="processor">
- <!-- 配置数据处理类key为表名称,value为处理类 为BaseProcess子类-->
- <map>
- <entry key="message_info" value-ref="messageInfoProcess"></entry>
- </map>
- </property>
- </bean>
-
- <bean id="messageInfoProcess" class="com.innofarm.canal.process.MessageInfoProcess"></bean>
-
- </beans>

项目入口。main启动
需要的sql,DDL文件:
- CREATE TABLE `message_info` (
- `ID` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
- `OPEN_ID` varchar(28) NOT NULL,
- `TEMPLATE_ID` varchar(43) NOT NULL COMMENT '模板ID',
- `URL` varchar(255) DEFAULT NULL COMMENT '跳转链接',
- `MINIPROGRAM` varchar(255) DEFAULT NULL COMMENT '跳小程序所需数据,不需跳小程序可不用传该数据(Json格式)',
- `MESSAGE_CONTENT` varchar(1000) NOT NULL COMMENT '微信要发送的数据Json格式',
- PRIMARY KEY (`ID`)
- ) ENGINE=InnoDB AUTO_INCREMENT=4461 DEFAULT CHARSET=utf8mb4;
此工程是maven,pom文件已配置好maven打成jar的配置信息,执行install命令,在target文件下会生成一个BusinessCanal-0.0.1-SNAPSHOT.jar包。
java -jar BusinessCanal-0.0.1-SNAPSHOT.jar
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。