赞
踩
RabbitMq是一种主流的消息队列,消息队列(Message Queue)是一种消息的容器,主要用于实现程序(服务、进程、线程)之间的通信;队列是 FIFO(先进先出)的数据结构
一般我们在做微服务项目时,会用feign来进行RPC远程调用,这样如果在一段逻辑代码中多次调用RPC,会比较浪费时间,因为是同步的,并且改一处逻辑,很多地方都要改,耦合性较强。因此使用RabbitMq这个消息中间件,来实现远程调用,主要作用是解耦、削峰、异步。
1、首先安装erlang
2、安装rabbitmq
3、在安装的bin目录下打开cmd启动插件使能
rabbitmq-plugins enable rabbitmq_management
4、启动rabbitmq
net start rabbitmq
5、浏览器输入: http://localhost:15672 账号密码都是guest
Linux的安装见
https://blog.csdn.net/qq_45502336/article/details/118699251
(1)一对一模式:生产者和消费者间只有一个队列,一个生产者发送消息到一个队列,一个消费者从队列中取消息。
(2)一对多模式(工作队列):生产者将消息分发给多个消费者,如果生产者生产了100条消息,消费者1消费50条,消费者2消费50条。
扩展:工作队列的能者多劳模式:
因为队列默认采用是自动确认机制,消息发过去后就自动确认,队列不清楚每个消息具体什么时间处理完,所以平均分配消息数量。
实现能者多劳:
channel.basicQos(1);限制队列一次发一个消息给消费者,等消费者有了反馈,再发下一条
channel.basicAck 消费完消息后手动反馈,处理快的消费者就能处理更多消息
basicConsume 中的参数改为false
(3)发布/订阅模式
发布/订阅模式和Work模式的区别是:Work模式只存在一个队列,多个消费者共同消费一个队列中的消息;而发布订阅模式存在多个队列,不同的消费者可以从各自的队列中处理完全相同的消息。 实现步骤:
1) 创建交换机(Exchange)类型是fanout(扇出)
2) 交换机需要绑定不同的队列
3) 不同的消费者从不同的队列中获得消息
4) 生产者发送消息到交换机
5) 再由交换机将消息分发到多个队列
(4)路由模式
路由模式的消息队列可以给队列绑定不同的key,生产者发送消息时,给消息设置不同的key,这样交换机在分发消息时,可以让消息路由到key匹配的队列中。
(5)主题模式
主题模式和路由模式差不多,在key中可以加入通配符:
* 匹配任意一个单词
# 匹配.号隔开的多个单词
Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。
canal将自己伪装为MySQL的slave,向master发送dump协议
master收到dump协议,数据发生修改后推送binary log给canal
canal解析binary log对象,转换为增量数据,同步到ES、Redis等
首先要让mysql开启binlog模式
1、进入mysql查看是否启动binlog
SHOW VARIABLES LIKE '%log_bin%'
log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin
windows配置文件是MySQL安装目录的my.in
修改:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
2、创建用户
进入mysql
- create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
- FLUSH PRIVILEGES;
Windows上直接解压.gz的压缩包,在bin目录中编辑startup.bat,按图将下面的配置改为这样
直接运行即可
(1)上传文件到Linux,解压到canal目录中
- cd /usr/local
- mkdir canal
- tar -vxf canal.deployer-1.1.4.tar.gz -C canal
(2)配置canal
进入mysql,输入命令,记录文件名和位置 (我连接的是本机的mysql,不是虚拟机的)
show master status;
(3)进入canal目录,修改配置文件
vi conf/example/instance.properties
(4)启动Canal
进入bin目录启动服务
./startup.sh
查看日志
cat /usr/local/canal/logs/canal/canal.log
这样就说明启动成功了
以实际业务需求为例
给生产者服务添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.xpand</groupId>
- <artifactId>starter-canal</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
canal那个依赖是引入的第三方库,因为第三方库比官方的简洁好用,代码量少,因此使用第三方库,需要导入第三方maven,首先从GitHub上克隆
https://github.com/chenqian56131/spring-boot-starter-canal
在maven上添加这个pom文件即可
添加配置文件(连接RabbitMq)
- //连接RabbitMq
- spring:
- rabbitmq:
- host: localhost
- port: 5672
- username: admin //rabbitmq的用户名(这是我新建的用户)
- password: 123456 //Rabbitmq的密码 (新建用户的密码)
- virtual-host: myhost //Rabbitmq创建虚拟主机的名字
- //连接canal
- canal:
- client:
- instances:
- example:
- host: 192.168.160.136 //canal所在的ip,我这里是虚拟机ip
- port: 11111 //这是默认端口,可以在配置文件该
- batchSize: 1000
生产者的配置类(定义交换机、队列、绑定方式等)
- /**
- * RabbitMQ的配置
- */
- @Configuration
- public class RabbitMQConfig {
-
- public static final String QUEUE_COURSE_SAVE = "queue.course.save";
- public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
- public static final String KEY_COURSE_SAVE = "key.course.save";
- public static final String KEY_COURSE_REMOVE = "key.course.remove";
- public static final String COURSE_EXCHANGE = "edu.course.exchange";
-
- @Bean
- public Queue queueCourseSave() {
- return new Queue(QUEUE_COURSE_SAVE);
- }
-
- @Bean
- public Queue queueCourseRemove() {
- return new Queue(QUEUE_COURSE_REMOVE);
- }
-
- @Bean
- public TopicExchange topicExchange() {
- return new TopicExchange(COURSE_EXCHANGE);
- }
-
- @Bean
- public Binding bindCourseSave() {
- return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
- }
-
- @Bean
- public Binding bindCourseRemove() {
- return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
- }
- }
canal实现监听(数据库的同步)
- package com.blb.educourseservice.listener;
-
- import com.alibaba.fastjson.JSON;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.blb.common.entity.Course;
- import com.blb.educourseservice.config.RabbitMQConfig;
- import com.blb.educourseservice.service.CourseService;
- import com.xpand.starter.canal.annotation.CanalEventListener;
- import com.xpand.starter.canal.annotation.ListenPoint;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
-
- /**
- * 事件监听器
- */
- @CanalEventListener
- public class CanalListener {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- @Autowired
- private CourseService courseService;
-
- /**
- * 监听 edu_course数据库的course表
- */
- @ListenPoint(schema = "edu_course",table = "course")
- public void handleCourseUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
- //判断操作的类型
- if("DELETE".equals(eventType.name())){
- //遍历删除前数据行的每一列
- rowData.getBeforeColumnsList().forEach(column -> {
- //获得删除前的ID
- if("id".equals(column.getName())){
- //发删除消息给处理删除的队列
- rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,Long.valueOf(column.getValue()));
- return;
- }
- });
- }else if("INSERT".equals(eventType.name()) || "UPDATE".equals(eventType.name())){
- //获得插入或更新后的数据
- rowData.getAfterColumnsList().forEach(column -> {
- if("id".equals(column.getName())){
- //通过id查询课程的完整信息
- Course course = courseService.getCourseById(Long.valueOf(column.getValue()));
- //包装到Course对象中,转换为JSON
- String json = JSON.toJSONString(course);
- //发送给添加或更新队列
- rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,json);
- }
- });
- }
- }
- }
启动类加上注解
@EnableCanalClient
消费者添加依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
添加配置
- spring:
- cloud:
- nacos:
- discovery:
- server-addr: 127.0.0.1:8848
- config:
- file-extension: yaml
- server-addr: 127.0.0.1:8848
- application:
- name: edu-search-service
- elasticsearch:
- rest:
- uris: 127.0.0.1:9200
- rabbitmq:
- virtual-host: myhost
- host: 127.0.0.1
- username: wjm
- password: 123456
- port: 5672
- server:
- port: 8001
添加监听器
- package com.blb.edusearchservice.listener;
-
- import com.alibaba.fastjson.JSON;
- import com.blb.common.entity.Course;
- import com.blb.edusearchservice.service.CourseIndexService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ExchangeTypes;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class CourseMQListener {
-
- //对课程进行添加或更新的队列名
- public static final String QUEUE_COURSE_SAVE = "queue.course.save";
- //对课程进行删除的队列名
- public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
- //对课程进行添加或更新的路由键
- public static final String KEY_COURSE_SAVE = "key.course.save";
- //对课程进行删除的路由键
- public static final String KEY_COURSE_REMOVE = "key.course.remove";
- //课程交换机名
- public static final String COURSE_EXCHANGE = "edu.course.exchange";
-
- @Autowired
- private CourseIndexService courseIndexService;
-
- /**
- * 监听课程添加操作
- */
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
- exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true")
- , key = KEY_COURSE_SAVE)})
- public void receiveCourseSaveMessage(String message) {
- try {
- log.info("课程保存:{}",message);
- //将json转换为课程对象
- Course course = JSON.parseObject(message, Course.class);
- courseIndexService.saveCourse(course);
- } catch (Exception ex) {
- log.error("接收消息出现异常",ex);
- }
- }
-
- /**
- * 监听课程删除操作
- */
- @RabbitListener(bindings = {
- @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
- exchange = @Exchange(value = COURSE_EXCHANGE,type = ExchangeTypes.TOPIC,ignoreDeclarationExceptions = "true"),
- key = KEY_COURSE_REMOVE)})
- public void receiveCourseDeleteMessage(Long id) {
- try {
- log.info("课程删除完成:{}",id);
- courseIndexService.removeCourse(String.valueOf(id));
- } catch (Exception ex) {
- log.error("接收消息出现异常",ex);
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。