当前位置:   article > 正文

SpringCloud - Spring Cloud 之 Stream构建消息驱动微服务框架;Spring Cloud Alibaba集成RocketMQ(二十四)_spring cloud alibaba stream

spring cloud alibaba stream

阅读本文前可先参考

SpringCloud - Spring Cloud 之 Stream构建消息驱动微服务框架;RabbitMQ(十九)_MinggeQingchun的博客-CSDN博客_springcloud stream

一、Spring Cloud Stream

在微服务的开发过程中,会经常用到消息中间件,通过消息中间件在服务与服务之间传递消息,不管使用哪款消息中间件,如RabbitMQ、Kafka和RocketMQ,那么消息中间件和服务之间都有耦合性(如原来使用RabbitMQ,要替换为RocketMQ,那么微服务都需要修改,变动会比较大),因为这两款消息中间件有一些区别,如果我们Spring Cloud Stream来整合我们的消息中间件,就可以降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件间切换

Spring Cloud Stream就是负责整合我们的消息中间件,降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件间切换

官网地址:

Spring Cloud Stream

Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可伸缩的事件驱动微服务。 

Spring Cloud Stream解决了开发人员无感知的使用消息中间件的问题,Spring Cloud Stream对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为rocketmq或者kafka),使得微服务开发的高度解耦。

注:

目前Spring Cloud Stream仅支持RabbitMQ、Kafka,Spring Cloud Alibaba新写了一个starter可以支持RocketMQ

Spring Cloud Stream架构 

Spring Cloud Stream 是一个构建消息驱动微服务的框架

应用程序通过Input(相当于消费者consumer)、Output(相当于生产者producer)来与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互,因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互

1、Binder

与外部消息中间件集成的组件,用来创建Binding,各消息中间件都有自己的 Binder 实现

Kafka 的实现 KafkaMessageChannelBinder

RabbitMQ 的实现 RabbitMessageChannelBinder

RocketMQ 的实现 RocketMQMessageChannelBinder

2、Binding

包括 Input Binding 和 Output Binding

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触

3、Input

应用程序通过input(相当于消费者consumer)与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互,因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互

4、Output

output(相当于生产者producer)与Spring Cloud Stream中Binder交互

组成

说明

Binder

Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现

@Input

该注解标识输入通道,通过该输入通道接收消息进入应用程序

@Output

该注解标识输出通道,发布的消息将通过该通道离开应用程序

@StreamListener

监听队列,用于消费者的队列的消息接收

@EnableBinding

将信道channel和exchange、topic绑定在一起

二、Spring Cloud Stream应用(RocketMQ)

(一)Spring Cloud Stream应用(RocketMQ)

1、创建Springboot工程 springcloud-alibaba-4-stream-rocketmq

