赞
踩
本文解析将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
添加maven依赖:
- <!--在pom.xml中添加依赖-->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-spring-boot-starter</artifactId>
- <version>${RELEASE.VERSION}</version>
- </dependency>
修改application.properties
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
- rocketmq.producer.group=my-group
注意:
请将上述示例配置中的
127.0.0.1:9876
替换成真实RocketMQ的NameServer地址与端口
- @SpringBootApplication
- public class ProducerApplication implements CommandLineRunner{
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- public static void main(String[] args){
- SpringApplication.run(ProducerApplication.class, args);
- }
-
- public void run(String... args) throws Exception {
- //send message synchronously
- rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
- //send spring message
- rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
- //send messgae asynchronously
- rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
- @Override
- public void onSuccess(SendResult var1) {
- System.out.printf("async onSucess SendResult=%s %n", var1);
- }
-
- @Override
- public void onException(Throwable var1) {
- System.out.printf("async onException Throwable=%s %n", var1);
- }
-
- });
- //Send messages orderly
- rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
-
- //rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
- }
-
- @Data
- @AllArgsConstructor
- public class OrderPaidEvent implements Serializable{
- private String orderId;
-
- private BigDecimal paidMoney;
- }
- }
修改application.properties
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
注意:
请将上述示例配置中的
127.0.0.1:9876
替换成真实RocketMQ的NameServer地址与端口
编写代码
- @SpringBootApplication
- public class ConsumerApplication{
-
- public static void main(String[] args){
- SpringApplication.run(ConsumerApplication.class, args);
- }
-
- @Slf4j
- @Service
- @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
- public class MyConsumer1 implements RocketMQListener<String>{
- public void onMessage(String message) {
- log.info("received message: {}", message);
- }
- }
-
- @Slf4j
- @Service
- @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
- public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
- public void onMessage(OrderPaidEvent orderPaidEvent) {
- log.info("received orderPaidEvent: {}", orderPaidEvent);
- }
- }
- }
从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费
修改application.properties
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
- # When set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic, rocketmqTemplate will start lite pull consumer
- # If you do not want to use lite pull consumer, please do not set rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic
- rocketmq.pull-consumer.group=my-group1
- rocketmq.pull-consumer.topic=test
注意之前lite pull consumer的生效配置为rocketmq.consumer.group和rocketmq.consumer.topic,但由于非常容易与push-consumer混淆,因此在2.2.3版本之后修改为rocketmq.pull-consumer.group和rocketmq.pull-consumer.topic.
编写代码
- @SpringBootApplication
- public class ConsumerApplication implements CommandLineRunner {
-
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- @Resource(name = "extRocketMQTemplate")
- private RocketMQTemplate extRocketMQTemplate;
-
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApplication.class, args);
- }
-
- @Override
- public void run(String... args) throws Exception {
- //This is an example of pull consumer using rocketMQTemplate.
- List<String> messages = rocketMQTemplate.receive(String.class);
- System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
-
- //This is an example of pull consumer using extRocketMQTemplate.
- messages = extRocketMQTemplate.receive(String.class);
- System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
- }
- }
修改application.properties
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
- rocketmq.producer.group=my-group
注意:
请将上述示例配置中的
127.0.0.1:9876
替换成真实RocketMQ的NameServer地址与端口
编写代码
- @SpringBootApplication
- public class ProducerApplication implements CommandLineRunner{
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- public static void main(String[] args){
- SpringApplication.run(ProducerApplication.class, args);
- }
-
- public void run(String... args) throws Exception {
- try {
- // Build a SpringMessage for sending in transaction
- Message msg = MessageBuilder.withPayload(..)...;
- // In sendMessageInTransaction(), the first parameter transaction name ("test")
- // must be same with the @RocketMQTransactionListener's member field 'transName'
- rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
- } catch (MQClientException e) {
- e.printStackTrace(System.out);
- }
- }
-
- // Define transaction listener with the annotation @RocketMQTransactionListener
- @RocketMQTransactionListener
- class TransactionListenerImpl implements RocketMQLocalTransactionListener {
- @Override
- public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
- // ... local transaction process, return bollback, commit or unknown
- return RocketMQLocalTransactionState.UNKNOWN;
- }
-
- @Override
- public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
- // ... check transaction status and return bollback, commit or unknown
- return RocketMQLocalTransactionState.COMMIT;
- }
- }
- }
Producer 端要想使用消息轨迹,需要多配置两个配置项:
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
- rocketmq.producer.group=my-group
-
- rocketmq.producer.enable-msg-trace=true
- rocketmq.producer.customized-trace-topic=my-trace-topic
Consumer 端消息轨迹的功能需要在 @RocketMQMessageListener
中进行配置对应的属性:
- @Service
- @RocketMQMessageListener(
- topic = "test-topic-1",
- consumerGroup = "my-consumer_test-topic-1",
- enableMsgTrace = true,
- customizedTraceTopic = "my-trace-topic"
- )
- public class MyConsumer implements RocketMQListener<String> {
- ...
- }
注意:
默认情况下 Producer 和 Consumer 的消息轨迹功能是开启的且 trace-topic 为 RMQ_SYS_TRACE_TOPIC Consumer 端的消息轨迹 trace-topic 可以在配置文件中配置
rocketmq.consumer.customized-trace-topic
配置项,不需要为在每个@RocketMQMessageListener
配置。
若需使用阿里云消息轨迹,则需要在
@RocketMQMessageListener
中将accessChannel
配置为CLOUD
。
Producer 端要想使用 ACL 功能,需要多配置两个配置项:
- ## application.properties
- rocketmq.name-server=127.0.0.1:9876
- rocketmq.producer.group=my-group
-
- rocketmq.producer.access-key=AK
- rocketmq.producer.secret-key=SK
Consumer 端 ACL 功能需要在 @RocketMQMessageListener
中进行配置:
- @Service
- @RocketMQMessageListener(
- topic = "test-topic-1",
- consumerGroup = "my-consumer_test-topic-1",
- accessKey = "AK",
- secretKey = "SK"
- )
- public class MyConsumer implements RocketMQListener<String> {
- ...
- }
注意:
可以不用为每个
@RocketMQMessageListener
注解配置 AK/SK,在配置文件中配置rocketmq.consumer.access-key
和rocketmq.consumer.secret-key
配置项,这两个配置项的值就是默认值
RocketMQ-Spring 提供 请求/应答 语义支持。
发送Request消息使用SendAndReceive方法
注意
同步发送需要在方法的参数中指明返回值类型
异步发送需要在回调的接口中指明返回值类型
- @SpringBootApplication
- public class ProducerApplication implements CommandLineRunner{
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- public static void main(String[] args){
- SpringApplication.run(ProducerApplication.class, args);
- }
-
- public void run(String... args) throws Exception {
- // 同步发送request并且等待String类型的返回值
- String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
- System.out.printf("send %s and receive %s %n", "request string", replyString);
-
- // 异步发送request并且等待User类型的返回值
- rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
- @Override public void onSuccess(User message) {
- System.out.printf("send user object and receive %s %n", message.toString());
- }
-
- @Override public void onException(Throwable e) {
- e.printStackTrace();
- }
- }, 5000);
- }
-
- @Data
- @AllArgsConstructor
- public class User implements Serializable{
- private String userName;
- private Byte userAge;
- }
- }
需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。
- @SpringBootApplication
- public class ConsumerApplication{
-
- public static void main(String[] args){
- SpringApplication.run(ConsumerApplication.class, args);
- }
-
- @Service
- @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
- public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
- @Override
- public String onMessage(String message) {
- System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
- return "reply string";
- }
- }
-
- @Service
- @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
- public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
- public User onMessage(User user) {
- System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
- User replyUser = new User("replyUserName",(byte) 10);
- return replyUser;
- }
- }
-
- @Data
- @AllArgsConstructor
- public class User implements Serializable{
- private String userName;
- private Byte userAge;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。