赞
踩
<!-- 指定JDK编译版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* * 初始化 * 作用: * 创建交换机 * 创建队列 * 绑定队列、交换机、路由key */ public class Initialization { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 队列名 public static final String QUEUE_NAME = "queue01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 获取连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列、交换机、路由key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); connection.close(); System.out.println("初始化成功。。。"); } }
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.*; /* * 消费者 * 消费者消费消息流程 * 建立连接(connection) * 获取信道(channel) * 消费队列中的消息,自动应答 * 接收消息回调方法 * 拒绝消息回调方法 */ public class Consumer { // 队列名 public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 消费队列中的消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } // 接收消息回调方法 public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println(" [*] 成功处理消息:" + new String(message.getBody())); }; // 拒绝消息回调方法 public static CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消费消息失败"); }; }
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /* * 生产者 * 发送消息流程: * 建立连接(connection) * 获取信道(channel) * 通过信道将消息发送到指定交换机(exchange),并绑定路由key(routingKey),路由key可以是多个 * 注意: * 生产者不需要关心队列(queue) * 生产者发送消息前需要准备好: * 创建相关交换机 * 创建相关队列 * 绑定队列、交换机、路由key */ public class Producer { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置信息 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes()); connection.close(); System.out.println("消息发送成功。。。"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。