当前位置:   article > 正文

RabbitMq和Canal的使用_canal rabbitmq

canal rabbitmq

一、RabbitMq

(1)RabbitMq是什么

RabbitMq是一种主流的消息队列,消息队列(Message Queue)是一种消息的容器,主要用于实现程序(服务、进程、线程)之间的通信;队列是 FIFO(先进先出)的数据结构

(2)为什么要使用RabbitMq

一般我们在做微服务项目时,会用feign来进行RPC远程调用,这样如果在一段逻辑代码中多次调用RPC,会比较浪费时间,因为是同步的,并且改一处逻辑,很多地方都要改,耦合性较强。因此使用RabbitMq这个消息中间件,来实现远程调用,主要作用是解耦、削峰、异步。

(3)RabbitMq的安装

1、首先安装erlang

2、安装rabbitmq

3、在安装的bin目录下打开cmd启动插件使能

rabbitmq-plugins enable rabbitmq_management

4、启动rabbitmq

net start rabbitmq

5、浏览器输入: http://localhost:15672 账号密码都是guest

Linux的安装见

https://blog.csdn.net/qq_45502336/article/details/118699251
 

(4)RabbitMq的基本使用

 

(1)一对一模式:生产者和消费者间只有一个队列,一个生产者发送消息到一个队列,一个消费者从队列中取消息。

(2)一对多模式(工作队列):生产者将消息分发给多个消费者,如果生产者生产了100条消息,消费者1消费50条,消费者2消费50条。 

        扩展:工作队列的能者多劳模式:

因为队列默认采用是自动确认机制,消息发过去后就自动确认,队列不清楚每个消息具体什么时间处理完,所以平均分配消息数量。

实现能者多劳:

  1. channel.basicQos(1);限制队列一次发一个消息给消费者,等消费者有了反馈,再发下一条

  2. channel.basicAck 消费完消息后手动反馈,处理快的消费者就能处理更多消息

  3. basicConsume 中的参数改为false

(3)发布/订阅模式

发布/订阅模式和Work模式的区别是:Work模式只存在一个队列,多个消费者共同消费一个队列中的消息;而发布订阅模式存在多个队列,不同的消费者可以从各自的队列中处理完全相同的消息。 实现步骤:

1) 创建交换机(Exchange)类型是fanout(扇出)

2) 交换机需要绑定不同的队列

3) 不同的消费者从不同的队列中获得消息

4) 生产者发送消息到交换机

5) 再由交换机将消息分发到多个队列

(4)路由模式

路由模式的消息队列可以给队列绑定不同的key,生产者发送消息时,给消息设置不同的key,这样交换机在分发消息时,可以让消息路由到key匹配的队列中。

(5)主题模式

主题模式和路由模式差不多,在key中可以加入通配符:

* 匹配任意一个单词

# 匹配.号隔开的多个单词

二、Canal

(1)Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

(2)实现原理

  1. canal将自己伪装为MySQL的slave,向master发送dump协议

  2. master收到dump协议,数据发生修改后推送binary log给canal

  3. canal解析binary log对象,转换为增量数据,同步到ES、Redis等

(3)安装

(1)Mysql配置

首先要让mysql开启binlog模式

1、进入mysql查看是否启动binlog

SHOW VARIABLES LIKE '%log_bin%'

log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

windows配置文件是MySQL安装目录的my.in

修改:

   [mysqld]
   log-bin=mysql-bin
   binlog-format=ROW
   server_id=1

2、创建用户

进入mysql

  1. create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
  2. GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
  3. FLUSH PRIVILEGES;

(2)Windows安装

Windows上直接解压.gz的压缩包,在bin目录中编辑startup.bat,按图将下面的配置改为这样

 直接运行即可

(3)Linux安装

(1)上传文件到Linux,解压到canal目录中

  1. cd /usr/local
  2. mkdir canal
  3. tar -vxf canal.deployer-1.1.4.tar.gz -C canal

(2)配置canal

进入mysql,输入命令,记录文件名和位置 (我连接的是本机的mysql,不是虚拟机的)

show master status;

 

(3)进入canal目录,修改配置文件

  vi conf/example/instance.properties

 (4)启动Canal

进入bin目录启动服务

./startup.sh

查看日志

cat /usr/local/canal/logs/canal/canal.log

 这样就说明启动成功了

(3)Canal+RabbitMQ实现数据增量同步

以实际业务需求为例

 

给生产者服务添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>com.xpand</groupId>
  7. <artifactId>starter-canal</artifactId>
  8. <version>0.0.1-SNAPSHOT</version>
  9. </dependency>

canal那个依赖是引入的第三方库,因为第三方库比官方的简洁好用,代码量少,因此使用第三方库,需要导入第三方maven,首先从GitHub上克隆

https://github.com/chenqian56131/spring-boot-starter-canal

在maven上添加这个pom文件即可

添加配置文件(连接RabbitMq)

  1. //连接RabbitMq
  2. spring:
  3. rabbitmq:
  4. host: localhost
  5. port: 5672
  6. username: admin //rabbitmq的用户名(这是我新建的用户)
  7. password: 123456 //Rabbitmq的密码 (新建用户的密码)
  8. virtual-host: myhost //Rabbitmq创建虚拟主机的名字
  9. //连接canal
  10. canal:
  11. client:
  12. instances:
  13. example:
  14. host: 192.168.160.136 //canal所在的ip,我这里是虚拟机ip
  15. port: 11111 //这是默认端口,可以在配置文件该
  16. batchSize: 1000