2、添加依赖 spring-cloud-starter-stream-rocketmq

  1. <!--spring-cloud-starter-stream-rocketmq-->
  2. <dependency>
  3. <groupId>com.alibaba.cloud</groupId>
  4. <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  5. </dependency>
  1. <groupId>com.bjpowernode</groupId>
  2. <artifactId>31-rocketmq-spring-cloud-stream</artifactId>
  3. <version>1.0.0</version>
  4. <name>31-rocketmq-spring-cloud-stream</name>
  5. <description>31-rocketmq-spring-cloud-stream project for Spring Boot</description>
  6. <properties>
  7. <java.version>1.8</java.version>
  8. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  9. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  10. <spring-boot.version>2.2.5.RELEASE</spring-boot.version>
  11. <spring-cloud-alibaba.version>2.2.1.RELEASE</spring-cloud-alibaba.version>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-web</artifactId>
  17. </dependency>
  18. <!--spring-cloud-starter-stream-rocketmq-->
  19. <dependency>
  20. <groupId>com.alibaba.cloud</groupId>
  21. <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.projectlombok</groupId>
  25. <artifactId>lombok</artifactId>
  26. </dependency>
  27. </dependencies>
  28. <dependencyManagement>
  29. <dependencies>
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-dependencies</artifactId>
  33. <version>${spring-boot.version}</version>
  34. <type>pom</type>
  35. <scope>import</scope>
  36. </dependency>
  37. <dependency>
  38. <groupId>com.alibaba.cloud</groupId>
  39. <artifactId>spring-cloud-alibaba-dependencies</artifactId>
  40. <version>${spring-cloud-alibaba.version}</version>
  41. <type>pom</type>
  42. <scope>import</scope>
  43. </dependency>
  44. </dependencies>
  45. </dependencyManagement>
  46. <build>
  47. <plugins>
  48. <plugin>
  49. <groupId>org.apache.maven.plugins</groupId>
  50. <artifactId>maven-compiler-plugin</artifactId>
  51. <version>3.8.1</version>
  52. <configuration>
  53. <source>1.8</source>
  54. <target>1.8</target>
  55. <encoding>UTF-8</encoding>
  56. </configuration>
  57. </plugin>
  58. <plugin>
  59. <groupId>org.springframework.boot</groupId>
  60. <artifactId>spring-boot-maven-plugin</artifactId>
  61. <version>2.3.0.RELEASE</version>
  62. <configuration>
  63. <mainClass>com.bjpowernode.Application</mainClass>
  64. </configuration>
  65. <executions>
  66. <execution>
  67. <id>repackage</id>
  68. <goals>
  69. <goal>repackage</goal>
  70. </goals>
  71. </execution>
  72. </executions>
  73. </plugin>
  74. </plugins>
  75. </build>

3、application.properties配置文件

  1. server.port=8081
  2. spring.application.name=springcloud-alibaba-4-stream-rocketmq
  3. # 日志级别
  4. logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO
  5. ########## RocketMQ 通用配置
  6. # 客户端接入点,必填,rocketmq的连接地址, binder高度抽象
  7. spring.cloud.stream.rocketmq.binder.name-server=192.168.133.128:9876
  8. ########## 生产者Producer Config
  9. # output 的配置如下: bindings 具体生产消息、消费消息的桥梁(消费者Consumer和生产者Producer的destination 目的地必须保持一致)
  10. spring.cloud.stream.bindings.output.destination=test-topic
  11. spring.cloud.stream.bindings.output.content-type=text/plain
  12. spring.cloud.stream.bindings.output.group=test-group
  13. ########## 消费者Consumer Config
  14. # input 的配置:(消费者Consumer和生产者Producer的destination 目的地必须保持一致)
  15. spring.cloud.stream.bindings.input.destination=test-topic
  16. spring.cloud.stream.bindings.input.content-type=text/plain
  17. spring.cloud.stream.bindings.input.group=test-group

4、消息发送(生产者)

  1. @Service
  2. public class SenderService {
  3. //spring cloud stream里面发消息通过 Source 发送
  4. @Autowired
  5. private Source source;
  6. //原来springboot里面通过 RocketMQTemplate 发送
  7. /**
  8. * 发送消息的方法
  9. *
  10. * @param msg
  11. * @throws Exception
  12. */
  13. public void send(String msg) throws Exception {
  14. // source.output() == MessageChannel 消息通道
  15. boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
  16. System.out.println("消息发送:" + flag);
  17. }
  18. }

5、消息接收(消费者)

  1. @Service
  2. public class ReceiveService {
  3. //spring cloud stream里面发消息通过 Sink 发送
  4. @Autowired
  5. private Sink sink;
  6. //原来springboot里面通过 RocketMQTemplate 发送
  7. /* 接收消息
  8. 第一种:通过手动调用receive()方法接收消息,while循环监听消息
  9. * */
  10. public void receive() {
  11. while(true){
  12. // SubscribableChannel = sink.input() 消息订阅的信道
  13. sink.input().subscribe((Message<?> message) -> {
  14. System.out.println("Sink接收到的消息是:" + message.getPayload());
  15. });
  16. }
  17. }
  18. /* 接收消息
  19. 第二种:通过@StreamListener监听消息,不需要调用receiveMessage(String message)方法
  20. * */
  21. @StreamListener(value = Sink.INPUT)
  22. public void receiveMessage(String message) {
  23. System.out.println("StreamListener接收到的消息是:" + message);
  24. }
  25. }

