赞
踩
一个队列只有一个消费者
多个消费者监听同一个队列
一个交换机绑定多个消息队列,每个消息队列有一个消费者监听
一个交换机绑定多个消息队列,每个消息队列都由自己唯一的key,每个消息队列有一个消费者监听
创建RabbitMQ队列结构:在host1中共创建6个队列,两个交换机,其中交换机ex1连接queue3 和queue4,交换机ex2连接queue5和queue6
创建MQ连接工具类ConnectionUtil
:
public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException, IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password) factory.setHost("192.168.157.130"); factory.setPort(5672); factory.setVirtualHost("host1"); factory.setUsername("admin"); factory.setPassword("admin"); //3.通过工厂对象获取与MQ的链接 Connection connection = factory.newConnection(); return connection; } //测试是否连接成功 public static void main(String[] args) throws IOException, TimeoutException { System.out.println(getConnection()); } }
public class SendMsg { public static void main(String[] args) throws Exception{ String msg = "Hello HuangDaoJun!"; //获取连接 Connection connection = ConnectionUtil.getConnection(); //创建channel Channel channel = connection.createChannel(); //定义队列(使用Java代码在MQ中新建一个队列) //参数1:定义的队列名称 //参数2:队列中的数据是否持久化(如果选择了持久化) //参数3: 是否排外(当前队列是否为当前连接私有) //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据)) //参数5:设置当前队列的参数 //channel.queueDeclare("queue7",false,false,false,null); //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""(简单模式和工作模式不需要交换机) //参数2:目标队列名称 //参数3:设置当前这条消息的属性(设置过期时间 10) //参数4:消息的内容 channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } }
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("接收:"+msg); } }; channel.basicConsume("queue1",true,consumer); } }
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //生产消息,将消息发送到队列 channel.basicPublish("","queue2",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
public class ReceiveMsg { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); } }; channel.basicConsume("queue2",true,consumer); } }
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; //消费消息 channel.basicConsume("queue2",true,consumer); } }
最终结果:消息生产者生产消息,消息消费者消费消息,但是具体是哪个消息消费者消费是随机挑选的,也就是说消息生产者生产的消息只能有一个消费者消费,具体是哪个是随机的,不会出现两个消费者消费同一个消息的情况
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //消息发送到交换机,而不是队列,因此需要设置交换机名称,将队列名称置为空 channel.basicPublish("ex1","",null,msg.getBytes()); System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
public class ReceiveMsg { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); } }; //消息消费者1只监听队列queue3 channel.basicConsume("queue3",true,consumer); } }
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; //消息消费者2只监听队列queue4 channel.basicConsume("queue4",true,consumer); } }
结果:消息生产者生产的消息,各个消息消费者都能够接收到。
public class SendMsg { public static void main(String[] args) throws Exception{ System.out.println("请输入消息:"); Scanner scanner = new Scanner(System.in); String msg = null; while(!"quit".equals(msg = scanner.nextLine())){ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //指定交换机ex2,后面的a,b代表key if(msg.startsWith("a")){ channel.basicPublish("ex2","a",null,msg.getBytes()); }else if(msg.startsWith("b")){ channel.basicPublish("ex2","b",null,msg.getBytes()); } System.out.println("发送:" + msg); channel.close(); connection.close(); } } }
public class ReceiveMsg { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer1接收:"+msg); } }; channel.basicConsume("queue5",true,consumer); } }
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是从队列中获取的数据 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue6",true,consumer); } }
结果:消息消费者1监听queue5,并且接收key=a的消息,消息消费者2监听queue6,并且接收key=b的消息。
配置文件:
server:
port: 9001
spring:
application:
name: producer
rabbitmq:
host: 192.168.157.130
port: 5672
virtual-host: host1
username: admin
password: admin
@Service public class TestService { @Resource private AmqpTemplate amqpTemplate; public void sendMsg(String msg){ //1. 发送消息到队列 amqpTemplate.convertAndSend("queue1",msg); //2. 发送消息到交换机(订阅交换机) amqpTemplate.convertAndSend("ex1","",msg); //3. 发送消息到交换机(路由交换机) amqpTemplate.convertAndSend("ex2","a",msg); } }
@RestController
public class TestController {
@Autowired
private TestService testService;
@RequestMapping("test")
public String test(){
return "success";
}
}
@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {
@RabbitHandler
public void receiveMsg(String msg){
System.out.println("接收MSG:"+msg);
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。