赞
踩
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件。RabbitMQ服务器是用Erlang语言编写的,因此需要先安装部署Erlang环境,再安装RabbitMQ。
RabbitMQ与Erlang版本对照表(https://www.rabbitmq.com/which-erlang.html)
本次安装的版本号:
Windows系统:Windows 11 家庭中文版
RabbitMQ:rabbitmq-server-3.12.1.exe
Erlang:otp_win64_26.0.exe
提示:以下是本篇文章正文内容
进到RabbitMQ安装目录下的sbin目录,运行命令rabbitmq-plugins enable rabbitmq_management
双击rabbitmq-server.bat
访问http://localhost:15672/
基于spring-boot 3.1.0
<!-- Spring Boot AMQP starter. No <version> is given here; presumably it is managed by the
     Spring Boot parent/BOM (the article states spring-boot 3.1.0) - confirm in your pom. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
生产者:
/**
 * Minimal producer: declares {@code simple_queue} and publishes a single text message to it
 * through the default exchange.
 */
public class Simple {
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Connects to the broker, publishes one message and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if declaring the queue, publishing, or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, IOException, KeyManagementException, TimeoutException {
        // try-with-resources closes the channel first and then the connection,
        // even when queueDeclare/basicPublish throws.
        try (Connection connection = ConnUtils.getConn();
             Channel channel = connection.createChannel()) {
            // Arguments after the name: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "hello, rabbitMq!";
            // Empty exchange name selects the default exchange; the routing key is the queue name.
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
package com.example.mqproducer.test;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

/**
 * Shared helper that opens connections to the RabbitMQ broker used by the examples.
 */
public class ConnUtils {

    /**
     * Opens a new connection to 127.0.0.1:5672, virtual host "/", as user guest/guest.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws IOException      if the connection cannot be established
     * @throws TimeoutException if the connection attempt times out
     */
    public static Connection getConn() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Broker address and virtual host.
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // Credentials.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory.newConnection();
    }
}
消费者:
生产者:
package com.example.mqproducer.test.producer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

/**
 * Work-queue producer: publishes 50 task messages to {@code work_queue} through the default
 * exchange, pausing a little longer after each one.
 */
public class Work {
    private static final String QUEUE_NAME = "work_queue";

    /**
     * Publishes the tasks and then releases the channel and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, publishing or sleeping fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel and then the connection even if a publish
        // fails or the sleep is interrupted half-way through the loop.
        try (Connection connection = ConnUtils.getConn();
             Channel channel = connection.createChannel()) {
            // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Publish the tasks one by one.
            for (int i = 0; i < 50; i++) {
                String message = "task .. " + i;
                // Explicit UTF-8 so the payload does not depend on the platform charset.
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
                // Pause grows with the task index (0 ms, 2 ms, 4 ms, ...).
                Thread.sleep(i * 2);
            }
        }
    }
}
消费者:
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Work-queue consumer 1: consumes {@code work_queue} with manual acknowledgements and spends
 * two seconds on every message to simulate slow work.
 */
public class Work1 {
    private static final String QUEUE_NAME = "work_queue";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the queue here as well so the consumer can start before the producer has created it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("[消费者1] received : " + msg + "!");
                try {
                    // Simulated work.
                    Thread.sleep(2000);
                    // Acknowledge only after the work is done, so a crash mid-task does not lose the message.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and hand the unfinished message back to the queue.
                    Thread.currentThread().interrupt();
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
            }
        };
        // autoAck=false: acknowledgements are sent manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Work-queue consumer 2: consumes {@code work_queue} with manual acknowledgements and
 * acknowledges every message immediately after printing it.
 */
public class Work2 {
    private final static String QUEUE_NAME = "work_queue";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the queue or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = ConnUtils.getConn();
        Channel channel = conn.createChannel();
        // Declare the queue here as well so the consumer can start before the producer has created it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DefaultConsumer handler = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String text = new String(body);
                    System.out.println("[消费者2] received : " + text + "!");
                    // Manual acknowledgement of this single delivery.
                    long deliveryTag = envelope.getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        // autoAck=false: acknowledgements are sent manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, handler);
    }
}
生产者:
package com.example.mqproducer.test.producer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

/**
 * Publish/subscribe producer: declares the fanout exchange {@code fanout_exchange} and publishes
 * one message to it.
 */
public class Fanout {
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Publishes a single message and then releases the channel and connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel and then the connection even if publishing fails.
        try (Connection connection = ConnUtils.getConn();
             Channel channel = connection.createChannel()) {
            // Declare the exchange with type "fanout".
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "Hello everyone";
            // Publish to the exchange with an empty routing key.
            // Explicit UTF-8 so the payload does not depend on the platform charset.
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [生产者] Sent '" + message + "'");
        }
    }
}
消费者:
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Publish/subscribe consumer 1: binds {@code fanout_exchange_queue_1} to {@code fanout_exchange}
 * and prints every message it receives.
 */
public class Fanout1 {
    private static final String QUEUE_NAME = "fanout_exchange_queue_1";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload.
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [消费者1] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Publish/subscribe consumer 2: binds {@code fanout_exchange_queue_2} to {@code fanout_exchange}
 * and prints every message it receives.
 */
public class Fanout2 {
    private static final String QUEUE_NAME = "fanout_exchange_queue_2";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue to the exchange with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload.
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
生产者:
package com.example.mqproducer.test.producer; import com.example.mqproducer.test.ConnUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Direct { private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnUtils.getConn(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息内容 String message = "商品更新了, id = 1001"; // 发送消息,并且指定routing key 为:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes()); System.out.println(" [商品服务:] Sent '" + message + "'"); channel.close(); connection.close(); } }
消费者:
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Routing consumer 2: binds {@code direct_exchange_queue_2} to {@code direct_exchange} for the
 * routing keys {@code insert}, {@code update} and {@code delete}, and prints every message.
 */
public class Direct2 {
    private static final String QUEUE_NAME = "direct_exchange_queue_2";
    private static final String EXCHANGE_NAME = "direct_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue once per routing key this consumer subscribes to.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload, decoded with the platform default charset.
                String msg = new String(body);
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Routing consumer 1: binds {@code direct_exchange_queue_1} to {@code direct_exchange} for the
 * routing keys {@code update} and {@code delete}, and prints every message.
 */
public class Direct1 {
    private static final String QUEUE_NAME = "direct_exchange_queue_1";
    private static final String EXCHANGE_NAME = "direct_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind the queue once per routing key this consumer subscribes to (update and delete only).
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload, decoded with the platform default charset.
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
生产者:
package com.example.mqproducer.test.producer; import com.example.mqproducer.test.ConnUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Topic { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] argv) throws Exception { // 获取到连接 Connection connection = ConnUtils.getConn(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,并且指定routing key 为:insert ,代表新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服务:] Sent '" + message + "'"); channel.close(); connection.close(); } }
消费者:
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic consumer 1: binds {@code topic_exchange_queue_1} to {@code topic_exchange} for the
 * routing keys {@code item.update} and {@code item.delete}, and prints every message.
 */
public class Topic1 {
    private static final String QUEUE_NAME = "topic_exchange_queue_1";
    private static final String EXCHANGE_NAME = "topic_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Subscribe to item updates and deletions only.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload, decoded with the platform default charset.
                String msg = new String(body);
                System.out.println(" [消费者1] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
package com.example.mqproducer.test.consumer;

import com.example.mqproducer.test.ConnUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * Topic consumer 2: binds {@code topic_exchange_queue_2} to {@code topic_exchange} with the
 * pattern {@code item.*} and prints every message.
 */
public class Topic2 {
    private static final String QUEUE_NAME = "topic_exchange_queue_2";
    private static final String EXCHANGE_NAME = "topic_exchange";

    /**
     * Registers the consumer; the connection is left open so deliveries keep arriving.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] argv) throws Exception {
        Connection connection = ConnUtils.getConn();
        Channel channel = connection.createChannel();
        // Declare the exchange with the same type the producer uses, so binding does not fail
        // when this consumer is started before the producer has ever run.
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // Declare the queue: durable=false, exclusive=false, autoDelete=false, no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Wildcard binding: in a topic exchange "*" stands for exactly one word, so this
        // receives item.insert, item.update, item.delete, and so on.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // Callback invoked by the client library for each delivered message.
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body is the raw message payload, decoded with the platform default charset.
                String msg = new String(body);
                // Label fixed: this is consumer 2 (the original printed the copy-pasted "消费者1").
                System.out.println(" [消费者2] received : " + msg + "!");
            }
        };
        // autoAck=true: deliveries are acknowledged automatically.
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
以上就是今天要讲的内容,仅简单介绍了RabbitMQ的使用。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。