当前位置:   article > 正文

RabbitMQ_mq的监视器

mq的监视器

1、什么是MQ

问题1:

系统的注册功能:注册到数据库中、发邮件、发短信

如果串行执行的话,需要耗时3秒

改进: 注册数据库成功,即可,

注册成功后,并行进行发邮件、发短信 ,仅需1秒

Redis数据缓存,它只供获取数据使用,不能发送数据,所以不能使用redis

问题2:

大量的写数据库的请求,如果请求直接到达数据库,将会把数据库写死?

改进:把写数据的请求都写到MQ中,消息的消费者按照队列的顺序逐一取出写入的数据,写入数据库中

流量削峰

某一时间段会有大量的数据发送,达到一定的峰值,针对于这种情况,我们就将消除峰值的操作,称之为,流量削峰

微服务之间的通讯方式:

1、调用

2、MQ

市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。


2、搭建环境

1、安装erang依赖环境,一路下一步即可

2、配置环境变量

新建环境变量ERLANG_HOME

编辑系统环境变量Path

3、安装rabbitmq软件,一路下一步即可

4、配置rabbitmq的环境变量

新建环境变量

5、启动RabbitMQ的监视器:

6、启动rabbitmq的服务

7、url地址中访问 localhost:15672

8、账号密码均为mgest


3、RabbitMQ的工作原理

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue


4、RabbitMQ的通讯方式

Hello-World通讯方式

特点:生产环境无法使用

一个生产者、一个消费者、一个交换机、一个队列

1、新建生产者微服务

2、添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.cloud</groupId>
  8. <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  9. </dependency>
  10. <dependency>
  11. <groupId>com.rabbitmq</groupId>
  12. <artifactId>amqp-client</artifactId>
  13. <version>5.6.0</version>
  14. </dependency>
  15. </dependencies>

3、创建启动程序

4、编写配置文件

  1. server:
  2. port: 8091
  3. spring:
  4. application:
  5. name: APPLYPUBLISHERSERVER
  6. eureka:
  7. client:
  8. service-url:
  9. defaultZone: http://root:root@localhost:8080/eureka

5、工具类,在pda-common

  1. package com.qf.pdacommon.util;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. public class RabbitMQUtil {
  7. //声明连接工厂,static的
  8. private static ConnectionFactory factory = null;
  9. //在静态代码块中初始化工厂信息
  10. static{
  11. //创建工厂
  12. factory = new ConnectionFactory();
  13. //配置连接信息
  14. factory.setHost("127.0.0.1");
  15. //设置端口号
  16. factory.setPort(5672);
  17. // //以下可以省略,当自建的用户必须做出如下的配置
  18. // factory.setUsername("zhangsan"); //登陆账号
  19. // factory.setPassword("abc123"); //密码
  20. // factory.setVirtualHost("/pda"); //操作目录
  21. }
  22. //编写获取连接对象的方法
  23. public static Connection getConnection(){
  24. Connection connection=null;
  25. try {
  26. connection = factory.newConnection();
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. } catch (TimeoutException e) {
  30. e.printStackTrace();
  31. }
  32. return connection;
  33. }
  34. }

