赞
踩
昨天,把Order模块搞定了,今天来做agent模块,先说一下agent模块的功能
agent模块:负责从mysql中通过Canal拉去数据,然后将数据放入到MQ中。
昨天已经把agent模块架子打好了。
个个包就不详细介绍了。
主要说一下:CanalListener这个类,CanalListener主要是Canal负责监听mysql,如果数据发生变法,这会收到更改的数据。
Canal:译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。
canal 工作原理
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
所有mysql一定要开启binlog。
这个是canal:https://github.com/alibaba/canal的地址。好了canal先说到这里。
还是说:CanalListener这个类。
@PostConstruct:注解,服务起来后会调用该方法。
一个循环,通过canalConnector获取连接,订阅subscribe监听内容,如果没有数据则sleep3秒,如有数据则获取指定数量的数据,然后解析message信息,根据不同事件,将数据封装成一个实体。
private String dataBaseName;
private String tableName;
private String type;
private MapdataBefore;
private MapdataAfter;
dataBaseName 数据库名称
tableName 表名
type 是什么操作,插入删除更新等
dataBefore 修改前数据
dataAfter 修改之后数据
将解析到的数据,发送到mq中,rabbitTemplate.convertAndSend(RabbitMqQueueConfig.STRING, canalMysqlEntry.toString());
到这agent模块主要功能已经说晚了,下面看一下代码需要注意的地方。
application.properties
#mq
spring.rabbitmq.host=192.168.199.101
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
手动ACK 目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq:使用的rabbitmq,通过rabbitTemplate操作,
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.199.101");
connectionFactory.setPort(5672);
connectionFactory.setConnectionTimeout(6000);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 生成Connection & Channel
connection = connectionFactory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
//queueDeclare第一个参数表示队列名称、
// 第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、
// 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、
// 第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送消息到队列
//basicPublish第一个参数为交换机名称、
// 第二个参数为队列映射的路由key、
// 第三个参数为消息的其他属性、
// 第四个参数为发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
System.out.println("Producer Send: '" + msg + "'");
channel.close();
写的有mq测试的类。
最后,看一下效果吧!
项目启动成功!然后修改mysql中的一条数据:如将单号1567915800002的金额有1000.000修改为2000.则
mysql修改完成后,agent立刻受到数据:
{"dataAfter":{"order_no":"1567915800002","source_no":"APPANDROID","address":"天津黄河路13号","receiver":"南吸佳","create_time":"2019-09-08 04:10:00","city":"天津","actual_amount":"1565.0","telephone":"1387138","type":"DIANZI","update_time":"2019-09-08 04:10:00","original_amount":"2000.0","id":"394","category":"ZIYING","status":"CANCEL","ts":"2019-09-08 20:08:58"},"dataBaseName":"eyeOrder","dataBefore":{"order_no":"1567915800002","source_no":"APPANDROID","address":"天津黄河路13号","receiver":"南吸佳","create_time":"2019-09-08 04:10:00","city":"天津","actual_amount":"1565.0","telephone":"1387138","type":"DIANZI","update_time":"2019-09-08 04:10:00","original_amount":"1000.0","id":"394","category":"ZIYING","status":"CANCEL","ts":"2019-09-08 19:38:03"},"tableName":"order_info","type":"UPDATE"}
然后放入到mq队列中,查看mq队列中的数据。
好了,今天写作就到这了。大家有什么问题,可以留言。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。