赞
踩
系统的注册功能:注册到数据库中、发邮件、发短信
如果串行执行的话,需要耗时3秒
改进: 注册数据库成功,即可,
注册成功后,并行进行发邮件、发短信 ,仅需1秒
Redis数据缓存,它只供获取数据使用,不能发送数据,所以不能使用redis
大量的写数据库的请求,如果请求直接到达数据库,将会把数据库写死?
改进:把写数据的请求都写到MQ中,消息的消费者按照队列的顺序逐一取出写入的数据,写入数据库中
某一时间段会有大量的数据发送,达到一定的峰值,针对于这种情况,我们就将消除峰值的操作,称之为,流量削峰
1、调用
2、MQ
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
新建环境变量ERLANG_HOME
编辑系统环境变量Path
新建环境变量
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
特点:生产环境无法使用
一个生产者、一个消费者、一个交换机、一个队列
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
- </dependency>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.6.0</version>
- </dependency>
- </dependencies>
- server:
- port: 8091
- spring:
- application:
- name: APPLYPUBLISHERSERVER
- eureka:
- client:
- service-url:
- defaultZone: http://root:root@localhost:8080/eureka
- package com.qf.pdacommon.util;
-
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class RabbitMQUtil {
- //声明连接工厂,static的
- private static ConnectionFactory factory = null;
- //在静态代码块中初始化工厂信息
- static{
- //创建工厂
- factory = new ConnectionFactory();
- //配置连接信息
- factory.setHost("127.0.0.1");
- //设置端口号
- factory.setPort(5672);
- // //以下可以省略,当自建的用户必须做出如下的配置
- // factory.setUsername("zhangsan"); //登陆账号
- // factory.setPassword("abc123"); //密码
- // factory.setVirtualHost("/pda"); //操作目录
- }
-
- //编写获取连接对象的方法
- public static Connection getConnection(){
- Connection connection=null;
- try {
- connection = factory.newConnection();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- return connection;
- }
- }

- package com.qf.applypublisher.service.impl;
-
- import com.qf.applypublisher.service.PublisherService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class PublisherServiceImpl implements PublisherService {
-
-
- @Override
- public String addMq(String meg) {
- //在此将消息写入消息队列
- //1、获取RabbitMQ服务的连接
- Connection connection = RabbitMQUtil.getConnection();
- //通过连接对象获取管道
- Channel channel = null;
- String result ="";
-
- try {
- channel= connection.createChannel();
-
- //生产者把消息通过管道,写入队列中
- //参数1:""使用默认的交换机
- //参数2:队列的名字,也可以作为路由规则,【注意】在一个MQ服务中,队列的名字是唯一的
- //参数3:写入消息时,携带的附加信息,如果没有就传null
- //参数4:写入的消息
- channel.basicPublish("","apply",null,meg.getBytes());
- System.out.println("写入消息队列成功!");
- result="success";
- } catch (IOException e) {
- System.out.println("写入消息失败,发生有异常:"+e.getMessage());
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
-
- }
-
- return result;
- }
-
-
- }

- package com.qf.applyconsumer.service.impl;
-
- import com.qf.applyconsumer.service.ConsumerService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerServiceImpl implements ConsumerService {
- @Override
- public String getMeg() {
- //到队列中获取消息
- //1、获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //2、创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
-
- //定义队列
- //参数1:获取消息队列的名称
- //参数2:是否给队列中的消息进行持久化,true-进行持久化
- //参数3:当消费者的连接connection对象关闭后(消费者宕机后),是否删除队列中的消息
- //参数4:当队列中没有消费者消费的时候,是否自动删除队列
- //参数5:指定的队列的附加消息
- channel.queueDeclare("apply",true,false,false,null);
- //声明监听:监听会一直运行,只要监听到队列中有消息就去获取消息
- //通过handleDelivery方法获取消息
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties,byte[] body){
- //参数body就是从队列中获取到的消息
- //service-> dao 调用其他微服务消息处理
- String meg = new String(body);
- System.out.println("获取到的消息是" + meg);
- }
- };
-
- //自动消息确认:从队列里取出新消息后,自动通知队列,可以删除刚才取出的消息 ACK
- //参数1:队列的名字,告知哪个队列在取出消息后,就自动删除多列中的消息
- //参数2:true-取出消息后,队列就自动删除消息
- //参数3:监听对象
- channel.basicConsume("apply",true,consumer);
- System.out.println("消费者的监听程序开始工作......");
- System.in.read();
- result = "success";
- } catch (IOException e) {
- result = "faileld";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- return result;
- }
- }