6、生产者写入消息到队列中

  1. package com.qf.applypublisher.service.impl;
  2. import com.qf.applypublisher.service.PublisherService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.Connection;
  6. import org.springframework.stereotype.Service;
  7. import java.io.IOException;
  8. import java.util.concurrent.TimeoutException;
  9. @Service
  10. public class PublisherServiceImpl implements PublisherService {
  11. @Override
  12. public String addMq(String meg) {
  13. //在此将消息写入消息队列
  14. //1、获取RabbitMQ服务的连接
  15. Connection connection = RabbitMQUtil.getConnection();
  16. //通过连接对象获取管道
  17. Channel channel = null;
  18. String result ="";
  19. try {
  20. channel= connection.createChannel();
  21. //生产者把消息通过管道,写入队列中
  22. //参数1:""使用默认的交换机
  23. //参数2:队列的名字,也可以作为路由规则,【注意】在一个MQ服务中,队列的名字是唯一的
  24. //参数3:写入消息时,携带的附加信息,如果没有就传null
  25. //参数4:写入的消息
  26. channel.basicPublish("","apply",null,meg.getBytes());
  27. System.out.println("写入消息队列成功!");
  28. result="success";
  29. } catch (IOException e) {
  30. System.out.println("写入消息失败,发生有异常:"+e.getMessage());
  31. result = "failed";
  32. e.printStackTrace();
  33. }finally {
  34. try {
  35. channel.close();
  36. connection.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. } catch (TimeoutException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. return result;
  44. }
  45. }

7、新建消息的消费者微服务

  1. package com.qf.applyconsumer.service.impl;
  2. import com.qf.applyconsumer.service.ConsumerService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerServiceImpl implements ConsumerService {
  10. @Override
  11. public String getMeg() {
  12. //到队列中获取消息
  13. //1、获取连接
  14. Connection connection = RabbitMQUtil.getConnection();
  15. //2、创建管道
  16. Channel channel = null;
  17. String result = "";
  18. try {
  19. channel = connection.createChannel();
  20. //定义队列
  21. //参数1:获取消息队列的名称
  22. //参数2:是否给队列中的消息进行持久化,true-进行持久化
  23. //参数3:当消费者的连接connection对象关闭后(消费者宕机后),是否删除队列中的消息
  24. //参数4:当队列中没有消费者消费的时候,是否自动删除队列
  25. //参数5:指定的队列的附加消息
  26. channel.queueDeclare("apply",true,false,false,null);
  27. //声明监听:监听会一直运行,只要监听到队列中有消息就去获取消息
  28. //通过handleDelivery方法获取消息
  29. DefaultConsumer consumer = new DefaultConsumer(channel){
  30. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties,byte[] body){
  31. //参数body就是从队列中获取到的消息
  32. //service-> dao 调用其他微服务消息处理
  33. String meg = new String(body);
  34. System.out.println("获取到的消息是" + meg);
  35. }
  36. };
  37. //自动消息确认:从队列里取出新消息后,自动通知队列,可以删除刚才取出的消息 ACK
  38. //参数1:队列的名字,告知哪个队列在取出消息后,就自动删除多列中的消息
  39. //参数2:true-取出消息后,队列就自动删除消息
  40. //参数3:监听对象
  41. channel.basicConsume("apply",true,consumer);
  42. System.out.println("消费者的监听程序开始工作......");
  43. System.in.read();
  44. result = "success";
  45. } catch (IOException e) {
  46. result = "faileld";
  47. e.printStackTrace();
  48. }finally {
  49. try {
  50. channel.close();
  51. connection.close();
  52. } catch (IOException e) {
  53. e.printStackTrace();
  54. } catch (TimeoutException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. return result;
  59. }
  60. }

 

 

Work工作队列通讯方式

特点:一个生产者、一个交换机、一个队列、多个消费者

新建了一个消费者:确保两个消费者从同一队列中获取消息

【注意】一个消息只能被消费一次,有一个消费者进行消费了,另一个就等到消费完了,删除之后才能取下一个

同时定义两个消费者,除了yml文件中的name名字不一样之外,其他地方一样,生产者不用变,队列名相同就可以

  1. package com.qf.applyconsumerwork2.service.impl;
  2. import com.qf.applyconsumerwork2.service.ConsumerWorkService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerWorkServiceImpl implements ConsumerWorkService {
  10. @Override
  11. public String getMeg() {
  12. //获取连接
  13. Connection connection = RabbitMQUtil.getConnection();
  14. //创建管道
  15. Channel channel = null;
  16. String result = "";
  17. try {
  18. channel = connection.createChannel();
  19. channel.queueDeclare("apply",true,false,false,null);
  20. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  21. channel.basicQos(1);
  22. //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
  23. Channel channel1 = channel;
  24. DefaultConsumer consumer = new DefaultConsumer(channel){
  25. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. String meg = new String(body);
  27. System.out.println("获取到的消息是:" + meg);
  28. //改为false只是队列不自动删除消息
  29. //需要我们手动删除
  30. //判断:消息处理成功了,手动通知队列删除消费者的消息
  31. channel1.basicAck(envelope.getDeliveryTag(), false);
  32. }
  33. };
  34. //改成手动确认,将true改成false
  35. //从队列里取出消息后,队列不删除取出的消息
  36. channel.basicConsume("apply",false,consumer);
  37. System.out.println("8094消费者的监听程序开始工作......");
  38. System.in.read();
  39. result = "Success";
  40. } catch (IOException e) {
  41. result = "failed";
  42. e.printStackTrace();
  43. }finally {
  44. try {
  45. channel.close();
  46. connection.close();
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. } catch (TimeoutException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. return result;
  54. }
  55. }

 

 

发布订阅通讯方式 --- 广播模式

特点:一个生产者、一个交换机、多个队列,每个队列对应多个消费者

定义的所有消费者每一个都会接收到消息

生产者

  1. //广播模式
  2. @Override
  3. public String addMqFanOut(String meg) {
  4. String result = "";
  5. Connection connection = RabbitMQUtil.getConnection();
  6. Channel channel = null;
  7. try {
  8. channel = connection.createChannel();
  9. //定义交换机的类型
  10. channel.exchangeDeclare("exchange2105", BuiltinExchangeType.FANOUT);
  11. //定义队列
  12. //参数1:消息队列的名字
  13. //参数2:交换机的名字,使用对用的交换机写入到队列里
  14. //路由器规则
  15. channel.queueBind("queue1","exchange2105","");
  16. channel.queueBind("queue2","exchange2105","");
  17. //写入到队列中
  18. channel.basicPublish("exchange2105","",null,meg.getBytes());
  19. result = "success";
  20. System.out.println("广播方式消息写入队列成功");
  21. } catch (IOException e) {
  22. e.printStackTrace();
  23. }finally {
  24. try {
  25. channel.close();
  26. connection.close();
  27. } catch (IOException e) {
  28. e.printStackTrace();
  29. } catch (TimeoutException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. return result;
  34. }
  35. }

消费者queue1

  1. package com.qf.applyconsumerwork.service.impl;
  2. import com.qf.applyconsumerwork.service.ConsumerWorkService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerWorkServiceImpl implements ConsumerWorkService {
  10. @Override
  11. public String getMeg() {
  12. //获取连接
  13. Connection connection = RabbitMQUtil.getConnection();
  14. //创建管道
  15. Channel channel = null;
  16. String result = "";
  17. try {
  18. channel = connection.createChannel();
  19. //queue1进行修改,和生产者一致
  20. channel.queueDeclare("queue1",true,false,false,null);
  21. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  22. channel.basicQos(1);
  23. //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
  24. Channel channel1 = channel;
  25. DefaultConsumer consumer = new DefaultConsumer(channel){
  26. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. String meg = new String(body);
  28. System.out.println("8093获取到的消息是:" + meg);
  29. //改为false只是队列不自动删除消息
  30. //需要我们手动删除
  31. //判断:消息处理成功了,手动通知队列删除消费者的消息
  32. channel1.basicAck(envelope.getDeliveryTag(), false);
  33. }
  34. };
  35. //改成手动确认,将true改成false
  36. //从队列里取出消息后,队列不删除取出的消息
  37. //queue1进行修改,和生产者一致
  38. channel.basicConsume("queue1",false,consumer);
  39. System.out.println("消费者的监听程序开始工作......");
  40. System.in.read();
  41. result = "Success";
  42. } catch (IOException e) {
  43. result = "failed";
  44. e.printStackTrace();
  45. }finally {
  46. try {
  47. channel.close();
  48. connection.close();
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. } catch (TimeoutException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. return result;
  56. }
  57. }

消费者queue2

  1. package com.qf.applyconsumerwork2.service.impl;
  2. import com.qf.applyconsumerwork2.service.ConsumerWorkService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerWorkServiceImpl implements ConsumerWorkService {
  10. @Override
  11. public String getMeg() {
  12. //获取连接
  13. Connection connection = RabbitMQUtil.getConnection();
  14. //创建管道
  15. Channel channel = null;
  16. String result = "";
  17. try {
  18. channel = connection.createChannel();
  19. //queue2进行修改,和生产者一致
  20. channel.queueDeclare("queue2",true,false,false,null);
  21. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  22. channel.basicQos(1);
  23. //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
  24. Channel channel1 = channel;
  25. DefaultConsumer consumer = new DefaultConsumer(channel){
  26. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  27. String meg = new String(body);
  28. System.out.println("获取到的消息是:" + meg);
  29. //改为false只是队列不自动删除消息
  30. //需要我们手动删除
  31. //判断:消息处理成功了,手动通知队列删除消费者的消息
  32. channel1.basicAck(envelope.getDeliveryTag(), false);
  33. }
  34. };
  35. //队列名字是消费者中定义的队列名字
  36. //queue2进行修改,和生产者一致
  37. channel.basicConsume("queue2",false,consumer);
  38. System.out.println("8094消费者的监听程序开始工作......");
  39. System.in.read();
  40. result = "Success";
  41. } catch (IOException e) {
  42. result = "failed";
  43. e.printStackTrace();
  44. }finally {
  45. try {
  46. channel.close();
  47. connection.close();
  48. } catch (IOException e) {
  49. e.printStackTrace();
  50. } catch (TimeoutException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. return result;
  55. }
  56. }

 

Routing路由通讯方式

生产者:交换机类型是Direct

               定义队列时指定了路由规则

生产者

  1. //路由方式
  2. @Override
  3. public String addMqDirect(String meg, String route) {
  4. Connection connection = RabbitMQUtil.getConnection();
  5. Channel channel = null;
  6. String result = "";
  7. try {
  8. channel = connection.createChannel();
  9. //定义交换机的类型
  10. channel.exchangeDeclare("exchange210502",BuiltinExchangeType.DIRECT);
  11. //绑定队列,同时指定路由规则
  12. channel.queueBind("queue3","exchange210502","goods");
  13. channel.queueBind("queue4","exchange210502","score");
  14. //写入消息
  15. channel.basicPublish("exchange210502",route,null,meg.getBytes());
  16. System.out.println("Direct消息写入成功");
  17. result = "success";
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. return result;
  22. }

消费者queue3  ---- queue4中把3改为4

  1. package com.qf.applyconsumerwork.service.impl;
  2. import com.qf.applyconsumerwork.service.ConsumerWorkService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerWorkServiceImpl implements ConsumerWorkService {
  10. @Override
  11. public String getMeg() {
  12. //获取连接
  13. Connection connection = RabbitMQUtil.getConnection();
  14. //创建管道
  15. Channel channel = null;
  16. String result = "";
  17. try {
  18. channel = connection.createChannel();
  19. channel.queueDeclare("queue3",true,false,false,null);
  20. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  21. channel.basicQos(1);
  22. //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
  23. Channel channel1 = channel;
  24. DefaultConsumer consumer = new DefaultConsumer(channel){
  25. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. String meg = new String(body);
  27. System.out.println("8093获取到的消息是:" + meg);
  28. //改为false只是队列不自动删除消息
  29. //需要我们手动删除
  30. //判断:消息处理成功了,手动通知队列删除消费者的消息
  31. channel1.basicAck(envelope.getDeliveryTag(), false);
  32. }
  33. };
  34. //改成手动确认,将true改成false
  35. //从队列里取出消息后,队列不删除取出的消息
  36. channel.basicConsume("queue3",false,consumer);
  37. System.out.println("8093消费者的监听程序开始工作......");
  38. System.in.read();
  39. result = "Success";
  40. } catch (IOException e) {
  41. result = "failed";
  42. e.printStackTrace();
  43. }finally {
  44. try {
  45. channel.close();
  46. connection.close();
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. } catch (TimeoutException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. return result;
  54. }
  55. }

Topic通讯方式


 

生产者

  1. @Override
  2. public String addTopic(String meg, String route) {
  3. Connection connection = RabbitMQUtil.getConnection();
  4. Channel channel = null;
  5. String result = "";
  6. try {
  7. channel = connection.createChannel();
  8. //定义交换机的类型
  9. channel.exchangeDeclare("exchange210504",BuiltinExchangeType.TOPIC);
  10. //绑定队列:交换机按照什么路由规则,写入哪个队列
  11. channel.queueBind("queue5","exchange210504","liaoning.dalian.*");
  12. channel.queueBind("queue6","exchange210504","heilongjiang.#");
  13. //写入消息
  14. channel.basicPublish("exchange210504",route,null,meg.getBytes());
  15. System.out.println("Topic消息写入成功");
  16. result = "success";
  17. } catch (IOException e) {
  18. e.printStackTrace();
  19. }
  20. return result;
  21. }

消费者

  1. package com.qf.applyconsumerwork.service.impl;
  2. import com.qf.applyconsumerwork.service.ConsumerWorkService;
  3. import com.qf.pdacommon.util.RabbitMQUtil;
  4. import com.rabbitmq.client.*;
  5. import org.springframework.stereotype.Service;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. @Service
  9. public class ConsumerWorkServiceImpl implements ConsumerWorkService {
  10. @Override
  11. public String getMeg() {
  12. //获取连接
  13. Connection connection = RabbitMQUtil.getConnection();
  14. //创建管道
  15. Channel channel = null;
  16. String result = "";
  17. try {
  18. channel = connection.createChannel();
  19. channel.queueDeclare("queue5",true,false,false,null);
  20. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  21. channel.basicQos(1);
  22. //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
  23. Channel channel1 = channel;
  24. DefaultConsumer consumer = new DefaultConsumer(channel){
  25. public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. String meg = new String(body);
  27. System.out.println("8093获取到的消息是:" + meg);
  28. //改为false只是队列不自动删除消息
  29. //需要我们手动删除
  30. //判断:消息处理成功了,手动通知队列删除消费者的消息
  31. channel1.basicAck(envelope.getDeliveryTag(), false);
  32. }
  33. };
  34. //改成手动确认,将true改成false
  35. //从队列里取出消息后,队列不删除取出的消息
  36. channel.basicConsume("queue5",false,consumer);
  37. System.out.println("8093消费者的监听程序开始工作......");
  38. System.in.read();
  39. result = "success";
  40. } catch (IOException e) {
  41. result = "failed";
  42. e.printStackTrace();
  43. }finally {
  44. try {
  45. channel.close();
  46. connection.close();
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. } catch (TimeoutException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. return result;
  54. }
  55. }

如果没有问题但是报错就看一下http://localhost:15672/#/exchanges

 更改交换机的名字


5、消息的可靠性---重要

从如下方面考虑可靠性:

1、生产者要把消息成功的写入交换机 ----- confirm机制

2、交换机要成功的把消息写入队列

3、队列要可靠 ---- 本身具有的持久化机制

4、确保消费者成功消费消息后,才把消息从队列中删除 --- 手动ACK

Confirm 普通消息确认机制

  1. @Override
  2. public String addMqTopic(String meg, String route) {
  3. String result="";
  4. Connection connection=RabbitMQUtil.getConnection();
  5. Channel channel=null;
  6. try {
  7. channel=connection.createChannel();
  8. //开启Confirm机制---确保生产者把消息成功的写入到交换机中
  9. channel.confirmSelect();
  10. //定义交换机
  11. channel.exchangeDeclare("exchange210503",BuiltinExchangeType.TOPIC);
  12. //绑定队列:交换机按照什么路由规则,写入哪个队列
  13. channel.queueBind("queue5","exchange210503","liaoning.dalian.*");
  14. channel.queueBind("queue6","exchange210503","heilongjiang.#");
  15. //写入队列
  16. channel.basicPublish("exchange210503",route,null,meg.getBytes());
  17. //判断消息写入交换机是否成功!
  18. if(channel.waitForConfirms()){
  19. result="写入交换机成功!";
  20. }else {
  21. result="写入交换机失败!";
  22. }
  23. } catch (IOException | InterruptedException e) {
  24. e.printStackTrace();
  25. } finally {
  26. try {
  27. channel.close();
  28. connection.close();
  29. } catch (IOException e) {
  30. e.printStackTrace();
  31. } catch (TimeoutException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. return result;
  36. }

批量消息确认机制(Confirm)

  1. @Override
  2. public String addMq(String meg) {
  3. //在此处将消息写入消息队列
  4. //1、获取RabbitMQ服务的连接
  5. Connection connection= RabbitMQUtil.getConnection();
  6. //2、通过连接对象获取管道
  7. Channel channel=null;
  8. String reslult="";
  9. try {
  10. channel=connection.createChannel();
  11. //开启消息确认机制
  12. channel.confirmSelect();
  13. //批量写入消息
  14. //生产者把消息通过管道,写入队列中
  15. //参数1:""使用默认的交换机
  16. //参数2:队列的名字,也可以作为路由规则,【注意】在一个MQ服务中,队列的名字是唯一的
  17. //参数3:写入消息时,携带的附加信息,如果没有就传null
  18. //参数4:写入的消息
  19. for (int i=1; i<=10; i++){
  20. meg=meg+i;
  21. channel.basicPublish("","apply",null,meg.getBytes());
  22. }
  23. //批量确认,如果有一个消息写入交换机失败,则之前写过的所有消息都失败,失败后会抛出异常
  24. channel.waitForConfirmsOrDie();
  25. System.out.println("写入消息队列成功!");
  26. reslult="批量将消息写入交换机成功!";
  27. } catch (IOException | InterruptedException e) {
  28. System.out.println("写入消息失败,发生有异常:"+e.getMessage());
  29. reslult="批量写入交换机失败!";
  30. e.printStackTrace();
  31. }finally {
  32. try {
  33. channel.close();
  34. connection.close();
  35. } catch (IOException e) {
  36. e.printStackTrace();
  37. } catch (TimeoutException e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. return reslult;
  42. }

异步Confirm消息确认

  1. @Override
  2. public String addMqFanOut(String meg){
  3. String result="";
  4. Connection connection=RabbitMQUtil.getConnection();
  5. Channel channel=null;
  6. try {
  7. channel=connection.createChannel();
  8. //开启Confirm消息确认机制
  9. channel.confirmSelect();
  10. //定义交换机的类型
  11. channel.exchangeDeclare("exchange2105", BuiltinExchangeType.FANOUT);
  12. //定义队列
  13. //参数1:队列名字,不能重复
  14. //参数2:交换机名字,使用对应没名字的交换机写入到该队列中
  15. channel.queueBind("queue1","exchange2105","");
  16. channel.queueBind("queue2","exchange2105","");
  17. //写入到队列中
  18. channel.basicPublish("exchange2105","",null,meg.getBytes());
  19. //异步消息确认
  20. channel.addConfirmListener(new ConfirmListener() {
  21. @Override
  22. public void handleAck(long l, boolean b) throws IOException {
  23. //消息写入成功时,会调用
  24. System.out.println("写入成功!消息的标识:"+l+",是否为批量?"+b);
  25. }
  26. @Override
  27. public void handleNack(long l, boolean b) throws IOException {
  28. //消息写入失败时回调
  29. System.out.println("写入失败!消息的标识:"+l+",是否为批量?"+b);
  30. }
  31. });
  32. result="success";
  33. System.out.println("发布、订阅方式将消息写入队列成功!");
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. } finally {
  37. try {
  38. channel.close();
  39. connection.close();
  40. } catch (IOException e) {
  41. e.printStackTrace();
  42. } catch (TimeoutException e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. return null;
  47. }

Return机制---确保消息从交换机写入队列成功


6、避免消息的重复消费

产生的原因:没有及时 进行手动ACK

解决思路:

1、在Redis中设置一个值 set mqKey 0 0---没有程序去消费这个消息

2、消费者在到队列中取消息之前,先读取mqKey

如果读取到的是0,则允许读取消息,否则不允许

3、在读取过程中,设置为1 1---此消息正在被某个消费者消费        

当一个消费者进入消息队列之后,将其key值进行改变,正常情况之下,在执行完流程,将信息存储进数据库,然后正常出队列之后进行删除,将key改回去,但是有一种情况,在没有执行完就离开队列key没有改回去,这种情况下,谁都进不来,产生了死锁现象

解决的办法就是设置有效时间

  1. public void getMeg() {
  2. //到队列中获取消息
  3. //1、获取连接
  4. Connection connection= RabbitMQUtil.getConnection();
  5. //2、创建管道
  6. Channel channel=null;
  7. String result="";
  8. try {
  9. channel=connection.createChannel();
  10. //定义队列
  11. //参数1:获取消息的队列的名称
  12. //参数2:是否给队列中的消息进行持久化 true-进行持久化
  13. //参数3:当消费者的连接connection对象关闭后(消费者宕机后),是否删除队列中的消息
  14. //参数4:当队列中没有消费者消费的时候,是否自动删除队列
  15. //参数5:指定的队列的附加消息
  16. channel.queueDeclare("queue5",true,false,false,null);
  17. //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
  18. channel.basicQos(1);
  19. //声明监听:监听会一直运行,只要监听到队列中有消息,就去获取消息
  20. //通过handleDelivery方法获取消息
  21. Channel channel1=channel;
  22. DefaultConsumer consumer=new DefaultConsumer(channel1){
  23. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  24. //获取Redis的连接
  25. Jedis jedis= JedisUtil.getJedis();
  26. if(jedis!=null){
  27. jedis.select(5);
  28. //读取锁里的信息
  29. String redisLock=jedis.get("mqKey");
  30. if(redisLock==null){
  31. //设置只当时间,时间到自动将key改回去,赋值
  32. jedis.setex("mqKey",10,"1");
  33. //参数body就是从队列中获取到的消息
  34. //service->dao 调用其他微服务处理消息
  35. String meg=new String(body);
  36. System.out.println("1号消费者获取到消息:"+meg);
  37. //判断,消息处理成功了,手动通知队列删除消费的消息
  38. channel1.basicAck(envelope.getDeliveryTag(),false);
  39. //消费完成之后,设置为0
  40. jedis.set("mqKey","0");
  41. }else{
  42. int intRedisLock=Integer.parseInt(redisLock);
  43. if(intRedisLock==0){
  44. //赋值
  45. jedis.setex("mqKey",10,"1");
  46. //参数body就是从队列中获取到的消息
  47. //service->dao 调用其他微服务处理消息
  48. String meg=new String(body);
  49. System.out.println("1号消费者获取到消息:"+meg);
  50. //判断,消息处理成功了,手动通知队列删除消费的消息
  51. channel1.basicAck(envelope.getDeliveryTag(),false);
  52. //消费完成之后,设置为0
  53. jedis.set("mqKey","0");
  54. }
  55. }
  56. }
  57. }
  58. };

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/892557
推荐阅读
相关标签
  

闽ICP备14008679号