赞
踩
发送者
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.concurrent.TimeoutException;
- public class PublishProduct {
- public static void main(String[] args) {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
- factory.setHost("192.168.74.75");
- Connection connection = null;
- Channel channel = null;
- try {
- connection = factory.newConnection();
- // 创建一个通道
- channel = connection.createChannel();
- //创建交换机
- channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);
- //创建队列,如果存在则不会创建
- channel.queueDeclare("qy172-publish-queue01", true, false, false, null);
- channel.queueDeclare("qy172-publish-queue02", true, false, false, null);
- //交互机和队列绑定
- channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");
- channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");
- // 创建消息内容
- HashMap<String, Object> map = new HashMap<>();
- map.put("name", "张三");
- map.put("age", "22");
- //把数据给交换机,让他分发给队列
- channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));
- System.out.println("发送成功");
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- // 发生超时异常时抛出运行时异常
- throw new RuntimeException(e);
- } finally {
- if (channel != null) {
- try {
- // 关闭通道
- channel.close();
- } catch (IOException | TimeoutException e) {
- // 发生 IO 或超时异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- if (connection != null) {
- try {
- // 关闭连接
- connection.close();
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- }
- }
- }

订阅者1
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- public class Consumer01 {
- public static void main(String[] args) throws Exception {
- // 创建连接工厂对象
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
- factory.setHost("192.168.74.75");
- Connection connection = factory.newConnection();
- // 创建一个 RabbitMQ 连接
- Channel channel = connection.createChannel();
- // 创建一个通道,用于与 RabbitMQ 之间的通信
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- // 创建一个消费者对象,并重写其方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 消费消息的处理方法
- String json = new String(body);
- // 将消息内容转换为字符串
- Map map = JSON.parseObject(json, Map.class);
- // 使用 JSON 解析成 Map 对象
- System.out.println("消息内容Consumer01"+map);
- // 输出消息内容
- }
- };
- channel.basicConsume("qy172-publish-queue01",true,consumer);
- }
- }

订阅者2
- package com.aaa;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class Consumer02 {
- public static void main(String[] args) {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.74.75");
- try {
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String json = new String(body);
- Map map = JSON.parseObject(json, Map.class);
- System.out.println("消息内容Consumer02" + map);
- }
- };
- //订阅者2
- channel.basicConsume("qy172-publish-queue02",true,consumer);
- } catch (IOException | TimeoutException e) {
- // 处理连接、通道创建或消费消息时可能抛出的异常
- e.printStackTrace();
- }
- }
- }

2路由模式
发送者
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.concurrent.TimeoutException;
- public class PublishProduct {
- public static void main(String[] args) {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
- factory.setHost("192.168.74.75");
- Connection connection = null;
- Channel channel = null;
- try {
- connection = factory.newConnection();
- // 创建一个通道
- channel = connection.createChannel();
- //创建交换机,
- channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);
- //创建队列,如果存在则不会创建
- channel.queueDeclare("qy172-router-queue01", true, false, false, null);
- channel.queueDeclare("qy172-router-queue02", true, false, false, null);
- //交互机和队列绑定
- channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");
- channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");
- channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");
- channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");
- // 创建消息内容
- HashMap<String, Object> map = new HashMap<>();
- map.put("name", "张三");
- map.put("age", "22");
- //把数据给交换机,让他分发给队列
- channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
- // channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));
- System.out.println("发送成功");
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- // 发生超时异常时抛出运行时异常
- throw new RuntimeException(e);
- } finally {
- if (channel != null) {
- try {
- // 关闭通道
- channel.close();
- } catch (IOException | TimeoutException e) {
- // 发生 IO 或超时异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- if (connection != null) {
- try {
- // 关闭连接
- connection.close();
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- }
- }
- }

接收者1
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- public class Consumer01 {
- public static void main(String[] args) throws Exception {
- // 创建连接工厂对象
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
- factory.setHost("192.168.74.75");
- Connection connection = factory.newConnection();
- // 创建一个 RabbitMQ 连接
- Channel channel = connection.createChannel();
- // 创建一个通道,用于与 RabbitMQ 之间的通信
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- // 创建一个消费者对象,并重写其方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 消费消息的处理方法
- String json = new String(body);
- // 将消息内容转换为字符串
- Map map = JSON.parseObject(json, Map.class);
- // 使用 JSON 解析成 Map 对象
- System.out.println("消息内容Consumer01"+map);
- // 输出消息内容
- }
- };
- channel.basicConsume("qy172-router-queue01",true,consumer);
- }
- }

