当前位置:   article > 正文

Spring Boot 集成 RocketMQ_springboot rocketmq

springboot rocketmq


官网地址https://github.com/apache/rocketmq-spring

特性

  • 同步发送消息
  • 异步发送消息
  • 以单向模式发送消息
  • 发送有序消息
  • 发送批量消息
  • 发送交易消息
  • 发送具有延迟级别的预定消息
  • 以并发模式(广播/集群)消费消息
  • 消费有序消息
  • 使用标记或 sql92 表达式过滤消息
  • 支持消息追踪
  • 支持认证和授权
  • 支持请求-回复消息交换模式
  • 使用推/拉模式消费消息

集成

直接使用官方的rocketmq-spring-boot-starter

地址:https://search.maven.org/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter

pom.xml

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.1.1版本,内置的rocketmq-client版本为4.7.1应该与你rocketmq服务保持一致。

生产者

配置文件如下:

rocketmq:
  # rocketMQ的命名服务器,格式为: host:port;host:port
  name-server: localhost:9876
  producer:
    # 生产者的组名
    group: my-group1
    # 发送消息超时时间 默认3秒
    send-message-timeout: 3000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

引入RocketMQTemplate

@Resource
private RocketMQTemplate rocketMQTemplate;
  • 1
  • 2

同步发送消息

可靠的同步传输应用于广泛的场景,如重要通知消息、短信通知、短信营销系统等。

// 发送字符串
rocketMQTemplate.syncSend("springTopic", "Hello, World!");
// 同步发送
rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
// 发送对象
rocketMQTemplate.syncSend("userTopic", new User().setUserAge((byte) 18).setUserName("Kitty"));
// 发送spring 消息
rocketMQTemplate.syncSend(userTopic, MessageBuilder.withPayload(
        new User().setUserAge((byte) 21).setUserName("Lester")).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

异步发送消息

异步传输一般用于响应时间敏感的业务场景。

rocketMQTemplate.asyncSend("orderPaidTopic", "异步发送", new SendCallback() {
    @Override
    public void onSuccess(SendResult var1) {
        // 成功回调
        System.out.printf("async onSucess SendResult=%s %n", var1);
    }
    @Override
    public void onException(Throwable var1) {
        // 失败回调
        System.out.printf("async onException Throwable=%s %n", var1);
    }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

单向发送消息

单向传输用于需要中等可靠性的情况,例如日志收集。

rocketMQTemplate.sendOneway("springTopic", "Hello, World!");
  • 1

发送有序消息

rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
  • 1

发送事务消息

Message msg = MessageBuilder.withPayload("rocketMQTemplate transactional message ").build();
// 第一个参数必须与@RocketMQTransactionListener的成员字段'transName'相同
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);

// 使用注解@RocketMQTransactionListener定义事务监听器
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
      @Override
      public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
      }

      @Override
      public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
      }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

发送特殊标签(tag)消息

rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0");  // tag0 不是消费者选择的,可以通过tag过滤掉
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
  • 1
  • 2

发送批量消息

List<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < 10; i++) {
    msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).
            setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());
}
SendResult sr = rocketMQTemplate.syncSend(springTopic, msgs, 60000);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

消费者

配置文件如下:

rocketmq:
  # rocketMQ的命名服务器,格式为: host:port;host:port
  name-server: localhost:9876
  • 1
  • 2
  • 3

注意:

请将上述示例配置中的127.0.0.1:9876替换成真实RocketMQ的NameServer地址与端口

Push模式

编写代码