特点:一个生产者、一个交换机、一个队列、多个消费者
新建了一个消费者:确保两个消费者从同一队列中获取消息
【注意】一个消息只能被消费一次,有一个消费者进行消费了,另一个就等到消费完了,删除之后才能取下一个
同时定义两个消费者,除了yml文件中的name名字不一样之外,其他地方一样,生产者不用变,队列名相同就可以
- package com.qf.applyconsumerwork2.service.impl;
-
- import com.qf.applyconsumerwork2.service.ConsumerWorkService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerWorkServiceImpl implements ConsumerWorkService {
- @Override
- public String getMeg() {
-
- //获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
- channel.queueDeclare("apply",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
- Channel channel1 = channel;
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String meg = new String(body);
- System.out.println("获取到的消息是:" + meg);
- //改为false只是队列不自动删除消息
- //需要我们手动删除
- //判断:消息处理成功了,手动通知队列删除消费者的消息
- channel1.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //改成手动确认,将true改成false
- //从队列里取出消息后,队列不删除取出的消息
- channel.basicConsume("apply",false,consumer);
- System.out.println("8094消费者的监听程序开始工作......");
- System.in.read();
- result = "Success";
-
- } catch (IOException e) {
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
-
-
- return result;
- }
- }

特点:一个生产者、一个交换机、多个队列,每个队列对应多个消费者
定义的所有消费者每一个都会接收到消息
生产者
- //广播模式
- @Override
- public String addMqFanOut(String meg) {
- String result = "";
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = null;
- try {
- channel = connection.createChannel();
- //定义交换机的类型
- channel.exchangeDeclare("exchange2105", BuiltinExchangeType.FANOUT);
- //定义队列
- //参数1:消息队列的名字
- //参数2:交换机的名字,使用对用的交换机写入到队列里
- //路由器规则
- channel.queueBind("queue1","exchange2105","");
- channel.queueBind("queue2","exchange2105","");
-
- //写入到队列中
- channel.basicPublish("exchange2105","",null,meg.getBytes());
- result = "success";
- System.out.println("广播方式消息写入队列成功");
- } catch (IOException e) {
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
-
- }
- return result;
- }
-
-
- }

消费者queue1
- package com.qf.applyconsumerwork.service.impl;
-
- import com.qf.applyconsumerwork.service.ConsumerWorkService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerWorkServiceImpl implements ConsumerWorkService {
- @Override
- public String getMeg() {
-
- //获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
- //queue1进行修改,和生产者一致
- channel.queueDeclare("queue1",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
- Channel channel1 = channel;
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String meg = new String(body);
- System.out.println("8093获取到的消息是:" + meg);
- //改为false只是队列不自动删除消息
- //需要我们手动删除
- //判断:消息处理成功了,手动通知队列删除消费者的消息
- channel1.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //改成手动确认,将true改成false
- //从队列里取出消息后,队列不删除取出的消息
- //queue1进行修改,和生产者一致
- channel.basicConsume("queue1",false,consumer);
- System.out.println("消费者的监听程序开始工作......");
- System.in.read();
- result = "Success";
-
- } catch (IOException e) {
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
-
-
- return result;
- }
- }

消费者queue2
- package com.qf.applyconsumerwork2.service.impl;
-
- import com.qf.applyconsumerwork2.service.ConsumerWorkService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerWorkServiceImpl implements ConsumerWorkService {
- @Override
- public String getMeg() {
-
- //获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
- //queue2进行修改,和生产者一致
- channel.queueDeclare("queue2",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
- Channel channel1 = channel;
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String meg = new String(body);
- System.out.println("获取到的消息是:" + meg);
- //改为false只是队列不自动删除消息
- //需要我们手动删除
- //判断:消息处理成功了,手动通知队列删除消费者的消息
- channel1.basicAck(envelope.getDeliveryTag(), false);
- }
- };
-
- //队列名字是消费者中定义的队列名字
- //queue2进行修改,和生产者一致
- channel.basicConsume("queue2",false,consumer);
- System.out.println("8094消费者的监听程序开始工作......");
- System.in.read();
- result = "Success";
-
- } catch (IOException e) {
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
-
-
- return result;
- }
- }

