
Integrating RocketMQ with Spring Boot: using rocketmq-spring-boot-starter to configure, send, and consume RocketMQ messages

rocketmq-spring-boot-starter

       

This article covers the development details of packaging the RocketMQ client as a spring-boot-starter, then walks step by step through a simple example of using the starter to configure, send, and consume RocketMQ messages.

I. Usage

Add the Maven dependency

    <!-- Add the dependency to pom.xml -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${RELEASE.VERSION}</version>
    </dependency>

II. Sending messages

Edit application.properties

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group

Note:

Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.

1. Write the code

    import java.io.Serializable;
    import java.math.BigDecimal;
    import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    public class ProducerApplication implements CommandLineRunner {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            // Send a message synchronously
            rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
            // Send a Spring message
            rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
            // Send a message asynchronously
            rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
                @Override
                public void onSuccess(SendResult var1) {
                    System.out.printf("async onSuccess SendResult=%s %n", var1);
                }

                @Override
                public void onException(Throwable var1) {
                    System.out.printf("async onException Throwable=%s %n", var1);
                }
            });
            // Send messages in order; messages with the same hash key go to the same queue
            rocketMQTemplate.syncSendOrderly("orderly_topic", MessageBuilder.withPayload("Hello, World").build(), "hashkey");
            // rocketMQTemplate.destroy(); // Note: once rocketMQTemplate is destroyed, it cannot send any more messages
        }

        // Declared static so the payload class does not depend on an enclosing instance
        @Data
        @AllArgsConstructor
        public static class OrderPaidEvent implements Serializable {
            private String orderId;
            private BigDecimal paidMoney;
        }
    }
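The last argument of syncSendOrderly above ("hashkey") decides which queue a message lands in: messages sent with the same key share a queue, and that is what keeps them in order. The following is only a minimal sketch of that idea, assuming a plain modulo over the key's hash code. HashKeyQueueSelector and select are hypothetical names used for illustration; they are not part of the starter, and the client's actual selection strategy may differ.

```java
import java.util.List;

/** Illustrative only: maps a hash key to one queue so that equal keys share a queue. */
final class HashKeyQueueSelector {
    private HashKeyQueueSelector() {}

    /** Returns the queue chosen for hashKey; equal keys always yield the same queue. */
    static <Q> Q select(List<Q> queues, String hashKey) {
        if (queues.isEmpty()) {
            throw new IllegalArgumentException("no queues available");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }
}
```

In practice a business identifier such as an order ID is a natural hash key, since all events for one order then stay in sequence while different orders can spread across queues.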

III. Receiving messages

1. Push mode

Edit application.properties

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876

Note:

Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.

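Write the code

    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.stereotype.Service;

    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        // Nested listeners are declared static so component scanning can register them
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
        public static class MyConsumer1 implements RocketMQListener<String> {
            @Override
            public void onMessage(String message) {
                log.info("received message: {}", message);
            }
        }

        // OrderPaidEvent is the payload class from the producer example
        @Slf4j
        @Service
        @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
        public static class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
            @Override
            public void onMessage(OrderPaidEvent orderPaidEvent) {
                log.info("received orderPaidEvent: {}", orderPaidEvent);
            }
        }
    }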

2. Pull mode

Starting with RocketMQ Spring 2.2.0, RocketMQ Spring supports pull-mode consumption.

Edit application.properties

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    # When rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic are set, rocketMQTemplate starts a lite pull consumer
    # If you do not want to use the lite pull consumer, do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic
    rocketmq.pull-consumer.group=my-group1
    rocketmq.pull-consumer.topic=test

Note: the lite pull consumer used to be enabled through rocketmq.consumer.group and rocketmq.consumer.topic. Because those names were very easy to confuse with the push consumer, they were changed to rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic after version 2.2.3.

Write the code

    import java.util.List;
    import javax.annotation.Resource;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class ConsumerApplication implements CommandLineRunner {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        // Assumes a bean named "extRocketMQTemplate" is defined elsewhere in the application
        @Resource(name = "extRocketMQTemplate")
        private RocketMQTemplate extRocketMQTemplate;

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            // Pull messages with rocketMQTemplate
            List<String> messages = rocketMQTemplate.receive(String.class);
            System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
            // Pull messages with extRocketMQTemplate
            messages = extRocketMQTemplate.receive(String.class);
            System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
        }
    }
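The example above calls receive once, which returns whatever batch is available at that moment. To keep consuming in pull mode, the application has to poll in a loop. The following is a minimal sketch of such a loop, written against a plain Supplier so it runs without a broker. PullLoop, drain, and the maxEmptyPolls stopping rule are hypothetical choices for illustration, not part of the starter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative only: polls a batch source until it stays empty for a while. */
final class PullLoop {
    private PullLoop() {}

    /**
     * Calls receiver repeatedly and collects every message it returns,
     * stopping after maxEmptyPolls consecutive empty (or null) batches.
     */
    static <T> List<T> drain(Supplier<List<T>> receiver, int maxEmptyPolls) {
        List<T> collected = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<T> batch = receiver.get();
            if (batch == null || batch.isEmpty()) {
                emptyPolls++;          // nothing available on this poll
            } else {
                emptyPolls = 0;        // reset the counter once messages arrive
                collected.addAll(batch);
            }
        }
        return collected;
    }
}
```

With the template from the example, a call could look like PullLoop.drain(() -> rocketMQTemplate.receive(String.class), 3). A long-running service would more likely process each batch as it arrives rather than collect everything in memory.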

IV. Transactional messages

Edit application.properties

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group

Note:

Replace 127.0.0.1:9876 in the sample configuration above with the address and port of your actual RocketMQ NameServer.

Write the code

    import javax.annotation.Resource;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessagingException;
    import org.springframework.messaging.support.MessageBuilder;

    @SpringBootApplication
    public class ProducerApplication implements CommandLineRunner {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            try {
                // Build a Spring Message to send in a transaction (payload elided here)
                Message msg = MessageBuilder.withPayload(..)...;
                // The first parameter is the destination topic; the last parameter
                // is handed to executeLocalTransaction() as 'arg'
                rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
            } catch (MessagingException e) {
                // Send failures surface as Spring's unchecked MessagingException
                e.printStackTrace(System.out);
            }
        }

        // Define the transaction listener with the @RocketMQTransactionListener annotation;
        // declared static so component scanning can register it
        @RocketMQTransactionListener
        static class TransactionListenerImpl implements RocketMQLocalTransactionListener {
            @Override
            public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // ... run the local transaction, then return rollback, commit, or unknown
                return RocketMQLocalTransactionState.UNKNOWN;
            }

            @Override
            public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                // ... check the transaction status and return rollback, commit, or unknown
                return RocketMQLocalTransactionState.COMMIT;
            }
        }
    }
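The listener above returns fixed values. In a real application, both methods would normally reflect what actually happened to the local transaction: executeLocalTransaction reports the outcome straight away, and checkLocalTransaction answers a later status check. The following is a minimal sketch of that bookkeeping using only the JDK. TxState stands in for RocketMQLocalTransactionState, and LocalTxLog with its in-memory map is a hypothetical helper; a real system would more likely record outcomes in the same database as the business data.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Stand-in for RocketMQLocalTransactionState so the sketch has no dependencies. */
enum TxState { COMMIT, ROLLBACK, UNKNOWN }

/** Illustrative only: records each local transaction's outcome by transaction id. */
final class LocalTxLog {
    private final Map<String, TxState> outcomes = new ConcurrentHashMap<>();

    /** Runs the local work and records the result; plays the role of executeLocalTransaction. */
    TxState execute(String txId, Runnable localWork) {
        try {
            localWork.run();
            outcomes.put(txId, TxState.COMMIT);
            return TxState.COMMIT;
        } catch (RuntimeException e) {
            // The local work failed, so the message should not become visible.
            outcomes.put(txId, TxState.ROLLBACK);
            return TxState.ROLLBACK;
        }
    }

    /** Answers a later status check; plays the role of checkLocalTransaction. */
    TxState check(String txId) {
        // No record means the outcome is not known yet.
        return outcomes.getOrDefault(txId, TxState.UNKNOWN);
    }
}
```

The design point is that the check path reads a durable record instead of guessing, so a message is committed only if the local work really succeeded.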

V. Message tracing

To use message tracing on the producer side, two additional properties are required:

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    rocketmq.producer.enable-msg-trace=true
    rocketmq.producer.customized-trace-topic=my-trace-topic

On the consumer side, message tracing is configured through the corresponding attributes of @RocketMQMessageListener:

    @Service
    @RocketMQMessageListener(
        topic = "test-topic-1",
        consumerGroup = "my-consumer_test-topic-1",
        enableMsgTrace = true,
        customizedTraceTopic = "my-trace-topic"
    )
    public class MyConsumer implements RocketMQListener<String> {
        ...
    }

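Note:

By default, message tracing is enabled for both producers and consumers, and the trace topic is RMQ_SYS_TRACE_TOPIC. The consumer-side trace topic can be set once in the configuration file through rocketmq.consumer.customized-trace-topic, so it does not have to be configured on every @RocketMQMessageListener.

To use Alibaba Cloud message tracing, set accessChannel to CLOUD in @RocketMQMessageListener.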

VI. ACL

To use ACL on the producer side, two additional properties are required:

    ## application.properties
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.producer.group=my-group
    rocketmq.producer.access-key=AK
    rocketmq.producer.secret-key=SK

On the consumer side, ACL is configured in @RocketMQMessageListener:

    @Service
    @RocketMQMessageListener(
        topic = "test-topic-1",
        consumerGroup = "my-consumer_test-topic-1",
        accessKey = "AK",
        secretKey = "SK"
    )
    public class MyConsumer implements RocketMQListener<String> {
        ...
    }

Note:

You do not have to configure AK/SK on every @RocketMQMessageListener annotation. Set rocketmq.consumer.access-key and rocketmq.consumer.secret-key in the configuration file, and their values are used as the defaults.

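VII. Request/reply semantics

RocketMQ-Spring supports request/reply semantics.

  • Producer side

Send request messages with the sendAndReceive method.

Note:

For a synchronous send, specify the return type in the method arguments.

For an asynchronous send, specify the return type in the callback interface.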

    import java.io.Serializable;
    import javax.annotation.Resource;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import org.apache.rocketmq.spring.core.RocketMQLocalRequestCallback;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class ProducerApplication implements CommandLineRunner {
        @Resource
        private RocketMQTemplate rocketMQTemplate;

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        @Override
        public void run(String... args) throws Exception {
            // Send a request synchronously and wait for a reply of type String
            String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
            System.out.printf("send %s and receive %s %n", "request string", replyString);
            // Send a request asynchronously and receive a reply of type User in the callback
            rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName", (byte) 9), new RocketMQLocalRequestCallback<User>() {
                @Override
                public void onSuccess(User message) {
                    System.out.printf("send user object and receive %s %n", message.toString());
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            }, 5000);
        }

        // Declared static so the payload class does not depend on an enclosing instance
        @Data
        @AllArgsConstructor
        public static class User implements Serializable {
            private String userName;
            private Byte userAge;
        }
    }
  • Consumer side

Implement the RocketMQReplyListener<T, R> interface, where T is the type of the received value and R is the type of the returned value.

    import java.io.Serializable;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQReplyListener;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.stereotype.Service;

    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        // Nested listeners are declared static so component scanning can register them
        @Service
        @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
        public static class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
            @Override
            public String onMessage(String message) {
                System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
                return "reply string";
            }
        }

        @Service
        @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
        public static class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
            @Override
            public User onMessage(User user) {
                System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
                User replyUser = new User("replyUserName", (byte) 10);
                return replyUser;
            }
        }

        @Data
        @AllArgsConstructor
        public static class User implements Serializable {
            private String userName;
            private Byte userAge;
        }
    }
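Request/reply over a message queue needs some way to match each reply to the request that caused it. One common approach is a correlation ID: the sender attaches a unique ID to the request and keeps a pending future under that ID, and the reply carries the same ID back. The following is a minimal sketch of that general pattern using only the JDK. ReplyCorrelator, Pending, newRequest, and onReply are hypothetical names, and this does not describe how RocketMQ-Spring implements the feature internally.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: matches replies to pending requests by correlation id. */
final class ReplyCorrelator<R> {

    /** Pairs the id to attach to an outgoing request with the future its reply completes. */
    static final class Pending<T> {
        final String correlationId;
        final CompletableFuture<T> reply;

        Pending(String correlationId, CompletableFuture<T> reply) {
            this.correlationId = correlationId;
            this.reply = reply;
        }
    }

    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Registers a new request and returns its correlation id and reply future. */
    Pending<R> newRequest() {
        String id = UUID.randomUUID().toString();
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(id, future);
        return new Pending<>(id, future);
    }

    /** Completes the matching request; returns false if no request is waiting for this id. */
    boolean onReply(String correlationId, R payload) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }
}
```

A synchronous caller would block on the returned future with a timeout, while an asynchronous caller would attach a callback to it, which mirrors the two sendAndReceive styles shown on the producer side.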
