赞
踩
在RabbitMQ中,生产者发送消息是不会直接将消息直接推送到消息推送到队列中的,实际上推送的过程是交由交换机推送到队列中(如果没有定义交换机会使用MQ默认的交换机推送),生产者只能推送消息到交换机中。其中,在之前的推送中,队列中的每条消息只能被消费者消费一次,通过交换机我们就可以实现把消息推到不同的队列,进而实现多个消费者消相同的消息。
注:交换机可以在提供者/消费者任意一端声明。
以下案例建议先启动消费者开启监听,后启动消息提供者,方便测试结果。
通过routingKey和bindingKey将交换机和路由绑定,交换机通过对比Key值将消息推送到指定的队列中。
public class DirectProduct {
private static final String EXCHANGE_NAME = "DIRECT_EXCHANGE";
private static final String routingKey = "direct_key";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "测试一条消息";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("[提供者] ==> 推送成功!");
}
}
public class DirectCustomer { private static final String EXCHANGE_NAME = "DIRECT_EXCHANGE"; private static final String routingKey = "direct_key"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 获取临时队列,队列消息消费完后自动删除 String queueName = channel.queueDeclare().getQueue(); // 绑定队列 channel.queueBind(queueName, EXCHANGE_NAME, routingKey); DeliverCallback deliverCallback = (tag, message) -> { System.out.println("[消费者] ==> " + new String(message.getBody())); }; CancelCallback cancelCallback = (tag) -> { System.out.println("消息接收失败"); }; // 绑定交换机和队列 channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
消费者启动(先启动,方便监听)
提供者启动
将路由按模式匹配,可以通过设置 # 或 ***** 对队列中定义的routingKey模糊匹配,匹配成功后将消息转入队列
**#:**匹配一个或多个词,如:ta.#.te == ta.tb.tc.td.te
**:**匹配一个词,如:td..te == td.tf.te
public class TopicProducer { private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 输入 Scanner scanner = new Scanner(System.in); while (true) { System.out.println("请输入发送消息:"); String message = scanner.nextLine(); if (Objects.equals(message, "exit")){ break; } System.out.println("请输入发送路由:"); String routerKey = scanner.nextLine(); channel.basicPublish(EXCHANGE_NAME, routerKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("[主题交换机<" + routerKey + ">]: " + message); } } }
public class TopicCustomer_A { private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC"; public static final String CODER_LURE = "coder.#.lure"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, EXCHANGE_NAME, CODER_LURE); System.out.println("[主题交换机<消费者A>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> 监听开始!"); DeliverCallback deliverCallback = (tag, message) -> { String messageContent = "[主题交换机<消费者A>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> " + new String(message.getBody()); System.out.println(messageContent); }; CancelCallback cancelCallback = (tag) -> { System.out.println("[主题交换机<消费者A>] ==<接收取消,routingKey:[" + CODER_LURE + "]>==> 失败!"); }; // 接收消息 channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
public class TopicCustomer_B { private static final String EXCHANGE_NAME = "EXCHANGE_NAME_TOPIC"; public static final String CODER_LURE = "*.coder.lure"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 channel.queueBind(queueName, EXCHANGE_NAME, CODER_LURE); System.out.println("[主题交换机<消费者B>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> 监听开始!"); DeliverCallback deliverCallback = (tag, message) -> { String messageContent = "[主题交换机<消费者B>] ==<接收成功,routingKey:[" + CODER_LURE + "]>==> " + new String(message.getBody()); System.out.println(messageContent); }; CancelCallback cancelCallback = (tag) -> { System.out.println("[主题交换机<消费者B>] ==<接收取消,routingKey:[" + CODER_LURE + "]>==> 失败!"); }; // 接收消息 channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
启动提供者方法,控制台信息如下:
消费者A接收
消费者B接收
首部交换机和扇出交换机都不需要路由键(routingKey),交换机时通过headers头部来将消息映射到队列中。其中Hash结构中要求携带一个键 ‘x-match’ ,这个键值可以是any或者all。
any:只要在发布消息是携带有一对键值对headers满足队列定义的多个参数arguments中其中一个,就能匹配上。(需要key:value完全匹配才行)
all:需要所有entry和绑定在队列上的所有entry完全匹配
/** * HeadersProducer * * @author codeの诱惑 * @version 1.0.0 * @Descript 首部交换机: * 首部交换机和扇出交换机都不需要路由键(routingKey),交换机时通过headers头部来将消息映射到队列中 * 其中Hash结构中要求携带一个键 ‘x-match’ ,这个键值可以是any或者all * any:只要在发布消息是携带有一对键值对headers满足队列定义的多个参数arguments中其中一个,就能匹配上。(需要key:value完全匹配才行) * all:需要所有entry和绑定在队列上的所有entry完全匹配 * Created on 2023/5/2 17:42 */ public class HeadersProducer { private static final String EXCHANGE_NAME = "HEADERS_EXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = RabbitConnectionUtil.getConnection(); Channel channel = conn.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("author", "coderの诱惑"); headers.put("exchangeName", "Headers Exchange"); headers.put("version", "v1.0"); System.out.println("发送一条消息"); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(headers).build(); String message = "hello world!"; channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes(StandardCharsets.UTF_8)); } }
/** * HeaderExchangeCustomer * * @author codeの诱惑 * @version 1.0.0 * @Descript 首部交换机 * Created on 2023/5/3 0:11 */ public class HeaderExchangeCustomer { private static final String EXCHANGE_NAME = "HEADERS_EXCHANGE"; public static void main(String[] args) throws IOException, TimeoutException { Connection conn = RabbitConnectionUtil.getConnection(); Channel channel = conn.createChannel(); Map<String, Object> arguments = new HashMap<String, Object>(); // arguments.put("x-match", "all"); // 测试不匹配 arguments.put("all", "测试不同headers,不匹配"); arguments.put("x-match", "any"); arguments.put("author", "coderの诱惑"); arguments.put("exchangeName", "Headers Exchange"); arguments.put("version", "v1.0"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "", arguments); System.out.println("[首部交换机]:消费者等待消息中..."); DeliverCallback deliverCallback = (tag, message) -> { byte[] body = message.getBody(); String string = new String(body); System.out.println(string); }; CancelCallback cancelCallback = (tag) -> { System.out.println("取消"); }; channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
执行提供者后,消费者监听结果如下:
以广播的形式向队列推送消息,即类比通过广播发布通知,所有人接收通知内容。
以下以日志输出为案例,通过推送日志内容,一个负责将日志打印控制台,一个通过负责将日志写入日志文件当中。
private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String inputText = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, inputText.getBytes(StandardCharsets.UTF_8));
System.out.println("[生产者输出日志]: " + inputText);
}
}
}
public class ExConsumerForConsoleLog { private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /** * @param exchange 交换机名称 * @param type 交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 /** * @param queueName 队列名称 * @param exchangeName 交换机名称 * @param routingKey 路由key(由于扇出交换机是以广播形式推送消息到队列,routingKey可以为空 * ,即可以忽略routingKey值 * ,忽略后会向所有队列推送,如果指定了routingKey,那就和 ‘直接交换机’ 差不多了) */ channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收队列消息, 打印控制台..."); DeliverCallback deliverCallback = (tag, message) -> { System.out.println("[控制台<消费者A>] ==<接收成功>==> " + new String(message.getBody())); }; CancelCallback cancelCallback = (tag) -> { System.out.println("[控制台<消费者A>] ==<接收取消>==> 失败!"); }; // 接收消息 channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } }
public class ExConsumerForFileLog { private static final String EXCHANGE_NAME = "EXCHANGE_NAME_LOG"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitConnectionUtil.getConnection(); Channel channel = connection.createChannel(); /** * @param exchange 交换机名称 * @param type 交换机类型 */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 声明一个临时队列,队列名称随机,当消费者端口与队列的连接时,队列自动删除 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机和队列 /** * @param queueName 队列名称 * @param exchangeName 交换机名称 * @param routingKey 路由key(由于扇出交换机是以广播形式推送消息到队列,routingKey可以为空 * ,即可以忽略routingKey值 * ,忽略后会向所有队列推送,如果指定了routingKey,那就和 ‘直接交换机’ 差不多了) */ channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("等待接收队列消息, 写入日志文件..."); DeliverCallback deliverCallback = (tag, message) -> { String messageContent = "[日志文件<消费者B>] ==<接收成功>==> " + new String(message.getBody()); SimpleDateFormat sf = new SimpleDateFormat("yyyyMMddHHmmss"); String date = sf.format(new Date()); System.out.println(messageContent); writeFile(messageContent, "" + date + ".log"); }; CancelCallback cancelCallback = (tag) -> { System.out.println("[日志文件<消费者B>] ==<接收取消>==> 失败!"); }; // 接收消息 channel.basicConsume(queueName, true, deliverCallback, cancelCallback); } public static void writeFile(String message, String path) { try { String basePath = new String(System.getProperty("user.dir").getBytes(StandardCharsets.UTF_8)) + "\\RabbitMQ-simple\\src\\main\\java\\com\\mc\\exchange\\logs\\"; String absolutePath = basePath + path; System.out.println("存储路径:" + absolutePath); File Folder = new File(basePath); File file = new File(absolutePath); if (!Folder.exists()) { Folder.mkdirs(); } if (!file.exists()) { file.createNewFile(); } OutputStream out = new FileOutputStream(file); byte[] bytes = message.getBytes(StandardCharsets.UTF_8); out.write(bytes); out.close(); } catch (IOException e) { e.printStackTrace(); } } }
消息提供者控制台案例
消费者输出控制台
消费者输出日志文件
默认交换机实际和直接交换机一样,只不过默认交换机并不用特意声明交换机名称和routingKey 。以下不作案例演示。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。