生产者:交换机类型是Direct
定义队列时指定了路由规则
生产者
- //路由方式
- @Override
- public String addMqDirect(String meg, String route) {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
-
- //定义交换机的类型
- channel.exchangeDeclare("exchange210502",BuiltinExchangeType.DIRECT);
- //绑定队列,同时指定路由规则
- channel.queueBind("queue3","exchange210502","goods");
- channel.queueBind("queue4","exchange210502","score");
-
- //写入消息
- channel.basicPublish("exchange210502",route,null,meg.getBytes());
- System.out.println("Direct消息写入成功");
- result = "success";
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }

消费者queue3 ---- queue4中把3改为4
- package com.qf.applyconsumerwork.service.impl;
-
- import com.qf.applyconsumerwork.service.ConsumerWorkService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerWorkServiceImpl implements ConsumerWorkService {
- @Override
- public String getMeg() {
-
- //获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
- channel.queueDeclare("queue3",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
- Channel channel1 = channel;
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String meg = new String(body);
- System.out.println("8093获取到的消息是:" + meg);
- //改为false只是队列不自动删除消息
- //需要我们手动删除
- //判断:消息处理成功了,手动通知队列删除消费者的消息
- channel1.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //改成手动确认,将true改成false
- //从队列里取出消息后,队列不删除取出的消息
- channel.basicConsume("queue3",false,consumer);
- System.out.println("8093消费者的监听程序开始工作......");
- System.in.read();
- result = "Success";
-
- } catch (IOException e) {
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
-
-
- return result;
- }
- }

生产者
- @Override
- public String addTopic(String meg, String route) {
- Connection connection = RabbitMQUtil.getConnection();
- Channel channel = null;
- String result = "";
-
- try {
- channel = connection.createChannel();
-
- //定义交换机的类型
- channel.exchangeDeclare("exchange210504",BuiltinExchangeType.TOPIC);
-
- //绑定队列:交换机按照什么路由规则,写入哪个队列
- channel.queueBind("queue5","exchange210504","liaoning.dalian.*");
- channel.queueBind("queue6","exchange210504","heilongjiang.#");
-
- //写入消息
- channel.basicPublish("exchange210504",route,null,meg.getBytes());
- System.out.println("Topic消息写入成功");
- result = "success";
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }

消费者
- package com.qf.applyconsumerwork.service.impl;
-
- import com.qf.applyconsumerwork.service.ConsumerWorkService;
- import com.qf.pdacommon.util.RabbitMQUtil;
- import com.rabbitmq.client.*;
- import org.springframework.stereotype.Service;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- @Service
- public class ConsumerWorkServiceImpl implements ConsumerWorkService {
- @Override
- public String getMeg() {
-
- //获取连接
- Connection connection = RabbitMQUtil.getConnection();
- //创建管道
- Channel channel = null;
- String result = "";
- try {
- channel = connection.createChannel();
- channel.queueDeclare("queue5",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //定义channel1,接收外部类的channel,因为内部类中 外部类定义的channel不适用
- Channel channel1 = channel;
- DefaultConsumer consumer = new DefaultConsumer(channel){
- public void handleDelivery(String consumerTeg, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String meg = new String(body);
- System.out.println("8093获取到的消息是:" + meg);
- //改为false只是队列不自动删除消息
- //需要我们手动删除
- //判断:消息处理成功了,手动通知队列删除消费者的消息
- channel1.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //改成手动确认,将true改成false
- //从队列里取出消息后,队列不删除取出的消息
- channel.basicConsume("queue5",false,consumer);
- System.out.println("8093消费者的监听程序开始工作......");
- System.in.read();
- result = "success";
-
- } catch (IOException e) {
- result = "failed";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
-
-
- return result;
- }
- }

如果没有问题但是报错就看一下http://localhost:15672/#/exchanges
更改交换机的名字
从如下方面考虑可靠性:
1、生产者要把消息成功的写入交换机 ----- confirm机制
2、交换机要成功的把消息写入队列
3、队列要可靠 ---- 本身具有的持久化机制
4、确保消费者成功消费消息后,才把消息从队列中删除 --- 手动ACK
- @Override
- public String addMqTopic(String meg, String route) {
- String result="";
- Connection connection=RabbitMQUtil.getConnection();
- Channel channel=null;
- try {
- channel=connection.createChannel();
- //开启Confirm机制---确保生产者把消息成功的写入到交换机中
- channel.confirmSelect();
- //定义交换机
- channel.exchangeDeclare("exchange210503",BuiltinExchangeType.TOPIC);
-
- //绑定队列:交换机按照什么路由规则,写入哪个队列
- channel.queueBind("queue5","exchange210503","liaoning.dalian.*");
- channel.queueBind("queue6","exchange210503","heilongjiang.#");
-
- //写入队列
- channel.basicPublish("exchange210503",route,null,meg.getBytes());
-
- //判断消息写入交换机是否成功!
- if(channel.waitForConfirms()){
- result="写入交换机成功!";
- }else {
- result="写入交换机失败!";
- }
- } catch (IOException | InterruptedException e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- return result;
- }

- @Override
- public String addMq(String meg) {
- //在此处将消息写入消息队列
- //1、获取RabbitMQ服务的连接
- Connection connection= RabbitMQUtil.getConnection();
- //2、通过连接对象获取管道
- Channel channel=null;
- String reslult="";
- try {
- channel=connection.createChannel();
- //开启消息确认机制
- channel.confirmSelect();
- //批量写入消息
- //生产者把消息通过管道,写入队列中
- //参数1:""使用默认的交换机
- //参数2:队列的名字,也可以作为路由规则,【注意】在一个MQ服务中,队列的名字是唯一的
- //参数3:写入消息时,携带的附加信息,如果没有就传null
- //参数4:写入的消息
- for (int i=1; i<=10; i++){
- meg=meg+i;
- channel.basicPublish("","apply",null,meg.getBytes());
- }
- //批量确认,如果有一个消息写入交换机失败,则之前写过的所有消息都失败,失败后会抛出异常
- channel.waitForConfirmsOrDie();
- System.out.println("写入消息队列成功!");
- reslult="批量将消息写入交换机成功!";
- } catch (IOException | InterruptedException e) {
- System.out.println("写入消息失败,发生有异常:"+e.getMessage());
- reslult="批量写入交换机失败!";
- e.printStackTrace();
- }finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- return reslult;
- }

异步Confirm消息确认
- @Override
- public String addMqFanOut(String meg){
- String result="";
- Connection connection=RabbitMQUtil.getConnection();
- Channel channel=null;
- try {
- channel=connection.createChannel();
- //开启Confirm消息确认机制
- channel.confirmSelect();
- //定义交换机的类型
- channel.exchangeDeclare("exchange2105", BuiltinExchangeType.FANOUT);
- //定义队列
- //参数1:队列名字,不能重复
- //参数2:交换机名字,使用对应没名字的交换机写入到该队列中
- channel.queueBind("queue1","exchange2105","");
- channel.queueBind("queue2","exchange2105","");
-
- //写入到队列中
- channel.basicPublish("exchange2105","",null,meg.getBytes());
- //异步消息确认
- channel.addConfirmListener(new ConfirmListener() {
- @Override
- public void handleAck(long l, boolean b) throws IOException {
- //消息写入成功时,会调用
- System.out.println("写入成功!消息的标识:"+l+",是否为批量?"+b);
- }
-
- @Override
- public void handleNack(long l, boolean b) throws IOException {
- //消息写入失败时回调
- System.out.println("写入失败!消息的标识:"+l+",是否为批量?"+b);
-
- }
- });
-
- result="success";
- System.out.println("发布、订阅方式将消息写入队列成功!");
-
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- return null;
- }

产生的原因:没有及时 进行手动ACK
解决思路:
1、在Redis中设置一个值 set mqKey 0 0---没有程序去消费这个消息
2、消费者在到队列中取消息之前,先读取mqKey
如果读取到的是0,则允许读取消息,否则不允许
3、在读取过程中,设置为1 1---此消息正在被某个消费者消费
当一个消费者进入消息队列之后,将其key值进行改变,正常情况之下,在执行完流程,将信息存储进数据库,然后正常出队列之后进行删除,将key改回去,但是有一种情况,在没有执行完就离开队列key没有改回去,这种情况下,谁都进不来,产生了死锁现象
解决的办法就是设置有效时间
- public void getMeg() {
- //到队列中获取消息
- //1、获取连接
- Connection connection= RabbitMQUtil.getConnection();
- //2、创建管道
- Channel channel=null;
- String result="";
- try {
- channel=connection.createChannel();
-
- //定义队列
- //参数1:获取消息的队列的名称
- //参数2:是否给队列中的消息进行持久化 true-进行持久化
- //参数3:当消费者的连接connection对象关闭后(消费者宕机后),是否删除队列中的消息
- //参数4:当队列中没有消费者消费的时候,是否自动删除队列
- //参数5:指定的队列的附加消息
- channel.queueDeclare("queue5",true,false,false,null);
-
- //指定消费者的消费能力:一个消费者一次只能最多消费一个消息
- channel.basicQos(1);
-
- //声明监听:监听会一直运行,只要监听到队列中有消息,就去获取消息
- //通过handleDelivery方法获取消息
- Channel channel1=channel;
- DefaultConsumer consumer=new DefaultConsumer(channel1){
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //获取Redis的连接
- Jedis jedis= JedisUtil.getJedis();
- if(jedis!=null){
- jedis.select(5);
- //读取锁里的信息
- String redisLock=jedis.get("mqKey");
- if(redisLock==null){
- //设置只当时间,时间到自动将key改回去,赋值
- jedis.setex("mqKey",10,"1");
- //参数body就是从队列中获取到的消息
- //service->dao 调用其他微服务处理消息
- String meg=new String(body);
- System.out.println("1号消费者获取到消息:"+meg);
- //判断,消息处理成功了,手动通知队列删除消费的消息
- channel1.basicAck(envelope.getDeliveryTag(),false);
- //消费完成之后,设置为0
- jedis.set("mqKey","0");
- }else{
- int intRedisLock=Integer.parseInt(redisLock);
- if(intRedisLock==0){
- //赋值
- jedis.setex("mqKey",10,"1");
- //参数body就是从队列中获取到的消息
- //service->dao 调用其他微服务处理消息
- String meg=new String(body);
- System.out.println("1号消费者获取到消息:"+meg);
- //判断,消息处理成功了,手动通知队列删除消费的消息
- channel1.basicAck(envelope.getDeliveryTag(),false);
- //消费完成之后,设置为0
- jedis.set("mqKey","0");
- }
- }
- }
- }
- };

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。