赞
踩
导入jar依赖
<!-- RabbitMQ Java client; supplies the com.rabbitmq.client classes imported by the plain-Java examples in this article. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
代码结构
工具类MQUtil.java
package com.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Small helper used by the examples to open and close RabbitMQ connections.
 *
 * <p>Reference: https://blog.csdn.net/zuzhiang/article/details/117618105
 */
public class MQUtil {

    /**
     * Opens a new connection to the broker at 127.0.0.1:5672, virtual host
     * {@code /ems}, authenticating as user {@code ems}.
     *
     * <p>Every call creates a fresh connection; the caller owns it and is
     * responsible for closing it.
     *
     * @return a newly opened connection
     * @throws Exception if the connection cannot be established
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        connectionFactory.setVirtualHost("/ems");
        // Return the connection directly instead of parking it in a shared static
        // field that was overwritten by every call and never read.
        return connectionFactory.newConnection();
    }

    /**
     * Closes the channel and then the connection.
     *
     * <p>Either argument may be {@code null}, in which case it is skipped. The
     * connection is closed even when closing the channel fails.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     * @throws Exception if closing the channel or the connection fails
     */
    public static void closeChannelAndConnection(Channel channel, Connection connection) throws Exception {
        try {
            if (channel != null) {
                channel.close();
            }
        } finally {
            // Runs even if channel.close() threw, so the connection is never leaked.
            if (connection != null) {
                connection.close();
            }
        }
    }
}
生产者 MqProduct.java
package com.example.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
 * Basic mode producer: publishes a single text message to {@code hello_queue}.
 */
public class MqProduct {

    /**
     * Connects to the local broker, declares {@code hello_queue} and publishes one message.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");

        // try-with-resources closes the channel and then the connection, even when
        // declaring the queue or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("hello_queue", false, false, false, null);
            // The second argument is the routing key (the IDE hint is correct). With the
            // default exchange "" the message is routed to the queue whose name equals
            // the routing key, which is why the queue name is passed here.
            channel.basicPublish("", "hello_queue", null,
                    "hello RabbitMQ!".getBytes(StandardCharsets.UTF_8));
        }
    }
}
消费者 MqConsumer.java
package com.example.basic;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Basic mode consumer: prints every message delivered from {@code hello_queue}.
 */
public class MqConsumer {

    /**
     * Connects to the local broker and registers an auto-acknowledging consumer.
     *
     * <p>The channel and connection are not closed here, so the process keeps
     * receiving messages after {@code main} returns.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("ems");
        connectionFactory.setVirtualHost("/ems");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello_queue", false, false, false, null);

        // Second argument true = automatic acknowledgement.
        channel.basicConsume("hello_queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("收到消息: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}
生产者 ProductMQ.java
package com.example.workQueue;

import com.example.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

/**
 * Work-queue producer: publishes ten numbered messages to {@code work_queue}.
 *
 * <p>Original author's note: in work mode messages are shared out according to
 * consumer capacity, so the faster consumer2 handles more messages and the
 * slower consumer1 handles fewer.
 */
public class ProductMQ {

    /**
     * Declares {@code work_queue} and publishes messages 0 through 9 to it.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel and then the connection, even when
        // a publish fails part-way through the loop.
        try (Connection connection = MQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("work_queue", true, false, false, null);
            // NOTE(review): the queue is declared durable but messages are published with
            // null properties; confirm whether persistent delivery is wanted here.
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", "work_queue", null,
                        (i + " hello work queue!!!").getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}
消费者1 ConsumerMQ1.java
package com.example.workQueue;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Work-queue consumer 1: a deliberately slow consumer that handles one message
 * at a time and acknowledges each message manually.
 */
public class ConsumerMQ1 {