生产者的配置类(定义交换机、队列、绑定方式等)

  1. /**
  2. * RabbitMQ的配置
  3. */
  4. @Configuration
  5. public class RabbitMQConfig {
  6. public static final String QUEUE_COURSE_SAVE = "queue.course.save";
  7. public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
  8. public static final String KEY_COURSE_SAVE = "key.course.save";
  9. public static final String KEY_COURSE_REMOVE = "key.course.remove";
  10. public static final String COURSE_EXCHANGE = "edu.course.exchange";
  11. @Bean
  12. public Queue queueCourseSave() {
  13. return new Queue(QUEUE_COURSE_SAVE);
  14. }
  15. @Bean
  16. public Queue queueCourseRemove() {
  17. return new Queue(QUEUE_COURSE_REMOVE);
  18. }
  19. @Bean
  20. public TopicExchange topicExchange() {
  21. return new TopicExchange(COURSE_EXCHANGE);
  22. }
  23. @Bean
  24. public Binding bindCourseSave() {
  25. return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
  26. }
  27. @Bean
  28. public Binding bindCourseRemove() {
  29. return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
  30. }
  31. }

canal实现监听(数据库的同步)

  1. package com.blb.educourseservice.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.otter.canal.protocol.CanalEntry;
  4. import com.blb.common.entity.Course;
  5. import com.blb.educourseservice.config.RabbitMQConfig;
  6. import com.blb.educourseservice.service.CourseService;
  7. import com.xpand.starter.canal.annotation.CanalEventListener;
  8. import com.xpand.starter.canal.annotation.ListenPoint;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. /**
  12. * 事件监听器
  13. */
  14. @CanalEventListener
  15. public class CanalListener {
  16. @Autowired
  17. private RabbitTemplate rabbitTemplate;
  18. @Autowired
  19. private CourseService courseService;
  20. /**
  21. * 监听 edu_course数据库的course表
  22. */
  23. @ListenPoint(schema = "edu_course",table = "course")
  24. public void handleCourseUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
  25. //判断操作的类型
  26. if("DELETE".equals(eventType.name())){
  27. //遍历删除前数据行的每一列
  28. rowData.getBeforeColumnsList().forEach(column -> {
  29. //获得删除前的ID
  30. if("id".equals(column.getName())){
  31. //发删除消息给处理删除的队列
  32. rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,Long.valueOf(column.getValue()));
  33. return;
  34. }
  35. });
  36. }else if("INSERT".equals(eventType.name()) || "UPDATE".equals(eventType.name())){
  37. //获得插入或更新后的数据
  38. rowData.getAfterColumnsList().forEach(column -> {
  39. if("id".equals(column.getName())){
  40. //通过id查询课程的完整信息
  41. Course course = courseService.getCourseById(Long.valueOf(column.getValue()));
  42. //包装到Course对象中,转换为JSON
  43. String json = JSON.toJSONString(course);
  44. //发送给添加或更新队列
  45. rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,json);
  46. }
  47. });
  48. }
  49. }
  50. }

启动类加上注解

@EnableCanalClient

消费者添加依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

添加配置

  1. spring:
  2. cloud:
  3. nacos:
  4. discovery:
  5. server-addr: 127.0.0.1:8848
  6. config:
  7. file-extension: yaml
  8. server-addr: 127.0.0.1:8848
  9. application:
  10. name: edu-search-service
  11. elasticsearch:
  12. rest:
  13. uris: 127.0.0.1:9200
  14. rabbitmq:
  15. virtual-host: myhost
  16. host: 127.0.0.1
  17. username: wjm
  18. password: 123456
  19. port: 5672
  20. server:
  21. port: 8001

添加监听器

  1. package com.blb.edusearchservice.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import com.blb.common.entity.Course;
  4. import com.blb.edusearchservice.service.CourseIndexService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.ExchangeTypes;
  7. import org.springframework.amqp.rabbit.annotation.Exchange;
  8. import org.springframework.amqp.rabbit.annotation.Queue;
  9. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  10. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.stereotype.Component;
  13. @Slf4j
  14. @Component
  15. public class CourseMQListener {
  16. //对课程进行添加或更新的队列名
  17. public static final String QUEUE_COURSE_SAVE = "queue.course.save";
  18. //对课程进行删除的队列名
  19. public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
  20. //对课程进行添加或更新的路由键
  21. public static final String KEY_COURSE_SAVE = "key.course.save";
  22. //对课程进行删除的路由键
  23. public static final String KEY_COURSE_REMOVE = "key.course.remove";
  24. //课程交换机名
  25. public static final String COURSE_EXCHANGE = "edu.course.exchange";
  26. @Autowired
  27. private CourseIndexService courseIndexService;
  28. /**
  29. * 监听课程添加操作
  30. */
  31. @RabbitListener(bindings = {
  32. @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
  33. exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true")
  34. , key = KEY_COURSE_SAVE)})
  35. public void receiveCourseSaveMessage(String message) {
  36. try {
  37. log.info("课程保存:{}",message);
  38. //将json转换为课程对象
  39. Course course = JSON.parseObject(message, Course.class);
  40. courseIndexService.saveCourse(course);
  41. } catch (Exception ex) {
  42. log.error("接收消息出现异常",ex);
  43. }
  44. }
  45. /**
  46. * 监听课程删除操作
  47. */
  48. @RabbitListener(bindings = {
  49. @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
  50. exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"),
  51. key = KEY_COURSE_REMOVE)})
  52. public void receiveCourseDeleteMessage(Long id) {
  53. try {
  54. log.info("课程删除完成:{}",id);
  55. courseIndexService.removeCourse(String.valueOf(id));
  56. } catch (Exception ex) {
  57. log.error("接收消息出现异常",ex);
  58. }
  59. }
  60. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/830246
推荐阅读
相关标签
  

闽ICP备14008679号