赞
踩
目录
拉取镜像
docker pull rabbitmq:3.8.8-management
查看镜像
docker images rabbitmq
启动镜像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台
用户名密码默认为guest
- <!--指定 jdk 编译版本-->
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>8</source>
- <target>8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <!--rabbitmq 依赖客户端-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.8.0</version>
- </dependency>
- <!--操作文件流的一个依赖-->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.6</version>
- </dependency>
- </dependencies>
工作原理
我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)
- public class Producer {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置主机ip
- factory.setHost("182.92.234.71");
- // 设置用户名
- factory.setUsername("guest");
- // 设置密码
- factory.setPassword("guest");
- //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
- Connection connection = factory.newConnection();
- // 获取信道
- Channel channel = connection.createChannel();
- /*
- * 生成一个队列
- * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
- Map<String, Object> arguments)
- * 1.队列名称
- * 2.队列里面的消息是否持久化 默认消息存储在内存中
- * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
- * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
- * 5.其他参数
- **/
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "hello rabbitmq";
- /*
- * 发送一个消息
- * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
- * 1.发送到哪个交换机
- * 2.路由的key是哪个
- * 3.其他的参数信息
- * 4.发送消息的消息体
- *
- **/
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("发送成功");
- }
- }
- public class Consumer {
-
- private static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置主机ip
- factory.setHost("182.92.234.71");
- // 设置用户名
- factory.setUsername("guest");
- // 设置密码
- factory.setPassword("guest");
- //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
- Connection connection = factory.newConnection();
- // 获取信道
- Channel channel = connection.createChannel();
-
- // 推送的消息如何进行消费的回调接口
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- System.out.println(new String(message.getBody()));
- };
- // 取消消费的一个回调接口,如在消费的时候队列被删除了
- CancelCallback cancelCallback = (consumerTag) -> {
- System.out.println("消息消费被中断");
- };
- /*
- * 消费者消费消息
- * basicConsume(String queue, boolean autoAck,
- * DeliverCallback deliverCallback, CancelCallback cancelCallback)
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
- * 3.消费者未成功消费的回调
- **/
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
- }
- }
可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化
- public class RabbitMqUtils {
-
- private static Channel channel;
-
- static {
- ConnectionFactory factory = new ConnectionFactory();
- // 设置ip地址
- factory.setHost("192.168.23.100");
- // 设置用户名
- factory.setUsername("guest");
- // 设置密码
- factory.setPassword("guest");
- try {
- // 创建连接
- Connection connection = factory.newConnection();
- // 获取信道
- channel = connection.createChannel();
- } catch (Exception e) {
- System.out.println("创建信道失败,错误信息:" + e.getMessage());
- }
- }
-
- public static Channel getChannel() {
- return channel;
- }
- }
相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程
- public class Worker01 {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws Exception {
- Channel channel = RabbitMqUtils.getChannel();
- DeliverCallback deliverCallback = ( consumerTag, message) -> {
- System.out.println("接受到消息:" + new String(message.getBody()));
- };
- CancelCallback cancelCallback = (cunsumerTag) -> {
- System.out.println("消费者取消消费接口回调逻辑");
- };
- // 启动两次,第一次为C1, 第二次为C2
- System.out.println("C2消费者等待消费消息");
- channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
- }
- }
- public class Test01 {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMqUtils.getChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 通过控制台输入充当消息,使轮训演示更明显
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
- System.out.println("消息发送完成:" + message);
- }
- }
- }
结果
自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制 ,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用 。
手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除
- public class Test01 {
-
- private final static String QUEUE_NAME = "ack";
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMqUtils.getChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()) {
- String message = scanner.next();
- channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
- System.out.println("消息发送完成:" + message);
- }
- }
- }
- public class SleepUtils {
-
- public static void sleep(int second) {
- try {
- Thread.sleep(1000 * second);
- } catch (InterruptedException _ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- public class Worker01 {
-
- private final static String QUEUE_NAME = "ack";
-
- public static void main(String[] args) throws Exception {
- System.out.println("C1,业务时间短");
- Channel channel = RabbitMqUtils.getChannel();
- DeliverCallback deliverCallback = ( consumerTag, message) -> {
- SleepUtils.sleep(1); // 模拟业务执行1秒
- System.out.println("接受到消息:" + new String(message.getBody()));
- /*
- * 1、消息标识
- * 2、是否启动批量确认,false:否。
- * 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
- * 时出现异常会导致该消息的丢失
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
- };
- CancelCallback cancelCallback = (cunsumerTag) -> {
- System.out.println("消费者取消消费接口回调逻辑");
- };
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
- }
- }
-
- ==============================================================================
- public class Worker02 {
-
- private final static String QUEUE_NAME = "ack";
-
- public static void main(String[] args) throws Exception {
- System.out.println("C2,业务时间长");
- Channel channel = RabbitMqUtils.getChannel();
- DeliverCallback deliverCallback = ( consumerTag, message) -> {
- SleepUtils.sleep(15); // 模拟业务执行15秒
- System.out.println("接受到消息:" + new String(message.getBody()));
- /*
- * 1、消息标识
- * 2、是否启动批量确认,false:否。
- * 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
- * 时出现异常会导致该消息的丢失
- */
- channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
- };
- CancelCallback cancelCallback = (cunsumerTag) -> {
- System.out.println("消费者取消消费接口回调逻辑");
- };
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
- }
- }
worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。
注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错
最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。