@Slf4j
@Service
@RocketMQMessageListener(topic = "laker-123", consumerGroup = "laker_consumer_group")
public class MyConsumer1 implements RocketMQListener<String> {
    public void onMessage(String message) {
        log.info("received message: {}", message);
    }
}

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
    public void onMessage(OrderPaidEvent orderPaidEvent) {
        log.info("received orderPaidEvent: {}", orderPaidEvent);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

这里只要不抛出异常,就会认为是成功返回CONSUME_SUCCESS,否则返回RECONSUME_LATER,boker会重试,相应代码如下

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExt : msgs) {
        log.debug("received msg: {}", messageExt);
        try {
            long now = System.currentTimeMillis();
            handleMessage(messageExt);等价于rocketMQListener.onMessage(doConvertMessage(messageExt));
            long costTime = System.currentTimeMillis() - now;
            log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
        } catch (Exception e) {
            log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
            context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

Pull模式

RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
  • 1
  • 2
  • 3
  • 4

编写代码

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @Override
    public void run(String... args) throws Exception {
        //This is an example of pull consumer using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

高级

请求应答语义

文档链接

RocketMQ-Spring 提供 请求/应答 语义支持。

  • Producer端

发送Request消息使用SendAndReceive方法

注意

同步发送需要在方法的参数中指明返回值类型

异步发送需要在回调的接口中指明返回值类型

// 同步发送request并且等待String类型的返回值
String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
// 异步发送request并且等待User类型的返回值
rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • Consumer端

需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Service
    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
    public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
        @Override
        public String onMessage(String message) {
          System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
          return "reply string";
        }
      }
   
    @Service
    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
    public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
        public void onMessage(User user) {
          	System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
          	User replyUser = new User("replyUserName",(byte) 10);	
          	return replyUser;
        }
    }

    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
    		private Byte userAge;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

ACL功能

Producer 端要想使用 ACL 功能,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中进行配置

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

注意:

可以不用为每个 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-keyrocketmq.consumer.secret-key 配置项,这两个配置项的值就是默认值

消息轨迹

Producer 端要想使用消息轨迹,需要多配置两个配置项:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener 中进行配置对应的属性:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

注意:

默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置项,不需要为在每个 @RocketMQMessageListener 配置。

阿里云消息轨迹正常显示需要设置accessChannel配置为CLOUD。

常见问题

  1. 生产环境有多个nameserver该如何连接?

    rocketmq.name-server支持配置多个nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

  2. rocketMQTemplate在什么时候被销毁?

    开发者在项目中使用rocketMQTemplate发送消息时,不需要手动执行rocketMQTemplate.destroy()方法, rocketMQTemplate会在spring容器销毁时自动销毁。

  3. 启动报错:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

    RocketMQ在设计时就不希望一个消费者同时处理多个类型的消息,因此同一个consumerGroup下的consumer职责应该是一样的,不要干不同的事情(即消费多个topic)。建议consumerGrouptopic一一对应。

  4. 发送的消息内容体是如何被序列化与反序列化的?

    RocketMQ的消息体都是以byte[]方式存储。当业务系统的消息内容体如果是java.lang.String类型时,统一按照utf-8编码转成byte[];如果业务系统的消息内容为非java.lang.String类型,则采用jackson-databind序列化成JSON格式的字符串之后,再统一按照utf-8编码转成byte[]

  5. 如何指定topic的tags?

    RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。 在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName:前面表示topic的名称,后面表示tags名称。

    注意:

    tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。

  6. 发送消息时如何设置消息的key?

    可以通过重载的xxxSend(String destination, Message<?> msg, ...)方法来发送消息,指定msgheaders来完成。示例:

    Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
    rocketMQTemplate.send("topic-test", message);
    
    • 1
    • 2

    同理还可以根据上面的方式来设置消息的FLAGWAIT_STORE_MSG_OK以及一些用户自定义的其它头信息。

    注意:

    在将Spring的Message转化为RocketMQ的Message时,为防止header信息与RocketMQ的系统属性冲突,在所有header的名称前面都统一添加了前缀USERS_。因此在消费时如果想获取自定义的消息头信息,请遍历头信息中以USERS_开头的key即可。

  7. 消费消息时,除了获取消息payload外,还想获取RocketMQ消息的其它系统属性,需要怎么做?

    消费者在实现RocketMQListener接口时,只需要起泛型为MessageExt即可,这样在onMessage方法将接收到RocketMQ原生的MessageExt消息。

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer2 implements RocketMQListener<MessageExt>{
        public void onMessage(MessageExt messageExt) {
            log.info("received messageExt: {}", messageExt);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
  8. 如何指定消费者从哪开始消费消息,或开始消费的位置?

    消费者默认开始消费的位置请参考:RocketMQ FAQ。 若想自定义消费者开始的消费位置,只需在消费者类添加一个RocketMQPushConsumerLifecycleListener接口的实现即可。 示例如下:

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    
        @Override
        public void prepareStart(final DefaultMQPushConsumer consumer) {
            // set consumer consume message from now
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            	  consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    同理,任何关于DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式来完成。

  9. 如何发送事务消息?

    在客户端,首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,实现确认和回查方法;然后再使用资源模板RocketMQTemplate, 调用方法sendMessageInTransaction()来进行消息的发布。

    注意:从RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能设置txProducerGroup、ak、sk,这些值均与对应的RocketMQTemplate保持一致

  10. 如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?

    第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。

    // 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
    @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
       , ... // 定义其他属性,如果有必要。
    )
    public class ExtRocketMQTemplate extends RocketMQTemplate {
      //类里面不需要做任何修改
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    第二步: 使用这个非标RocketMQTemplate

    @Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean.
    private RocketMQTemplate extRocketMQTemplate; 
    
    • 1
    • 2

    接下来就可以正常使用这个extRocketMQTemplate了。

  11. 如何使用非标的RocketMQTemplate发送事务消息?

    首先用户需要实现RocketMQLocalTransactionListener接口,并在接口类上注解声明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明为非标的RocketMQTemplate的Bean name(若不设置则默认为标准的RocketMQTemplate),比如非标的RocketMQTemplate Bean name为“extRocketMQTemplate",则代码如下:

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
        class TransactionListenerImpl implements RocketMQLocalTransactionListener {
              @Override
              public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // ... local transaction process, return bollback, commit or unknown
                return RocketMQLocalTransactionState.UNKNOWN;
              }
    
              @Override
              public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                // ... check transaction status and return bollback, commit or unknown
                return RocketMQLocalTransactionState.COMMIT;
              }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    然后使用extRocketMQTemplate调用sendMessageInTransaction()来发送事务消息。

  12. MessageListener消费端,是否可以指定不同的name-server而不是使用全局定义的’rocketmq.name-server’属性值 ?

    @Service
    @RocketMQMessageListener(
       nameServer = "NEW-NAMESERVER-LIST", // 可以使用这个optional属性来指定不同的name-server
       topic = "test-topic-1", 
       consumerGroup = "my-consumer_test-topic-1",
       enableMsgTrace = true,
       customizedTraceTopic = "my-trace-topic"
    )
    public class MyNameServerConsumer implements RocketMQListener<String> {
       ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/434527
推荐阅读
相关标签
  

闽ICP备14008679号