赞
踩
canal-client-springboot-starter
引用jar包canal-client升级1.1.5。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal-client.version}</version>
</dependency>
1.1.5中把protocol包单独设置了一个模块。
所以,需要加一个包。
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>${canal-client.version}</version>
</dependency>
本来想在canal-client-springboot-starter
中集成rabbitmq,但是发现官方提供的canal.client jar包中没有rabbitmq相关的连接包,自己再去扩展的话还得去编译canal的源码——暂时只好放弃。
Exception in thread "canal-client-thread" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
at com.alibaba.otter.canal.client.rabbitmq.RabbitMQCanalConnector.connect(RabbitMQCanalConnector.java:67)
at top.javatool.canal.client.client.RabbitMqCanalClient.process(RabbitMqCanalClient.java:34)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 3 more
RabbitMQCanalConnector.java
连接rabbitmq的部分源码。
public void connect() throws CanalClientException { ConnectionFactory factory = new ConnectionFactory(); if (accessKey.length() > 0 && secretKey.length() > 0) { factory.setCredentialsProvider(new AliyunCredentialsProvider(accessKey, secretKey, resourceOwnerId)); } else { factory.setUsername(username); factory.setPassword(password); } factory.setHost(nameServer); factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(5000); factory.setVirtualHost(vhost); try { connect = factory.newConnection(); channel = connect.createChannel(); } catch (IOException | TimeoutException e) { throw new CanalClientException("Start RabbitMQ producer error", e); } }
不过也是,截止2021年10月26日,canal官方推荐的正式版本仍然是1.1.4,对客户端支持rabbitmq还没有做足够的支持。
参考canal-client-springboot-starter
自己构建了一个easy-canal-client
的项目。
源码已开源:https://gitee.com/cowboy2014/easy-canal-client.git
架构示意图如下:
canal需要升级为1.1.5
,canal把binlog数据解析完成后,就把数据直接投递给rabbitmq了——1.1.5可以把数据直接投递给rabbitmq。
canal.properties
参考配置:
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 172.16.150.11
rabbitmq.virtual.host = /
rabbitmq.exchange = canal-exchange
rabbitmq.username = canal
rabbitmq.password = canal%123
rabbitmq.deliveryMode =
conf/example/instance.properties
参考配置:
# position info
canal.instance.master.address=172.16.150.12:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal%123
# table regex
canal.instance.filter.regex=lpm-center\\.lpm_(park|company|store|route)
# mq config
canal.mq.topic=shangwt
业务模块直接集成rabbitmq,作为消费者进行数据的解析、同步。
@Component @CanalTable(value = "lpm_park") @Slf4j public class ParkHandler implements EntryHandler<Park> { @Resource private ElSearchParkServiceImpl elSearchParkService; @Resource private EsIndexes esIndexes; @Override public void insert(Park park) { log.info("insert message {}", park); try { elSearchParkService.synchronous(esIndexes.getPark(), park.getId()); } catch (IOException e) { log.error("es insert wrong!"); } } @Override public void update(Park before, Park park) { try { if (ObjectUtil.isNotEmpty(before.getDeleted()) && !before.getDeleted().equals(park.getDeleted()) && park.getDeleted() == 1){ this.delete(park); } if (ObjectUtil.isNotEmpty(before.getDeleted()) && !before.getDeleted().equals(park.getDeleted()) && park.getDeleted() == 0){ this.insert(park); } elSearchParkService.synchronous(esIndexes.getPark(), park.getId()); } catch (IOException e) { log.error("es insert wrong!"); } log.info("update after {}", park); } @Override public void delete(Park park) { log.info("delete {}", park); elSearchParkService.deleteById(esIndexes.getPark(), park.getId().intValue()); } }
/**
* canal消息同步
* @param info
*/
@RabbitListener(queues = {"shangwtQueue"},containerFactory = "multiListenerContainer")
public <T> void consumeMsg(FlatMessage info){
try {
handlerUtil.handleMessage(info);
}catch (Exception e){
log.error("canal消息-监听者-发生异常:",e.fillInStackTrace());
}
}
我们知道canal数据处理是在mysql事务之外的,一定会产生延迟,延迟期间最新的数据被消费了怎么办?
早期的数据延迟消费,可能会造成“无用功”(幂等性OK的情况下),甚至是错误操作缓存数据。所以务必要重视步骤1的执行。
如何判断消息数据为最新?表中添加一个时间戳字段,设置自动更新。通过比较时间戳,判断消息是否过期。
https://github.com/NormanGyllenhaal/canal-client
感谢您的赏读~
如果您对我的文章感兴趣的话,欢迎留下您的问题让我们一起探讨!一起进步!!
还可以关注我的微信公众号,回复“Canal”获取我的Canal学习脑图哦~声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/445423
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。