6、springboot启动程序类

  1. import com.company.consumer.ReceiveService;
  2. import com.company.producer.SenderService;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.CommandLineRunner;
  5. import org.springframework.boot.SpringApplication;
  6. import org.springframework.boot.autoconfigure.SpringBootApplication;
  7. import org.springframework.cloud.stream.annotation.EnableBinding;
  8. import org.springframework.cloud.stream.messaging.Sink;
  9. import org.springframework.cloud.stream.messaging.Source;
  10. @EnableBinding(value = {Source.class, Sink.class}) //使其生效
  11. @SpringBootApplication
  12. //Spring boot的CommandLineRunner接口主要用于实现在应用初始化后,去执行一段代码块逻辑,这段初始化代码在整个应用生命周期内只会执行一次
  13. public class Stream4RocketmqApplication implements CommandLineRunner {
  14. @Autowired
  15. private SenderService senderService;
  16. @Autowired
  17. private ReceiveService receiveService;
  18. public static void main(String[] args) {
  19. SpringApplication.run(Stream4RocketmqApplication.class, args);
  20. }
  21. @Override
  22. public void run(String... args) throws Exception {
  23. senderService.send("Hello spring cloud stream rocketmq!");
  24. receiveService.receive();
  25. }
  26. }

(二)Spring Cloud Stream自定义信道

消息传递主要使用的是系统提供的 Source (output)、Sink(input);因此我们自定义Source和Sink接口即可

  1. server.port=8081
  2. spring.application.name=springcloud-alibaba-4-stream-rocketmq
  3. # 日志级别
  4. logging.level.com.alibaba.cloud.stream.binder.rocketmq=INFO
  5. ########## RocketMQ 通用配置
  6. # 客户端接入点,必填,rocketmq的连接地址, binder高度抽象
  7. spring.cloud.stream.rocketmq.binder.name-server=192.168.133.128:9876
  8. ########## 生产者Producer Config
  9. # output 的配置如下: bindings 具体生产消息、消费消息的桥梁(消费者Consumer和生产者Producer的destination 目的地必须保持一致)
  10. spring.cloud.stream.bindings.output.destination=test-topic
  11. spring.cloud.stream.bindings.output.content-type=text/plain
  12. spring.cloud.stream.bindings.output.group=test-group
  13. # output1 要对应到一个Source里面去
  14. spring.cloud.stream.bindings.output1.destination=test-topic1
  15. spring.cloud.stream.bindings.output1.content-type=text/plain
  16. spring.cloud.stream.bindings.output1.group=test-group1
  17. # output2 要对应到一个Source里面去
  18. spring.cloud.stream.bindings.output2.destination=test-topic2
  19. spring.cloud.stream.bindings.output2.content-type=text/plain
  20. spring.cloud.stream.bindings.output2.group=test-group2
  21. ########## 消费者Consumer Config
  22. # input 的配置:(消费者Consumer和生产者Producer的destination 目的地必须保持一致)
  23. spring.cloud.stream.bindings.input.destination=test-topic
  24. spring.cloud.stream.bindings.input.content-type=text/plain
  25. spring.cloud.stream.bindings.input.group=test-group
  26. spring.cloud.stream.bindings.input1.destination=test-topic1
  27. spring.cloud.stream.bindings.input1.content-type=text/plain
  28. spring.cloud.stream.bindings.input1.group=test-group1
  29. spring.cloud.stream.rocketmq.bindings.input1.consumer.tags=myTag
  30. spring.cloud.stream.bindings.input2.destination=test-topic2
  31. spring.cloud.stream.bindings.input2.content-type=text/plain
  32. spring.cloud.stream.bindings.input2.group=test-group2

