
RocketMQ in Practice: Integrating RocketMQ with Spring Cloud (spring-cloud-starter-stream-rocketmq)


I. Add the dependency

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

II. Message-sending demo

1. application.properties configuration

server.port=8080
spring.application.name=rocketmq-broadcast-producer-example

spring.cloud.stream.function.definition=producer
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.104:9876
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
spring.cloud.stream.bindings.producer-out-0.destination=broadcast

logging.level.org.springframework.context.support=debug
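
The binding name producer-out-0 used in the properties above follows Spring Cloud Stream's functional naming convention: <function name>-out-<index> for outputs and <function name>-in-<index> for inputs. The sketch below only illustrates how those names and the matching destination key are put together; BindingNames and its methods are hypothetical helpers written for this article, not part of Spring Cloud Stream.

```java
// Hypothetical helper: illustrates the naming convention only; not a Spring Cloud Stream API.
final class BindingNames {

    private BindingNames() {
    }

    /** Name of the index-th output binding of a function, e.g. "producer-out-0". */
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Name of the index-th input binding of a function, e.g. "consumer-in-0". */
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Property key that sets the destination (the RocketMQ topic) of a binding. */
    static String destinationKey(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        String binding = output("producer", 0);
        System.out.println(binding);
        System.out.println(destinationKey(binding));
    }
}
```

If no destination is configured for a binding, the binding name itself is used as the topic, which is why a misnamed binding tends to show up as an unknown topic at send time.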

2. Message-sending demo code


import com.strongculture.rocketmqserver.entity.SimpleMsg;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author lucky_wxn
 * @Date 4/3/2024 5:27 PM
 * @Content REST endpoint that publishes a message to RocketMQ through StreamBridge
 */
@Api(tags = "RocketMQ test service")
@RestController
public class ProducerController {

    @Autowired
    private StreamBridge streamBridge;

    @ApiOperation("Send a real-time message")
    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "message") String message) {

        // Message headers: a message key plus a random origin message ID.
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_KEYS, "test");
        headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, UUID.randomUUID().toString());
        Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg(message), headers);
        // Send through the output binding configured in application.properties.
        streamBridge.send("producer-out-0", msg);

        return "Success";
    }
}
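
The SimpleMsg entity imported by the controller is not listed in this article. A minimal sketch that matches how it is used (a constructor taking the text and a getMessage() accessor) might look like the following. The field name, the no-argument constructor, and the setter are assumptions, and in a real project the class would be public and live in com.strongculture.rocketmqserver.entity.

```java
// Assumed shape of SimpleMsg; the real class is not shown in the article.
final class SimpleMsg {

    private String message;

    // No-argument constructor, typically needed by JSON deserializers.
    public SimpleMsg() {
    }

    public SimpleMsg(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public static void main(String[] args) {
        SimpleMsg msg = new SimpleMsg("hello");
        System.out.println(msg.getMessage());
    }
}
```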

Error 1:

No route info of this topic: inMemorySwaggerResourcesProvider-out-0
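The topic name in the message suggests that Swagger's inMemorySwaggerResourcesProvider bean was treated as a stream function and given an output binding. For the cause and the fix, see:
  1. Spring Cloud Stream - Swagger: A default binder has been requested, but there is no binder available
  2. Spring can be coded like this? A default binder has been requested, but there is no binder available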

Error 2:

Type must be one of Supplier, Function or Consumer
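Solution:

Remove the “public ApplicationRunner producer()” code from the official sample. With spring.cloud.stream.function.definition=producer, the bean named producer is expected to be a Supplier, Function, or Consumer, and an ApplicationRunner is none of these. In this demo, sending goes through StreamBridge instead.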
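
The rule behind this error message can be illustrated in plain Java. The sketch below is not Spring's actual check (Spring Cloud Function does its own, more thorough type inspection); FunctionalTypeCheck and isBindable are hypothetical names, and a Runnable stands in for ApplicationRunner so the example needs no Spring dependency.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative check only; not the code Spring Cloud Function runs.
final class FunctionalTypeCheck {

    private FunctionalTypeCheck() {
    }

    /** True if the bean is one of the three types a function definition may name. */
    static boolean isBindable(Object bean) {
        return bean instanceof Supplier
                || bean instanceof Function
                || bean instanceof Consumer;
    }

    public static void main(String[] args) {
        Consumer<String> consumer = s -> { };
        Runnable runner = () -> { }; // stand-in for ApplicationRunner

        System.out.println("consumer bindable: " + isBindable(consumer));
        System.out.println("runner bindable:   " + isBindable(runner));
    }
}
```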

III. Message-receiving demo

1. Add the following to application.properties

spring.cloud.stream.function.definition=consumer
spring.cloud.stream.rocketmq.bindings.consumer-in-0.consumer.messageModel=BROADCASTING
spring.cloud.stream.bindings.consumer-in-0.destination=broadcast
spring.cloud.stream.bindings.consumer-in-0.group=broadcast-consumer
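
The messageModel=BROADCASTING setting above selects RocketMQ's broadcasting mode, in which every consumer instance in the group receives every message. The alternative, CLUSTERING, shares the messages among the instances of a group. The toy simulation below is only a sketch of that difference and contains no RocketMQ code: MessageModelDemo and deliver are hypothetical names, and the per-message round-robin used for CLUSTERING is a simplification (RocketMQ actually allocates message queues to consumers).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation, not RocketMQ code: shows who receives what under each message model.
final class MessageModelDemo {

    enum Model { BROADCASTING, CLUSTERING }

    private MessageModelDemo() {
    }

    /** Returns, for each consumer, the list of messages it would receive. */
    static Map<String, List<String>> deliver(Model model, List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return received;
        }
        for (int i = 0; i < messages.size(); i++) {
            String message = messages.get(i);
            if (model == Model.BROADCASTING) {
                // Every instance gets its own copy of the message.
                for (String consumer : consumers) {
                    received.get(consumer).add(message);
                }
            } else {
                // Simplification: one instance per message, chosen round-robin.
                received.get(consumers.get(i % consumers.size())).add(message);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("instance-1", "instance-2");
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        System.out.println(deliver(Model.BROADCASTING, consumers, messages));
        System.out.println(deliver(Model.CLUSTERING, consumers, messages));
    }
}
```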

2. Message-receiving demo code

import com.strongculture.rocketmqserver.entity.SimpleMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

/**
 * @Author lucky_wxn
 * @Date 4/3/2024 8:30 PM
 * @Content Registers the consumer function bound to consumer-in-0
 */
@Slf4j
@Component
public class RocketLister {

    // The bean name "consumer" must match spring.cloud.stream.function.definition.
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> log.info("{} Consumer1 Receive New Messages: {}",
                Thread.currentThread().getName(), msg.getPayload().getMessage());
    }
}

