赞
踩
Stream队列保存了发布到其上所有未过期(时间或Size判断)的消息。消费者只可以读取该队列,但是不能让队列将已读消息删除。这样就可以保证相同配置的消费者可以读取到相同的消息。
鉴于它保留了未过期消息,所以非常适合需要读取历史消息的场景。
鉴于消费者不能让其删除已读消息,所以对于需要“扇出”大量相同消息的场景,可以使用一个Stream来替代Fanout交换器绑定多个相同消息队列的方案。这样即可以降低系统设计的复杂度,也会提升Rabbitmq服务效率。
下面代码会创建一个Stream。
action.queueDeclare(roomName, true, false, false,
Collections.singletonMap("x-queue-type", "stream"));
需要注意的是:
private void createChatRoom(String roomName) {
rabbitTemplate.execute(action -> {
action.exchangeDeclare(roomName, "fanout", false, true, null);
action.queueDeclare(roomName, true, false, false,
Collections.singletonMap("x-queue-type", "stream"));
action.queueBind(roomName, roomName, "");
return null;
});
}
发布消息没什么特别,直接给交换器发送消息即可。
rabbitTemplate.send(roomName, "", msg);
由于Stream中保存了所有未超时的消息,所以存在一个起始读取位置的问题。
还有两个比较特殊的情况需要注意:
一般常见的模式如下:
“x-stream-offset"设置为"first”,就是从第一条消息开始读取。
channel.basicQos(100);
channel.basicConsume(roomName, false, username,
false, true,
Collections.singletonMap("x-stream-offset", "first"),
(consumerTag, message) -> {
// Your code
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { }
);
“x-stream-offset"设置为"last”,就是从第一条消息开始读取。
channel.basicQos(100);
channel.basicConsume(roomName, false, username,
false, true,
Collections.singletonMap("x-stream-offset", "last"),
(consumerTag, message) -> {
// Your code
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { }
);
Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100);
channel.basicConsume(roomName, false, username,
false, true,
Collections.singletonMap("x-stream-offset", "last"),
(consumerTag, message) -> {
// Your code
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { }
);
channel.basicQos(100);
channel.basicConsume(roomName, false, username,
false, true,
Collections.singletonMap("x-stream-offset", offset),
(consumerTag, message) -> {
emitter.next(new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> { }
);
延续《RabbitMQ实践——搭建多人聊天服务》的案例,上面几个场景的读取代码如下:
public Flux<String> getMessageFromFirst(String username, String roomName) { return Flux.create(emitter -> { rabbitTemplate.execute((ChannelCallback<Void>) channel -> { channel.basicQos(100); channel.basicConsume(roomName, false, username, false, true, Collections.singletonMap("x-stream-offset", "first"), (consumerTag, message) -> { emitter.next(new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, consumerTag -> { } ); return null; }); }); } public Flux<String> getMessageFromLast(String username, String roomName) { return Flux.create(emitter -> { rabbitTemplate.execute((ChannelCallback<Void>) channel -> { channel.basicQos(100); channel.basicConsume(roomName, false, username, false, true, Collections.singletonMap("x-stream-offset", "last"), (consumerTag, message) -> { emitter.next(new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, consumerTag -> { } ); return null; }); }); } public Flux<String> getMessageFromTimestamp(String username, String roomName, Date timestamp) { return Flux.create(emitter -> { rabbitTemplate.execute((ChannelCallback<Void>) channel -> { channel.basicQos(100); channel.basicConsume(roomName, false, username, false, true, Collections.singletonMap("x-stream-offset", timestamp), (consumerTag, message) -> { emitter.next(new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, consumerTag -> { } ); return null; }); }); } public Flux<String> getMessageFromOffset(String username, String roomName, long offset) { return Flux.create(emitter -> { rabbitTemplate.execute((ChannelCallback<Void>) channel -> { channel.basicQos(100); channel.basicConsume(roomName, false, username, false, true, Collections.singletonMap("x-stream-offset", offset), (consumerTag, message) -> { emitter.next(new String(message.getBody())); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }, consumerTag -> { } ); return null; }); }); }
由于Stream并不会因为消费者而删除消息,导致其保存的消息数量一直在增加。所以需要通过一定的策略控制其大小。
在创建Stream时,我们可以通过x-max-length-bytes设置其最大Size。这样如果Stream内容达到这个Size,最早的消息就会被Stream淘汰掉。
Map<String, Object> args = new HashMap<>();
args.put("x-max-length-bytes", maxSize);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);
在创建Stream时,我们可以通过x-max-age设置消息的最长生命周期。超过这个时长的消息会被淘汰。它的取值可以是如下单位:Y, M, D, h, m, s。比如“1m”表示一分钟。
Map<String, Object> args = new HashMap<>();
args.put("x-max-age", ttl);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);
如果消费者并不关系Stream中所有消息,它可以通过"x-stream-filter"来做过滤。这个过滤会发生在服务端,这样可以大大减轻消费者和服务端的压力。但是需要注意的是,服务端的过滤使用的是布隆过滤器,所以发送到消费者端的消息会包含不符合条件的消息,所以消费端需要做二次校验才可以使用。
channel.basicPublish(
"", // default exchange
"my-stream",
new AMQP.BasicProperties.Builder()
.headers(Collections.singletonMap(
"x-stream-filter-value", "california" // set filter value
))
.build(),
body
);
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
"my-stream",
false,
Collections.singletonMap("x-stream-filter", "california"), // set filter
(consumerTag, message) -> {
Map<String, Object> headers = message.getProperties().getHeaders();
// there must be some client-side filter logic
if ("california".equals(headers.get("x-stream-filter-value"))) {
// message processing
// ...
}
channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
},
consumerTag -> { });
https://github.com/f304646673/RabbitMQDemo/tree/main/chat
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。