消息发送(生产者)

  1. public interface MySource {
  2. /**
  3. * Name of the output channel.
  4. */
  5. String OUTPUT1 = "output1";
  6. /**
  7. * Name of the output channel.
  8. */
  9. String OUTPUT2 = "output2";
  10. /**
  11. * @return output channel
  12. */
  13. @Output(MySource.OUTPUT1)
  14. MessageChannel output1();
  15. @Output(MySource.OUTPUT2)
  16. MessageChannel output2();
  17. }

  1. import org.apache.rocketmq.common.message.MessageConst;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.cloud.stream.messaging.Source;
  4. import org.springframework.messaging.Message;
  5. import org.springframework.messaging.MessageHeaders;
  6. import org.springframework.messaging.support.MessageBuilder;
  7. import org.springframework.stereotype.Service;
  8. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  9. import org.springframework.util.MimeTypeUtils;
  10. @Service
  11. public class SenderService {
  12. //spring cloud stream里面发消息通过 Source 发送
  13. @Autowired
  14. private Source source;
  15. //原来springboot里面通过 RocketMQTemplate 发送
  16. @Autowired
  17. private MySource mySource;
  18. @Autowired
  19. private RocketMQTemplate rocketMQTemplate;
  20. /**
  21. * 发送消息的方法
  22. *
  23. * @param msg
  24. * @throws Exception
  25. */
  26. public void send(String msg) throws Exception {
  27. // source.output() == MessageChannel 消息通道
  28. boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
  29. System.out.println("消息发送:" + flag);
  30. }
  31. /**
  32. * 发送消息的方法
  33. *
  34. * @param msg
  35. * @throws Exception
  36. */
  37. public void send(String msg) throws Exception {
  38. // source.output() == MessageChannel 消息通道
  39. boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
  40. System.out.println("消息发送:" + flag);
  41. }
  42. /**
  43. * 发送消息的方法
  44. *
  45. * @param msg
  46. * @throws Exception
  47. */
  48. public void send1(String msg) throws Exception {
  49. // source.output() == MessageChannel 消息通道
  50. boolean flag = mySource.output1().send(MessageBuilder.withPayload(msg).build());
  51. System.out.println("消息发送1:" + flag);
  52. }
  53. /**
  54. * 发送消息的方法,发到3个topic中
  55. *
  56. * @param msg
  57. * @throws Exception
  58. */
  59. public void multiSend(String msg) throws Exception {
  60. // source.output() == MessageChannel 消息通道
  61. boolean flag = source.output().send(MessageBuilder.withPayload(msg).build());
  62. System.out.println("消息发送:" + flag);
  63. // source.output() == MessageChannel 消息通道
  64. boolean flag1 = mySource.output1().send(MessageBuilder.withPayload(msg).build());
  65. System.out.println("消息发送1:" + flag1);
  66. // source.output() == MessageChannel 消息通道
  67. boolean flag2 = mySource.output2().send(MessageBuilder.withPayload(msg).build());
  68. System.out.println("消息发送2:" + flag2);
  69. }
  70. public <T> void sendObject(T msg, String tag) throws Exception {
  71. Message message = MessageBuilder.withPayload(msg)
  72. .setHeader(MessageConst.PROPERTY_TAGS, tag)
  73. .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
  74. .build();
  75. boolean flag2 = mySource.output1().send(message);
  76. System.out.println("对象消息发送2:" + flag2);
  77. }
  78. /**
  79. * 发送消息的方法
  80. *
  81. * @param msg
  82. * @throws Exception
  83. */
  84. public void sendTemplate(String msg) throws Exception {
  85. Message message = MessageBuilder.withPayload(msg)
  86. .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
  87. .build();
  88. rocketMQTemplate.send("test-topic1", message);
  89. System.out.println("发送完毕......");
  90. }
  91. }

