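In the earlier examples, we sent and received messages from the management console over short-lived connections. In this article we read messages from a queue in real time and return them to the client as a stream.
WebFlux is a reactive web framework. The client connects to the server over a single long-lived connection, and the server can keep pushing messages to the client over that connection. The effect is: send one request, receive many responses.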
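The "subscribe once, receive many" model can be illustrated without WebFlux at all. The sketch below uses only the JDK's java.util.concurrent.Flow API instead of Reactor, so it is an analogy and not a description of what WebFlux does internally. The class names PushStreamSketch and CollectingSubscriber are made up for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class PushStreamSketch {

    /** Subscribes once, then collects every item the publisher pushes. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Ask for an unbounded number of items up front
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given messages to a single subscriber and returns what it received. */
    static List<String> run(String... messages) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);   // one subscription ...
            for (String message : messages) {
                publisher.submit(message);     // ... many pushed items
            }
        } // close() signals onComplete once the submitted items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(subscriber.received);
    }
}
```

Calling PushStreamSketch.run("first", "second") returns both items in order, although the subscriber was registered only once.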
Because we are using RabbitMQ, add the following dependencies:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
</dependency>
The WebFlux dependencies are as follows:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.7</version>
</dependency>
The following code returns a container that listens on a queue:
private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
    lock.lock();
    try {
        SimpleMessageListenerContainer listener = listeners.get(queueName);
        if (listener == null && messageListener != null) {
            listener = new SimpleMessageListenerContainer();
            listener.setConnectionFactory(connectionFactory);
            listener.setQueueNames(queueName);
            listener.setMessageListener(messageListener);
            listeners.put(queueName, listener);
        }
        return listener;
    } finally {
        lock.unlock();
    }
}
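As an aside, the lock-then-check pattern in getListener could also be expressed with ConcurrentHashMap.computeIfAbsent, which runs the factory at most once per key. The following is a minimal sketch under that assumption. ListenerCache and its factory parameter are hypothetical names, and a generic type stands in for SimpleMessageListenerContainer so that the snippet runs without Spring AMQP.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: creates at most one listener object per queue name. */
final class ListenerCache<T> {
    private final Map<String, T> listeners = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    ListenerCache(Function<String, T> factory) {
        this.factory = factory;
    }

    /** Returns the cached listener for the queue, creating it on first use. */
    T getOrCreate(String queueName) {
        // computeIfAbsent invokes the factory atomically, at most once per key
        return listeners.computeIfAbsent(queueName, factory);
    }

    /** Number of queues that currently have a listener. */
    int size() {
        return listeners.size();
    }
}
```

The explicit ReentrantLock in the original does the same job; the map-based variant simply moves the synchronization into the collection.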
As soon as the consumer reads a message, the onMessage method is called, and the Flux emitter then pushes the message onto the stream.
public Flux<String> listen(String queueName) {
    return Flux.create(emitter -> {
        SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {
            @Override
            public void onMessage(Message message) {
                String msg = new String(message.getBody());
                System.out.println("listen function Received message: " + msg);
                emitter.next(msg);
            }
        });
        container.start();
    });
}
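Because OpenAPI cannot display streaming data in real time, we use Postman for testing.
After the request is sent, the page stays in a loading state.
Send a message from the management console.
Postman receives the message.
Send another one, and Postman receives another message.
This gives us the "one request, many responses" effect.
Note that the response format must be declared with produces = "text/event-stream".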
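With this media type, each element of the Flux reaches the client as a server-sent event: a data: line followed by a blank line. To make that format concrete, here is a small sketch of a decoder that handles the data fields only. SseDataParser is a hypothetical name, and the parser deliberately ignores the other SSE fields (event, id, retry) and comments.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of decoding the data fields of a text/event-stream body. */
final class SseDataParser {

    /** Returns one string per event; several data lines in one event are joined with '\n'. */
    static List<String> parse(String body) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        for (String line : body.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // One optional space after the colon is not part of the value
                dataLines.add(value.startsWith(" ") ? value.substring(1) : value);
            }
            // Other fields and comment lines are skipped in this sketch
        }
        return events;
    }
}
```

For the two messages sent in the Postman test, the body would look like "data:first\n\ndata:second\n\n", which this parser splits back into two separate events. An event that is not yet terminated by a blank line is not returned.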
// controller
package com.rabbitmq.consumer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import com.rabbitmq.consumer.service.ConsumerService;

import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/consumer")
public class ConsumerController {

    @Autowired
    private ConsumerService consumerService;

    // text/event-stream keeps the connection open and streams each element as it arrives
    @GetMapping(value = "/listen", produces = "text/event-stream")
    public Flux<String> listen(@RequestParam String queueName) {
        return consumerService.listen(queueName);
    }
}
// service
package com.rabbitmq.consumer.service;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;

@Service
public class ConsumerService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private ConnectionFactory connectionFactory;
    private final ReentrantLock lock = new ReentrantLock();
    private Map<String, SimpleMessageListenerContainer> listeners = new java.util.HashMap<>();

    @PostConstruct
    public void init() {
        connectionFactory = rabbitTemplate.getConnectionFactory();
    }

    public Flux<String> listen(String queueName) {
        return Flux.create(emitter -> {
            SimpleMessageListenerContainer container = getListener(queueName, new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    String msg = new String(message.getBody());
                    System.out.println("listen function Received message: " + msg);
                    emitter.next(msg);
                }
            });
            container.start();
        });
    }

    private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {
        lock.lock();
        try {
            SimpleMessageListenerContainer listener = listeners.get(queueName);
            if (listener == null && messageListener != null) {
                listener = new SimpleMessageListenerContainer();
                listener.setConnectionFactory(connectionFactory);
                listener.setQueueNames(queueName);
                listener.setMessageListener(messageListener);
                listeners.put(queueName, listener);
            }
            return listener;
        } finally {
            lock.unlock();
        }
    }
}
https://github.com/f304646673/RabbitMQDemo/tree/main/consumer