    /**
     * Subscribes to {@code work_queue} with prefetch 1 and manual acknowledgement.
     *
     * <p>The channel and connection stay open so the process keeps consuming.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MQUtil.getConnection();
        final Channel channel = connection.createChannel();
        // Only one message may be consumed at a time.
        channel.basicQos(1);
        channel.queueDeclare("work_queue", true, false, false, null);

        // Second argument false = automatic acknowledgement is turned off.
        channel.basicConsume("work_queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    // Simulate slow processing.
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag rather than swallowing the interruption.
                    Thread.currentThread().interrupt();
                }
                System.out.println("MQ-1 接收到了消息, 消息是: " + new String(body, StandardCharsets.UTF_8));
                System.out.println(envelope.getDeliveryTag());
                System.out.println("**************");
                // Manual acknowledgement. Arguments: which message to confirm, and
                // whether to confirm multiple messages at once.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
消费者2 ConsumerMQ2.java
package com.example.workQueue;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Work-queue consumer 2: prints each message from {@code work_queue} immediately,
 * using automatic acknowledgement and no artificial delay.
 */
public class ConsumerMQ2 {

    /**
     * Subscribes to {@code work_queue} with automatic acknowledgement.
     *
     * <p>The channel and connection stay open so the process keeps consuming.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue", true, false, false, null);

        channel.basicConsume("work_queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("MQ-2 接收到了消息, 消息是: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}
生产者 ProductMQ.java
package com.example.fanout;

import com.example.MQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

/**
 * Fanout producer: publishes one message to the {@code logs} fanout exchange.
 *
 * <p>Original author's note: in fanout mode both consumer1 and consumer2
 * receive the message.
 */
public class ProductMQ {

    /**
     * Declares the {@code logs} fanout exchange and publishes a single message to it.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel and then the connection, even when
        // declaring the exchange or publishing throws.
        try (Connection connection = MQUtil.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare("logs", "fanout");
            // An empty routing key is passed when publishing to the exchange.
            channel.basicPublish("logs", "", null,
                    "fanout type message!".getBytes(StandardCharsets.UTF_8));
        }
    }
}
消费者1 ConsumerMQ.java
package com.example.fanout;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Fanout consumer 1: binds a temporary queue to the {@code logs} exchange and
 * prints every message it receives.
 */
public class ConsumerMQ {

    /**
     * Declares the exchange, binds a temporary queue to it and starts consuming.
     *
     * <p>The channel and connection stay open so the process keeps consuming.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs", "fanout");

        // Temporary queue; its name is returned by the declare call.
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, "logs", "");

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("收到消息: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}
消费者2 ConsumerMQ2.java
package com.example.fanout;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Fanout consumer 2: binds its own temporary queue to the {@code logs} exchange
 * and prints every message it receives.
 */
public class ConsumerMQ2 {