消息接收(消费者)

  1. public interface MySink {
  2. /**
  3. * Input channel name.
  4. */
  5. String INPUT1 = "input1";
  6. /**
  7. * Input channel name.
  8. */
  9. String INPUT2 = "input2";
  10. /**
  11. * @return input channel.
  12. */
  13. @Input(MySink.INPUT1)
  14. SubscribableChannel input1();
  15. /**
  16. * @return input channel.
  17. */
  18. @Input(MySink.INPUT2)
  19. SubscribableChannel input2();
  20. }
  1. @Service
  2. public class ReceiveService {
  3. //spring cloud stream里面发消息通过 Sink 发送
  4. @Autowired
  5. private Sink sink;
  6. //原来springboot里面通过 RocketMQTemplate 发送
  7. @Autowired
  8. private MySink mySink;
  9. /* 接收消息
  10. 第一种:通过手动调用receive()方法接收消息,while循环监听消息
  11. * */
  12. public void receive() {
  13. while(true){
  14. // SubscribableChannel = sink.input() 消息订阅的信道
  15. sink.input().subscribe((Message<?> message) -> {
  16. System.out.println("Sink接收到的消息是:" + message.getPayload());
  17. });
  18. }
  19. }
  20. /* 接收消息
  21. 第二种:通过@StreamListener监听消息,不需要调用receiveMessage(String message)方法
  22. * */
  23. @StreamListener(value = Sink.INPUT)
  24. public void receiveMessage(String message) {
  25. System.out.println("StreamListener接收到的消息是:" + message);
  26. }
  27. public void receive1() {
  28. while (true){
  29. // SubscribableChannel = sink.input() 消息订阅的信道
  30. mySink.input1().subscribe((Message<?> message) -> {
  31. System.out.println("input 1---" + message.getPayload());
  32. });
  33. }
  34. }
  35. @StreamListener(value = MySink.INPUT1)
  36. public void receiveMessage1(String message) {
  37. System.out.println("接收到的消息是1:" + message);
  38. }
  39. @StreamListener(value = MySink.INPUT2)
  40. public void receiveMessage2(String message) {
  41. System.out.println("接收到的消息是2:" + message);
  42. }
  43. }

(三)Spring Cloud Stream事务消息

  1. #--------------------------事务消息-------------------------------------
  2. #生产的配置
  3. spring.cloud.stream.bindings.outputTX.destination=TransactionTopic
  4. spring.cloud.stream.bindings.outputTX.content-type=application/json
  5. spring.cloud.stream.rocketmq.bindings.outputTX.producer.group=myTxProducerGroup
  6. #是否为事务消息,默认为false表示不是事务消息,true表示是事务消息
  7. spring.cloud.stream.rocketmq.bindings.outputTX.producer.transactional=true
  8. #消费的配置:
  9. spring.cloud.stream.bindings.inputTX.destination=TransactionTopic
  10. spring.cloud.stream.bindings.inputTX.content-type=text/plain
  11. spring.cloud.stream.bindings.inputTX.group=transaction-group
  12. spring.cloud.stream.rocketmq.bindings.inputTX.consumer.broadcasting=false

消息发送(生产者)

  1. @Component
  2. public class Sender {
  3. @Autowired
  4. private MySource mySource;
  5. public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
  6. MessageBuilder builder = MessageBuilder.withPayload(msg)
  7. .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
  8. .setHeader("test", String.valueOf(num));
  9. //.setHeader(RocketMQHeaders.TAGS, "binder");
  10. Message message = builder.build();
  11. mySource.outputTX().send(message);
  12. }
  13. }
  1. import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
  2. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
  3. import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
  4. import org.springframework.messaging.Message;
  5. @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
  6. public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
  7. /**
  8. * 执行本地事务:也就是执行本地业务逻辑
  9. *
  10. * @param msg
  11. * @param arg
  12. * @return
  13. */
  14. @Override
  15. public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  16. Object num = msg.getHeaders().get("test");
  17. if ("1".equals(num)) {
  18. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " unknown");
  19. return RocketMQLocalTransactionState.UNKNOWN;
  20. }
  21. else if ("2".equals(num)) {
  22. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " rollback");
  23. return RocketMQLocalTransactionState.ROLLBACK;
  24. }
  25. System.out.println("executer: " + new String((byte[]) msg.getPayload()) + " commit");
  26. return RocketMQLocalTransactionState.COMMIT;
  27. }
  28. /**
  29. * 回调检查
  30. *
  31. * @param msg
  32. * @return
  33. */
  34. @Override
  35. public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
  36. System.out.println("check: " + new String((byte[]) msg.getPayload()));
  37. return RocketMQLocalTransactionState.COMMIT;
  38. }
  39. }

