赞
踩
application.yml
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
# listener:
# simple:
# prefetch: 1
生产者生产消息发送给队列,消费者监听到队列有消息就进行消费(底层实现是生产者发送消息给默认交换机,再由交换机转发到队列)
package com.yzm.rabbitmq_01.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String HELLO_QUEUE = "hello_queue"; /** * 消息队列 * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息 * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 */ @Bean public Queue helloQueue() { return new Queue(HELLO_QUEUE, true, false, false); } }
消息发布
package com.yzm.rabbitmq_01.sender; import com.yzm.rabbitmq_01.config.RabbitConfig; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * 消息发布 */ @RestController public class HelloSender { private final AmqpTemplate template; public HelloSender(AmqpTemplate template) { this.template = template; } @GetMapping("/send") public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) { for (int i = 1; i <= 10; i++) { String msg = message + " ..." + i; System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'"); template.convertAndSend(RabbitConfig.HELLO_QUEUE, msg); } } }
消息监听
package com.yzm.rabbitmq_01.receiver; import com.yzm.rabbitmq_01.config.RabbitConfig; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息监听 */ @Component @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public class HelloReceiver { @RabbitHandler public void receive(String message) { System.out.println(" [ 消费者 ] Received ==> '" + message + "'"); } }
启动项目
RabbitMQ服务器上能看到hello_queue队列
其中:
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量。
Unacked:表示待确认数量;队列分配消息给消费者时,给该条消息一个待确认状态,当消费者确认消息之后,队列才会移除该条消息。
Total:表示待消费数和待确认数的总和
访问:http://localhost:8080/send
新增一个消费者
注意:这里的监听方式跟之前的不一样,接收的参数是Message消息主体,这种比较推荐
package com.yzm.rabbitmq_01.receiver; import com.yzm.rabbitmq_01.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息监听 */ @Component public class HelloReceiver2 { /** * 使用 Message 接收消息,不能使用HelloReceiver那种监听队列方式 * 而是直接在方法上监听队列,不然会报错 No method found for class java.lang.String */ @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public void receive2(Message message) { System.out.println(" [ 消费者@2 ] Received ==> '" + new String(message.getBody()) + "'"); } }
重新启动,访问:localhost:8080/send
10条消息被2个消费者平均分配了
队列的消息分配方式默认是平均分配,即第一条消息分配给一个消息者,第二条消息就分配给另一个消息者,以此类推…
上面示例有2个消费者监听,由于只是简单的打印语句,所以看不出有什么问题。
我进行修改一下,通过设置线程休眠时间来表示消费者处理消费的任务时间
/** * 消息监听 */ //@Component 注释掉,不用这种监听方式了 @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public class HelloReceiver { @RabbitHandler public void receive(String message) { System.out.println(" [ 消费者 ] Received ==> '" + message + "'"); } } /** * 消息监听 */ @Component public class HelloReceiver2 { private int count1=1; private int count2=1; @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public void receive1(Message message) throws InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); } @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public void receive2(Message message) throws InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); } }
运行结果:
现在就能很明显的看出,消费者1号很快地处理完消息后就处于空闲状态;而消费者2号却一直很忙碌。当消息数量成千上万的时候,由消费者2号处理的消息会堆积很多,达不到时效性。
针对这种问题,rabbitmq提供了一种解决方案。
设置prefetch参数=1,实现原理是:队列只会分配一条消息给对应的监听消费者,收到消费者的确认回复之后才会重新分配另一条消息。
启动prefetch功能(方式一)
@Configuration public class RabbitConfig { public static final String HELLO_QUEUE = "hello_queue"; public static final String PREFETCH_ONE = "prefetchOne"; /** * 消息队列 * durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息 * exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除 * autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除 */ @Bean public Queue helloQueue() { return new Queue(HELLO_QUEUE, true, false, false); } @Bean(name = PREFETCH_ONE) public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOne(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 手动确认 // factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置prefetch factory.setPrefetchCount(1); return factory; } }
同时修改消费者,containerFactory = RabbitConfig.PREFETCH_ONE
@Component public class HelloReceiver2 { private int count1 = 1; private int count2 = 1; @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void receive1(Message message) throws InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); } @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void receive2(Message message) throws InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); } }
运行结果:
1号处理了8条消息,2号2条,工作效率提高了不少
启动prefetch功能(方式二,全局)
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
prefetch: 1
@Component public class HelloReceiver2 { private int count1 = 1; private int count2 = 1; // @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void receive1(Message message) throws InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++); } // @RabbitListener(queues = RabbitConfig.HELLO_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void receive2(Message message) throws InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++); } @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public void receive3(Message message) throws InterruptedException { Thread.sleep(200); System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@3号 ] 处理消息数:" + count1++); } @RabbitListener(queues = RabbitConfig.HELLO_QUEUE) public void receive4(Message message) throws InterruptedException { Thread.sleep(1000); System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'"); System.out.println(" [ 消费者@4号 ] 处理消息数:" + count2++); } }
运行结果:
第二种方式是全局配置,应用于所有监听消费者
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。