    /**
     * Declares the exchange, binds a temporary queue to it and starts consuming.
     *
     * <p>The channel and connection stay open so the process keeps consuming.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = MQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("logs", "fanout");

        // Temporary queue; its name is returned by the declare call.
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, "logs", "");

        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                System.out.println("收到消息: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}
生产者 ProductMQ.java
package com.example.direct; import com.example.MQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * direct模式 基于routing_key发送消息 */ public class ProductMQ { public static void main(String[] args) throws Exception { Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs_direct", "direct"); //依次注释掉下面几个进行测试 String routing_key = "info"; // String routing_key = "error"; // String routing_key = "warning"; // String routing_key = "debug"; channel.basicPublish("logs_direct", routing_key, null, ("direct模式基于routing_key发送消息,routing_key是: [" + routing_key + "]").getBytes()); MQUtil.closeChannelAndConnection(channel, connection); } }
消费者1 ConsumerMQ1.java
package com.example.direct;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Direct mode consumer 1: receives only messages published to
 * {@code logs_direct} with the routing key {@code error}.
 */
public class ConsumerMQ1 {

    /**
     * Binds a temporary queue to {@code logs_direct} for {@code error} and prints
     * each delivery. The payload is decoded with the platform default charset.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = MQUtil.getConnection();
        Channel ch = conn.createChannel();

        final String exchange = "logs_direct";
        ch.exchangeDeclare(exchange, "direct");

        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, exchange, "error");

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("消费者1: " + text);
            }
        };
        // Automatic acknowledgement is enabled.
        ch.basicConsume(tempQueue, true, printer);
    }
}
消费者2 ConsumerMQ2.java
package com.example.direct;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Direct mode consumer 2: receives messages published to {@code logs_direct}
 * with the routing keys {@code info}, {@code warning} or {@code debug}.
 */
public class ConsumerMQ2 {

    /**
     * Binds one temporary queue to {@code logs_direct} under three routing keys and
     * prints each delivery. The payload is decoded with the platform default charset.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = MQUtil.getConnection();
        Channel ch = conn.createChannel();

        final String exchange = "logs_direct";
        ch.exchangeDeclare(exchange, "direct");

        String tempQueue = ch.queueDeclare().getQueue();
        // Same bindings, in the same order, expressed as a loop.
        String[] routingKeys = {"info", "warning", "debug"};
        for (String key : routingKeys) {
            ch.queueBind(tempQueue, exchange, key);
        }

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("消费者2: " + text);
            }
        };
        // Automatic acknowledgement is enabled.
        ch.basicConsume(tempQueue, true, printer);
    }
}
生产者 ProductMQ.java
package com.example.topic; import com.example.MQUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * topic模式下动态routing_key */ public class ProductMQ { public static void main(String[] args) throws Exception { Connection connection = MQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics", "topic"); String routing_key = "user.save"; // String routing_key = "user.save.item"; // String routing_key = "user.save.item.good_id"; channel.basicPublish("topics", routing_key, null, ("topic模式下动态模型: [" + routing_key + "]").getBytes()); MQUtil.closeChannelAndConnection(channel, connection); } }
消费者1 ConsumerMQ1.java
package com.example.topic;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic mode consumer 1: subscribes to the {@code topics} exchange with the
 * binding pattern {@code user.#}.
 */
public class ConsumerMQ1 {

    /**
     * Binds a temporary queue to {@code topics} with {@code user.#} and prints each
     * delivery. The payload is decoded with the platform default charset.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = MQUtil.getConnection();
        Channel ch = conn.createChannel();

        final String exchange = "topics";
        ch.exchangeDeclare(exchange, "topic");

        // '#' can match any number of words.
        final String bindingPattern = "user.#";
        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, exchange, bindingPattern);

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("消费者1: 当前为'#'符号 " + text);
            }
        };
        // Automatic acknowledgement is enabled.
        ch.basicConsume(tempQueue, true, printer);
    }
}
消费者2 ConsumerMQ2.java
package com.example.topic;

import com.example.MQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic mode consumer 2: subscribes to the {@code topics} exchange with the
 * binding pattern {@code user.*}.
 */
public class ConsumerMQ2 {

    /**
     * Binds a temporary queue to {@code topics} with {@code user.*} and prints each
     * delivery. The payload is decoded with the platform default charset.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = MQUtil.getConnection();
        Channel ch = conn.createChannel();

        final String exchange = "topics";
        ch.exchangeDeclare(exchange, "topic");

        // '*' can match exactly one word.
        final String bindingPattern = "user.*";
        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, exchange, bindingPattern);

        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("消费者2: 当前为'*'符号 " + text);
            }
        };
        // Automatic acknowledgement is enabled.
        ch.basicConsume(tempQueue, true, printer);
    }
}
生产者
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest(classes = MainApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { // 注入RabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; // hello world @Test public void test() { //参数2表示: 队列名称(idea编辑器提示有误, 总是提示第二个参数是routingKey,真实是队列名称) rabbitTemplate.convertAndSend("", "hello","我是发送消息端"); System.out.println("发送完成..."); } }
消费者
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 指定监听hello队列 // 默认是持久化、非独占、不自动删除队列的 @RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", exclusive = "false", autoDelete = "true")) public class Consumer { @RabbitHandler public void receive(String message) { System.out.println("message: " + message); } }
操作说明: 重启服务会自动加载消费者程序, 然后再启动生产者的测试代码,控制台就可以看到消息了
生产者
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.env.Environment; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/test") public void testWork() { for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work", "work模型" + i); } } }
消费者
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkConsumer { // 第1个消费者 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message) { System.out.println("message1: " + message); } // 第2个消费者 @RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message) { System.out.println("message2: " + message); } }
使用说明: 重启服务后,自动加载@Component注解下面的两个消费者代码; 然后浏览器访问 http://127.0.0.1:8080/test 得到控制台打印如下:
生产者
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。