消息接收(消费者)

  1. @StreamListener(value = MySink.INPUTTX)
  2. public void receiveTransactionMessage(String message) {
  3. System.out.println("接收到的 事务 消息是:" + message);
  4. }

三、Spring Cloud Stream RocketMQ配置选项

RocketMQ Binder Properties

spring.cloud.stream.rocketmq.binder.name-server

RocketMQ NameServer 地址(老版本使用 namesrv-addr 配置项)

Default: 127.0.0.1:9876.

spring.cloud.stream.rocketmq.binder.access-key

阿里云账号 AccessKey。

Default: null.

spring.cloud.stream.rocketmq.binder.secret-key

阿里云账号 SecretKey。

Default: null.

spring.cloud.stream.rocketmq.binder.enable-msg-trace

是否为 Producer 和 Consumer 开启消息轨迹功能

Default: true.

spring.cloud.stream.rocketmq.binder.customized-trace-topic

消息轨迹开启后存储的 topic 名称。

Default: RMQ_SYS_TRACE_TOPIC.

RocketMQ Consumer Properties

下面的这些配置是以 spring.cloud.stream.rocketmq.bindings.<channelName>.consumer. 为前缀的 RocketMQ Consumer 相关的配置。

enable

是否启用 Consumer

默认值: true.

tags

Consumer 基于 TAGS 订阅,多个 tag 以 || 分割

默认值: empty.

sql

Consumer 基于 SQL 订阅

默认值: empty.

broadcasting

Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式

默认值: false.

orderly

Consumer 是否同步消费消息模式

默认值: false.

delayLevelWhenNextConsume

异步消费消息模式下消费失败重试策略:

-1,不重复,直接放入死信队列

0,broker 控制重试策略

>0,client 控制重试策略

默认值: 0.

suspendCurrentQueueTimeMillis

同步消费消息模式下消费失败后再次消费的时间间隔

默认值: 1000.

RocketMQ Provider Properties

下面的这些配置是以 spring.cloud.stream.rocketmq.bindings.<channelName>.producer. 为前缀的 RocketMQ Producer 相关的配置

enable

是否启用 Producer

默认值: true.

group

Producer group name

默认值: empty.

maxMessageSize

消息发送的最大字节数

默认值: 8249344.

transactional

是否发送事务消息

默认值: false.

sync

是否使用同步得方式发送消息

默认值: false.

vipChannelEnabled

是否在 Vip Channel 上发送消息

默认值: true.

sendMessageTimeout

发送消息的超时时间(毫秒)

默认值: 3000.

compressMessageBodyThreshold

消息体压缩阀值(当消息体超过 4k 的时候会被压缩)

默认值: 4096.

retryTimesWhenSendFailed

在同步发送消息的模式下,消息发送失败的重试次数

默认值: 2.

retryTimesWhenSendAsyncFailed

在异步发送消息的模式下,消息发送失败的重试次数

默认值: 2.

retryNextServer

消息发送失败的情况下是否重试其它的 broker

默认值: false

由此开发使用RocketMQ有两种选择

1、SpringBoot + RocketMQ整合实现消息传送;

2、使用Spring Cloud Stream对消息中间件的包装,来实现消息传送

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/571361
推荐阅读
相关标签
  

闽ICP备14008679号