当前位置:   article > 正文

RabbitMQ Stream插件使用详解

RabbitMQ Stream插件使用详解

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit-stream</artifactId>
  4. <version>3.1.4</version>
  5. </dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

  1. @Bean
  2. Queue stream() {
  3. return QueueBuilder.durable("stream.queue1")
  4. .stream()
  5. .build();
  6. }

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

  1. @Bean
  2. StreamAdmin streamAdmin(Environment env) {
  3. return new StreamAdmin(env, sc -> {
  4. sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
  5. sc.stream("stream.queue2").create();
  6. });
  7. }

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

  1. public interface RabbitStreamOperations extends AutoCloseable {
  2. CompletableFuture<Boolean> send(Message message);
  3. CompletableFuture<Boolean> convertAndSend(Object message);
  4. CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
  5. CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
  6. MessageBuilder messageBuilder();
  7. MessageConverter messageConverter();
  8. StreamMessageConverter streamMessageConverter();
  9. @Override
  10. void close() throws AmqpException;
  11. }

RabbitStreamTemplate实现具有以下构造函数和属性:

  1. public RabbitStreamTemplate(Environment environment, String streamName) {
  2. }
  3. public void setMessageConverter(MessageConverter messageConverter) {
  4. }
  5. public void setStreamConverter(StreamMessageConverter streamConverter) {
  6. }
  7. public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
  8. }

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

  1. public interface StreamMessageListener extends MessageListener {
  2. void onStreamMessage(Message message, Context context);
  3. }

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

  1. @Bean
  2. RabbitStreamTemplate streamTemplate(Environment env) {
  3. RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
  4. template.setProducerCustomizer((name, builder) -> builder.name("test"));
  5. return template;
  6. }
  7. @Bean
  8. RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
  9. return new StreamRabbitListenerContainerFactory(env);
  10. }
  11. @RabbitListener(queues = "test.stream.queue1")
  12. void listen(String in) {
  13. ...
  14. }
  15. @Bean
  16. RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
  17. StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
  18. factory.setNativeListener(true);
  19. factory.setConsumerCustomizer((id, builder) -> {
  20. builder.name("myConsumer")
  21. .offset(OffsetSpecification.first())
  22. .manualTrackingStrategy();
  23. });
  24. return factory;
  25. }
  26. @RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
  27. void nativeMsg(Message in, Context context) {
  28. ...
  29. context.storeOffset();
  30. }
  31. @Bean
  32. Queue stream() {
  33. return QueueBuilder.durable("test.stream.queue1")
  34. .stream()
  35. .build();
  36. }
  37. @Bean
  38. Queue stream() {
  39. return QueueBuilder.durable("test.stream.queue2")
  40. .stream()
  41. .build();
  42. }

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

  1. @Bean
  2. public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
  3. StreamRetryOperationsInterceptorFactoryBean rfb =
  4. new StreamRetryOperationsInterceptorFactoryBean();
  5. rfb.setRetryOperations(retryTemplate);
  6. rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
  7. ...
  8. });
  9. return rfb;
  10. }

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

  1. @Bean
  2. SuperStream superStream() {
  3. return new SuperStream("my.super.stream", 3);
  4. }

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

  1. @Bean
  2. SuperStream superStream() {
  3. return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
  4. .mapToObj(j -> "rk-" + j)
  5. .collect(Collectors.toList()));
  6. }

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

  1. @Bean
  2. RabbitStreamTemplate streamTemplate(Environment env) {
  3. RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
  4. template.setSuperStreamRouting(message -> {
  5. // some logic to return a String for the client's hashing algorithm
  6. });
  7. return template;
  8. }

你也可以通过AMQP发布,使用 RabbitTemplate

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/438556
推荐阅读
相关标签
  

闽ICP备14008679号