当前位置:   article > 正文

RabbitMQ实践——Stream队列的使用方法_stream消息队列

stream消息队列


《RabbitMQ实践——搭建多人聊天服务》一文中,我们使用Stream队列存储了聊天室记录。但是每个进入聊天室的人不能读取历史消息,只能读取当前时间之后的消息。这是因为我们对读取逻辑做了特殊设置。本文我们将全面介绍Stream队列的使用。

什么是Stream队列

Stream队列保存了发布到其上所有未过期(时间或Size判断)的消息。消费者只可以读取该队列,但是不能让队列将已读消息删除。这样就可以保证相同配置的消费者可以读取到相同的消息。
鉴于它保留了未过期消息,所以非常适合需要读取历史消息的场景。
鉴于消费者不能让其删除已读消息,所以对于需要“扇出”大量相同消息的场景,可以使用一个Stream来替代Fanout交换器绑定多个相同消息队列的方案。这样即可以降低系统设计的复杂度,也会提升Rabbitmq服务效率。
在这里插入图片描述

创建Stream

下面代码会创建一个Stream。

action.queueDeclare(roomName, true, false, false,
      Collections.singletonMap("x-queue-type", "stream"));
  • 1
  • 2

需要注意的是:

  • durable(第二个参数)只能设置为true。
  • exclusive(第三个参数)只能设置为false
  • autoDelete(第四个参数)只能设置为false。
    下面完整代码,除了创建了Stream,还创建了交换器以及它们之间的绑定。
    private void createChatRoom(String roomName) {
        rabbitTemplate.execute(action -> {
            action.exchangeDeclare(roomName, "fanout", false, true, null);
            action.queueDeclare(roomName, true, false, false,
                Collections.singletonMap("x-queue-type", "stream"));
            action.queueBind(roomName, roomName, "");
            return null;
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

发布消息

发布消息没什么特别,直接给交换器发送消息即可。

rabbitTemplate.send(roomName, "", msg);
  • 1

消费

由于Stream中保存了所有未超时的消息,所以存在一个起始读取位置的问题。
还有两个比较特殊的情况需要注意:

  • 不可以“自动应答”,即AutoACK只能是false。所以我们要对每条消息手工ack。
  • 必须指定Qos。因为消费者需要手工应答,所以需要设置一个配额,这样可以保证过慢的服务减少获取消息,从而让服务分发消息更加合理。

一般常见的模式如下:

从第一条消息开始读取

“x-stream-offset"设置为"first”,就是从第一条消息开始读取。

channel.basicQos(100);
channel.basicConsume(roomName, false, username,
      false, true,
          Collections.singletonMap("x-stream-offset", "first"),
          (consumerTag, message) -> {
              // Your code
              channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
          },
          consumerTag -> { }
      );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

从最后一条消息开始读取

“x-stream-offset"设置为"last”,就是从第一条消息开始读取。

channel.basicQos(100);
channel.basicConsume(roomName, false, username,
      false, true,
          Collections.singletonMap("x-stream-offset", "last"),
          (consumerTag, message) -> {
              // Your code
              channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
          },
          consumerTag -> { }
      );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

从某个时间戳开始读取

Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
channel.basicQos(100);
channel.basicConsume(roomName, false, username,
      false, true,
          Collections.singletonMap("x-stream-offset", "last"),
          (consumerTag, message) -> {
              // Your code
              channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
          },
          consumerTag -> { }
      );
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

从某个偏移量开始读取

channel.basicQos(100);
channel.basicConsume(roomName, false, username,
	false, true,
	    Collections.singletonMap("x-stream-offset", offset),
	    (consumerTag, message) -> {
	        emitter.next(new String(message.getBody()));
	        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
	    },
	    consumerTag -> { }
	);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

样例

延续《RabbitMQ实践——搭建多人聊天服务》的案例,上面几个场景的读取代码如下:

    public Flux<String> getMessageFromFirst(String username, String roomName) {
        return Flux.create(emitter -> {
            rabbitTemplate.execute((ChannelCallback<Void>) channel -> {
                channel.basicQos(100);
                channel.basicConsume(roomName, false, username,
                false, true,
                    Collections.singletonMap("x-stream-offset", "first"),
                    (consumerTag, message) -> {
                        emitter.next(new String(message.getBody()));
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    },
                    consumerTag -> { }
                );
                return null;
            });
        });
    }

    public Flux<String> getMessageFromLast(String username, String roomName) {
        return Flux.create(emitter -> {
            rabbitTemplate.execute((ChannelCallback<Void>) channel -> {
                channel.basicQos(100);
                channel.basicConsume(roomName, false, username,
                false, true,
                    Collections.singletonMap("x-stream-offset", "last"),
                    (consumerTag, message) -> {
                        emitter.next(new String(message.getBody()));
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    },
                    consumerTag -> { }
                );
                return null;
            });
        });
    }

    public Flux<String> getMessageFromTimestamp(String username, String roomName, Date timestamp) {
        return Flux.create(emitter -> {
            rabbitTemplate.execute((ChannelCallback<Void>) channel -> {
                channel.basicQos(100);
                channel.basicConsume(roomName, false, username,
                false, true,
                    Collections.singletonMap("x-stream-offset", timestamp),
                    (consumerTag, message) -> {
                        emitter.next(new String(message.getBody()));
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    },
                    consumerTag -> { }
                );
                return null;
            });
        });
    }

    public Flux<String> getMessageFromOffset(String username, String roomName, long offset) {
        return Flux.create(emitter -> {
            rabbitTemplate.execute((ChannelCallback<Void>) channel -> {
                channel.basicQos(100);
                channel.basicConsume(roomName, false, username,
                false, true,
                    Collections.singletonMap("x-stream-offset", offset),
                    (consumerTag, message) -> {
                        emitter.next(new String(message.getBody()));
                        channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                    },
                    consumerTag -> { }
                );
                return null;
            });
        });
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

长度控制

由于Stream并不会因为消费者而删除消息,导致其保存的消息数量一直在增加。所以需要通过一定的策略控制其大小。

长度控制

在创建Stream时,我们可以通过x-max-length-bytes设置其最大Size。这样如果Stream内容达到这个Size,最早的消息就会被Stream淘汰掉。

Map<String, Object> args = new HashMap<>();
args.put("x-max-length-bytes", maxSize);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);
  • 1
  • 2
  • 3
  • 4

时间控制

在创建Stream时,我们可以通过x-max-age设置消息的最长生命周期。超过这个时长的消息会被淘汰。它的取值可以是如下单位:Y, M, D, h, m, s。比如“1m”表示一分钟。

Map<String, Object> args = new HashMap<>();
args.put("x-max-age", ttl);
args.put("x-queue-type", "stream");
action.queueDeclare(roomName, true, false, false, args);
  • 1
  • 2
  • 3
  • 4

服务端筛选消息

如果消费者并不关系Stream中所有消息,它可以通过"x-stream-filter"来做过滤。这个过滤会发生在服务端,这样可以大大减轻消费者和服务端的压力。但是需要注意的是,服务端的过滤使用的是布隆过滤器,所以发送到消费者端的消息会包含不符合条件的消息,所以消费端需要做二次校验才可以使用。

发布方设定过滤值

channel.basicPublish(
  "", // default exchange
  "my-stream",
  new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap(
      "x-stream-filter-value", "california" // set filter value
    ))
    .build(),
  body
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消费方设置服务端过滤,且要二次过滤

channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-filter", "california"), // set filter
  (consumerTag, message) -> {
    Map<String, Object> headers = message.getProperties().getHeaders();
    // there must be some client-side filter logic
    if ("california".equals(headers.get("x-stream-filter-value"))) {
      // message processing
      // ...
    }
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

工程代码

https://github.com/f304646673/RabbitMQDemo/tree/main/chat

参考资料

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/黑客灵魂/article/detail/958598
推荐阅读
相关标签
  

闽ICP备14008679号