赞
踩
只包含一个生产者以及一个消费者,生产者Producer将消息发送到队列中,消费者Consumer从该队列中接受消息,简单来说就是1v1的模式。
上图中,P是我们的生产者,C是我们的消费者。
- /**
- * @description 获取RabbitMQ的连接工具类
- */
- public class MQConnecitonUtils {
-
- private static ConnectionFactory connectionFactory;
-
- static {
- connectionFactory = new ConnectionFactory();
- //我们把重量级资源通过单例模式加载
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- //设置虚拟主机
- connectionFactory.setVirtualHost("/test");
- }
-
-
- public static Connection getConnection() {
- try {
- return connectionFactory.newConnection();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
-
- //定义关闭通道和关闭连接工具方法
- public static void closeConnectionAndChanel(Channel channel, Connection conn) {
- try {
- if (channel != null) {
- channel.close();
- }
- if (conn != null) {
- conn.close();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * TODO 描述:消费生产者
- */
-
- public class ProducerSIMPLE {
-
- private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
-
- private static final String SIMPLE_QUEUE_MESSAGE = "简单消息123123!" + new Date();
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //创建通道 从连接中获取Channel通道对象
- Channel channel = connection.createChannel();
- //创建Queue队列
- //参数1:队列名称,参数2:是否持久化,参数3:是否独占队列 参数4:是否自动删除 参数5:其他属性
- channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
- //发送消息到队列MQ_SIMPLE_QUEUE
- //basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- channel.basicPublish("", SIMPLE_QUEUE_NAME, null, SIMPLE_QUEUE_MESSAGE.getBytes("UTF-8"));
-
- System.out.println("消息已发送!");
-
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
-
- }
- }
再查看RabbitMQ控制台:
- /**
- * TODO 描述:1v1
- */
-
- public class ConsumerSIMPLE {
-
- private static final String SIMPLE_QUEUE_NAME = "MQ_SIMPLE_QUEUE";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- Channel channel;
- try {
- //创建消息通道对象
- channel = connection.createChannel();
- //声明queue队列
- channel.queueDeclare(SIMPLE_QUEUE_NAME, false, false, false, null);
- //创建消费者对象7
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("receive message: " + message);
- }
- };
- //监听消息队列
- channel.basicConsume(SIMPLE_QUEUE_NAME, true, consumer);
-
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
多个消费者绑定到同一个队列上,一条消息只能被一个消费者进行消费,工作队列有轮询分发和公平分发两种模式。有点类似于抢红包
- /**
- * 说明:
- * 消费者1与消费者2处理的消息是均分的,而且消息是轮询分发的(轮询分发 round-robin)
- */
- public class ProducerWORK {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
- private static final String WORK_QUEUE_MESSAGE = "工作队列消息!! ------> " + new Date();
-
- public static void main(String[] args) throws IOException, InterruptedException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //从连接中获取Channel通道对象
- Channel channel = null;
-
- channel = connection.createChannel();
- //创建Queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
- //发送10条消息到工作队列
- for (int i = 1; i <= 10; i++) {
- //模拟延迟
- Thread.sleep(2000);
- StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append("---").append(i);
- //发送消息
- channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
- System.out.println(msg);
- }
- System.out.println("=============工作队列消息发送完毕=============");
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
- }
- }
- /**
- * TODO 描述:消费者A
- *
- * @author ShuZL
- * @date 2023/9/15 0015 13:44
- */
-
- public class Consumer_A {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- Channel channel = null;
- try {
- //创建消息通道对象
- channel = connection.createChannel();
- //声明queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-A】receive message: " + message);
- try {
- //模拟延迟
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
-
- //监听消息队列
- channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- public class Consumer_B {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- Channel channel = null;
- try {
- //创建消息通道对象
- channel = connection.createChannel();
- //声明queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-B】receive message: " + message);
- try {
- //模拟延迟
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- //监听消息队列
- channel.basicConsume(WORK_QUEUE_NAME, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
先运行消费者A和消费者B,再运行消息生产者
查看控制台已经被消费完了
由以上运行结果可知,消费者A和消费者B处理消费是均匀的,也就是消费条数是一样的,而且消息是轮询分发的,也就是说同一条消息只能被一个消费者消费。由此也引出了一个问题,上面的消费者A和消费者B处理消息的效率不同,但是最后两者接受的到的消息还是一样多,如果我们需要让工作效率高的消费者消费更多的消息,那可以使用公平分发,也就是多劳多得。
好处:防止消息堆积(多consumer(消费者)+能者多劳)
- public class ProducerWORK {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
- private static final String WORK_QUEUE_MESSAGE = "工作队列消息!! ------> " + new Date();
-
- public static void main(String[] args) throws IOException, InterruptedException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //从连接中获取Channel通道对象
- Channel channel = null;
-
- channel = connection.createChannel();
- //创建Queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
- //发送10条消息到工作队列
- for (int i = 1; i <= 20; i++) {
- //模拟延迟
- // Thread.sleep(2000);
- StringBuilder msg = new StringBuilder(WORK_QUEUE_MESSAGE).append("---").append(i);
- //发送消息
- channel.basicPublish("", WORK_QUEUE_NAME, null, msg.toString().getBytes());
- System.out.println(msg);
- }
- System.out.println("=============工作队列消息发送完毕=============");
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
- }
- }
使用basicQos方法和prefetchCount = 1设置,使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回),以上两个设置就是告诉RabbitMQ必须要让消费者处理并手动确认了前一个消息,才会发送新的消息。
- public class Consumer_A {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //声明queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
-
- channel.basicQos(1);
-
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-A】receive message: " + message);
- try {
- //模拟延迟
- Thread.sleep(1000);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- //使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
- boolean autoAck = false;
- //监听消息队列
- channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
- }
- }
- public class Consumer_B {
-
- private static final String WORK_QUEUE_NAME = "MQ_WORK_QUEUE";
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //声明queue队列
- channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
-
- channel.basicQos(1);
-
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-B】receive message: " + message);
- try {
- //模拟延迟
- Thread.sleep(3000);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- //使用公平分发必须关闭自动应答(autoAck:true自动返回结果,false手动返回)
- boolean autoAck = false;
- //监听消息队列
- channel.basicConsume(WORK_QUEUE_NAME, autoAck, consumer);
- }
- }
生产者运行结果如下:
消费者A运行结果如下:
消费者B运行结果如下:
由以上运行结果可见,消费者A的效率相对较高,所以消费者A消费消息比消费者B多一些,这样就可以 充分发挥处理消息的能力。也能一定的避免消息堆积问题
生产者将消息发送到交换机,然后交换机绑定到多个队列,监听该队列的所以消费者消费消息。
- /**
- * 说明:可实现一条消息被多个消费者消费
- * a. 一个生产者,多个消费者;
- * b. 每一个消费者都有自己的消息队列;
- * c. 生产者没有把消息发送到队列,而是发送到交换器exchange上;
- * d. 每个队列都需要绑定到交换机上;
- * e. 生产者生产的消息先经过交换机然后到达队列,一个消息可以被多个消费者消费;
- */
- public class Product {
-
- //定义交换机名称
- private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
- //类型:分发
- private static final String PUBLISH_SUBSCRIBE_EXCHANGE_TYPE = "fanout";
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //从连接中获取Channel通道对象
- Channel channel = connection.createChannel();
- //创建交换机对象publish_subscribe_exchange_fanout
- channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_TYPE);
- //发送消息到交换机exchange上
- String msg = "hello world!!!";
- channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "", null, msg.getBytes());
- System.out.println("=============工作队列消息发送完毕=============");
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
- }
- }
- public class ConsumerA {
- private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name01";
-
- private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上
- channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
-
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-A】receive message: " + message);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- public class ConsumerB {
- private static final String PUBLIC_SUBSCRIBE_QUEUE_NAME = "public_subscribe_queue_name02";
-
- private static final String PUBLISH_SUBSCRIBE_EXCHANGE_NAME = "publish_subscribe_exchange_fanout";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上
- channel.queueBind(PUBLIC_SUBSCRIBE_QUEUE_NAME, PUBLISH_SUBSCRIBE_EXCHANGE_NAME, "");
-
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-B】receive message: " + message);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(PUBLIC_SUBSCRIBE_QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
由此可见,一条消息被两个消费者同时消费。
生产者将消息发送到direct交换机,它会把消息路由到那些binding key和routing key完全匹配的Queue中,这样就能实现消费者有选择性的去消费消息了。
- public class ProducerRouting {
- //自定义交换机名称
- private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
- //交换机类型:direct
- private static final String EXCHANGE_TYPE = "direct";
- //指定一个routing key
- private static final String EXCHANGE_ROUTE_KEY = "info";
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //从连接中获取Channel通道对象
- Channel channel = connection.createChannel();
- //创建交换机对象
- channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
- //发送消息到交换机exchange上
- String msg = "路由模式消息111 !!!!";
- //指定routing key为info
- channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
- System.out.println("=============路由模式消息发送完毕=============");
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
- }
- }
- public class Consumer_A {
- //定义队列名称
- private static final String QUEUE_NAME = "routing_direct_queue_name_1";
- //绑定生产者给定的交换机名称
- private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
- //binding key
- private static final String EXCHANGE_ROUTE_KEY = "error";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上,并且指定routing_key
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-A】receive message: " + message);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- public class Consumer_B {
- //定义队列名称
- private static final String QUEUE_NAME = "routing_direct_queue_name_2";
- //绑定生产者给定的交换机名称
- private static final String EXCHANGE_NAME = "publish_subscribe_exchange_direct";
- //binding key
- //binding key
- private static final String EXCHANGE_ROUTE_KEY01 = "error";
- private static final String EXCHANGE_ROUTE_KEY02 = "info";
- private static final String EXCHANGE_ROUTE_KEY03 = "warning";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上,并且指定routing_key
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY01);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY02);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY03);
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer-B】receive message: " + message);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
因为生产者发布消息的时候指定了routing key为info, 消费者A绑定队列的时候指定的binding key 为error,显然消费者A接收不到此消息,因为消费者B绑定队列的时候指定了binding key为error、info、warning,所以消费者B能够成功接收该消息进行消费。
类似于正则表达式匹配的一种模式。主要使用#、*进行匹配。
通配符规则:
' # ':匹配一个或多个词
' * ':匹配不多不少恰好1个词
例如:
`audit.#`:能够匹配`audit.irs.corporate` 或者 `audit.irs`
`audit.*`:只能匹配`audit.irs`
- /**
- * 说明:
- * #: 代表一个或者多个
- * *: 代表一个
- * <p>
- * 举例:
- * 比如发送消息的时候指定了routing key为news.insert,
- * 如果消费者指定binding key 为news.* 或者news.#都能接收到该消息;
- */
- public class Producer {
- private static final String EXCHANGE_NAME = "exchange_topic";
- //交换机类型:topic 类似正则匹配模式
- private static final String EXCHANGE_TYPE = "topic";
- //指定routing key
- private static final String EXCHANGE_ROUTE_KEY = "news.insert";
-
- public static void main(String[] args) throws IOException {
- //获取MQ连接
- Connection connection = MQConnecitonUtils.getConnection();
- //从连接中获取Channel通道对象
- Channel channel = connection.createChannel();
- //创建交换机对象
- channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
- //发送消息到交换机exchange上
- String msg = "主题(Topic)!!!";
- channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTE_KEY, null, msg.getBytes());
- System.out.println("=============主题模式消息发送完毕=============");
- MQConnecitonUtils.closeConnectionAndChanel(channel, connection);
-
- }
- }
- public class Consumer_A {
- //定义队列名称
- private static final String QUEUE_NAME = "topic_queue_name1";
- //交换机名称
- private static final String EXCHANGE_NAME = "exchange_topic";
- //binding key
- private static final String EXCHANGE_ROUTE_KEY = "news.*";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上,并且指定routing_key
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer01】receive message: " + message);
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- public class Consumer_B {
- private static final String QUEUE_NAME = "topic_queue_name2";
- private static final String EXCHANGE_NAME = "exchange_topic";
- //binding key
- private static final String EXCHANGE_ROUTE_KEY = "news.#";
-
- public static void main(String[] args) {
- //获取MQ连接对象
- Connection connection = MQConnecitonUtils.getConnection();
- try {
- //创建消息通道对象
- final Channel channel = connection.createChannel();
- //创建队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //将队列绑定到交换机上,并且指定routing_key
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTE_KEY);
-
- //创建消费者对象
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //消息消费者获取消息
- String message = new String(body, "UTF-8");
- System.out.println("【CustomConsumer02】receive message: " + message);
-
- //消费完一条消息需要自动发送确认消息给MQ
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //监听消息队列
- channel.basicConsume(QUEUE_NAME, false, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
生产者发送消息绑定的routing key 为news.insert;消费者A监听的队列和交换器binding key 为news.insert;消费者B监听的队列和交换器bindingkey为news.#,很显然,两个消费者都将接收到该消息。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。