赞
踩
生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)
例如:我们可以把路由key设置为insert ,那么消费者队列key指定包含insert才可以接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操作。
采用交换机direct模式
生产者代码如下:
- package com.rabbitmqdemo.Producer;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmqdemo.Utils.MQConnectionUtils;
- import java.util.concurrent.TimeoutException;
-
- import java.io.IOException;
-
- public class DirectProducter {
- private static final String EXCHANGE_NAME = "direct_exchange";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1.创建新的连接
- Connection connection = MQConnectionUtils.newConnection();
- // 2.创建通道
- Channel channel = connection.createChannel();
- // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String routingKey = "info";
- String msg = "direct_exchange_msg" + routingKey;
- // 4.发送消息
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
- System.out.println("生产者发送msg:" + msg);
- // // 5.关闭通道、连接
- // channel.close();
- // connection.close();
- // 注意:如果消费没有绑定交换机和队列,则消息会丢失
-
- }
- }
短信消费者代码:
- package com.rabbitmqdemo.consumer;
-
- import com.rabbitmq.client.*;
- import com.rabbitmqdemo.Utils.MQConnectionUtils;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class DirectSMSConsumer {
- private static final String QUEUE_NAME = "consumer_direct_sms";
- private static final String EXCHANGE_NAME = "direct_exchange";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- System.out.println("短信消费者启动");
- // 1.创建新的连接
- Connection connection = MQConnectionUtils.newConnection();
- // 2.创建通道
- Channel channel = connection.createChannel();
- // 3.消费者关联队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- String msg = new String(body, "UTF-8");
- System.out.println("消费者获取生产者消息:" + msg);
- }
- };
- // 5.消费者监听队列消息
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }
邮件消费者代码:
- package com.rabbitmqdemo.consumer;
-
- import com.rabbitmq.client.*;
- import com.rabbitmqdemo.Utils.MQConnectionUtils;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class DirectEmailConsumer {
- private static final String QUEUE_NAME = "consumer_direct_email";
- private static final String EXCHANGE_NAME = "direct_exchange";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- System.out.println("邮件消费者启动");
- // 1.创建新的连接
- Connection connection = MQConnectionUtils.newConnection();
- // 2.创建通道
- Channel channel = connection.createChannel();
- // 3.消费者关联队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
- throws IOException {
- String msg = new String(body, "UTF-8");
- System.out.println("消费者获取生产者消息:" + msg);
- }
- };
- // 5.消费者监听队列消息
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。