当前位置:   article > 正文

mysql监听rabbitmq消息_Mysql+Canal+RabbitMQ+Redis打造订单实时大屏-agent模块搭建4

agent服务+mq

昨天,把Order模块搞定了,今天来做agent模块,先说一下agent模块的功能

agent模块:负责从mysql中通过Canal拉去数据,然后将数据放入到MQ中。

昨天已经把agent模块架子打好了。

67365a2547e67e6b633e55baff130745.png个个包就不详细介绍了。

主要说一下:CanalListener这个类,CanalListener主要是Canal负责监听mysql,如果数据发生变法,这会收到更改的数据。

Canal:译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

2b07d1f31d13f66c70b54e9294a8b0b9.pngcanal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

所有mysql一定要开启binlog。

这个是canal:https://github.com/alibaba/canal的地址。好了canal先说到这里。

还是说:CanalListener这个类。

e772d6de30697191a54ae8b7ef887583.png@PostConstruct:注解,服务起来后会调用该方法。

一个循环,通过canalConnector获取连接,订阅subscribe监听内容,如果没有数据则sleep3秒,如有数据则获取指定数量的数据,然后解析message信息,根据不同事件,将数据封装成一个实体。

private String dataBaseName;

private String tableName;

private String type;

private MapdataBefore;

private MapdataAfter;

dataBaseName 数据库名称

tableName 表名

type 是什么操作,插入删除更新等

dataBefore 修改前数据

dataAfter 修改之后数据

4cff098a6f11402eedd0ee6154d484e1.png

43843c29f2478b40c6ec6db2a10e6f64.png

fac4533e58c61281f52b1554140b5985.png将解析到的数据,发送到mq中,rabbitTemplate.convertAndSend(RabbitMqQueueConfig.STRING, canalMysqlEntry.toString());

到这agent模块主要功能已经说晚了,下面看一下代码需要注意的地方。

application.properties

#mq

spring.rabbitmq.host=192.168.199.101

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

手动ACK 目的是防止报错后未正确处理消息丢失 默认 为 none

spring.rabbitmq.listener.simple.acknowledge-mode=manual

mq:使用的rabbitmq,通过rabbitTemplate操作,

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("192.168.199.101");

connectionFactory.setPort(5672);

connectionFactory.setConnectionTimeout(6000);

connectionFactory.setVirtualHost("/");

connectionFactory.setUsername("guest");

connectionFactory.setPassword("guest");

// 生成Connection & Channel

connection = connectionFactory.newConnection();

//创建一个通道

Channel channel = connection.createChannel();

//声明队列

//queueDeclare第一个参数表示队列名称、

// 第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、

// 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、

// 第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//发送消息到队列

//basicPublish第一个参数为交换机名称、

// 第二个参数为队列映射的路由key、

// 第三个参数为消息的其他属性、

// 第四个参数为发送信息的主体

channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));

System.out.println("Producer Send: '" + msg + "'");

channel.close();

写的有mq测试的类。

最后,看一下效果吧!

da952cfb6335d752c4ee5635952fbdd1.png项目启动成功!然后修改mysql中的一条数据:如将单号1567915800002的金额有1000.000修改为2000.则

e6999179810d43307e0e851cb5f1face.png

136bc29c64b0b689948caa01b010f26e.png

f04415d8eccdabf3dfa2f79596e70dd0.pngmysql修改完成后,agent立刻受到数据:

{"dataAfter":{"order_no":"1567915800002","source_no":"APPANDROID","address":"天津黄河路13号","receiver":"南吸佳","create_time":"2019-09-08 04:10:00","city":"天津","actual_amount":"1565.0","telephone":"1387138","type":"DIANZI","update_time":"2019-09-08 04:10:00","original_amount":"2000.0","id":"394","category":"ZIYING","status":"CANCEL","ts":"2019-09-08 20:08:58"},"dataBaseName":"eyeOrder","dataBefore":{"order_no":"1567915800002","source_no":"APPANDROID","address":"天津黄河路13号","receiver":"南吸佳","create_time":"2019-09-08 04:10:00","city":"天津","actual_amount":"1565.0","telephone":"1387138","type":"DIANZI","update_time":"2019-09-08 04:10:00","original_amount":"1000.0","id":"394","category":"ZIYING","status":"CANCEL","ts":"2019-09-08 19:38:03"},"tableName":"order_info","type":"UPDATE"}

然后放入到mq队列中,查看mq队列中的数据。

047c685cbbd3558eeb09c9a92df5643e.png好了,今天写作就到这了。大家有什么问题,可以留言。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/85619
推荐阅读
相关标签
  

闽ICP备14008679号