接收者2
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- public class Consumer01 {
- public static void main(String[] args) throws Exception {
- // 创建连接工厂对象
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
- factory.setHost("192.168.74.75");
- Connection connection = factory.newConnection();
- // 创建一个 RabbitMQ 连接
- Channel channel = connection.createChannel();
- // 创建一个通道,用于与 RabbitMQ 之间的通信
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- // 创建一个消费者对象,并重写其方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 消费消息的处理方法
- String json = new String(body);
- // 将消息内容转换为字符串
- Map map = JSON.parseObject(json, Map.class);
- // 使用 JSON 解析成 Map 对象
- System.out.println("消息内容Consumer01"+map);
- // 输出消息内容
- }
- };
- channel.basicConsume("qy172-router-queue01",true,consumer);
- }
- }

发送者
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.concurrent.TimeoutException;
- public class PublishProduct {
- public static void main(String[] args) {
- // 创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
- factory.setHost("192.168.74.75");
- Connection connection = null;
- Channel channel = null;
- try {
- connection = factory.newConnection();
- // 创建一个通道
- channel = connection.createChannel();
- //创建交换机,
- channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);
- //创建队列,如果存在则不会创建
- channel.queueDeclare("qy172-topic-queue01", true, false, false, null);
- channel.queueDeclare("qy172-topic-queue02", true, false, false, null);
- //交互机和队列绑定
- //主题匹配给这个
- channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");
- //主题,也匹配给这个
- channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");
- channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");
- // 创建消息内容
- HashMap<String, Object> map = new HashMap<>();
- map.put("name", "张三");
- map.put("age", "22");
- //把数据给交换机,让他分发给队列
- channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));
- System.out.println("发送成功");
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- // 发生超时异常时抛出运行时异常
- throw new RuntimeException(e);
- } finally {
- if (channel != null) {
- try {
- // 关闭通道
- channel.close();
- } catch (IOException | TimeoutException e) {
- // 发生 IO 或超时异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- if (connection != null) {
- try {
- // 关闭连接
- connection.close();
- } catch (IOException e) {
- // 发生 IO 异常时抛出运行时异常
- throw new RuntimeException(e);
- }
- }
- }
- }
- }

接收者1
- package org.example;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- public class Consumer01 {
- public static void main(String[] args) throws Exception {
- // 创建连接工厂对象
- ConnectionFactory factory = new ConnectionFactory();
- // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
- factory.setHost("192.168.74.75");
- Connection connection = factory.newConnection();
- // 创建一个 RabbitMQ 连接
- Channel channel = connection.createChannel();
- // 创建一个通道,用于与 RabbitMQ 之间的通信
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- // 创建一个消费者对象,并重写其方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 消费消息的处理方法
- String json = new String(body);
- // 将消息内容转换为字符串
- Map map = JSON.parseObject(json, Map.class);
- // 使用 JSON 解析成 Map 对象
- System.out.println("消息内容Consumer01"+map);
- // 输出消息内容
- }
- };
- channel.basicConsume("qy172-topic-queue01",true,consumer);
- }
- }

接收者2
- package com.aaa;
- import com.alibaba.fastjson.JSON;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class Consumer02 {
- public static void main(String[] args) {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.74.75");
- try {
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String json = new String(body);
- Map map = JSON.parseObject(json, Map.class);
- System.out.println("消息内容Consumer02" + map);
- }
- };
- //订阅者2
- channel.basicConsume("qy172-topic-queue02",true,consumer);
- } catch (IOException | TimeoutException e) {
- // 处理连接、通道创建或消费消息时可能抛出的异常
- e.printStackTrace();
- }
- }
- }

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。