赞
踩
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
server.port=8080
spring.application.name=rocketmq-broadcast-producer-example
spring.cloud.stream.function.definition=producer
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.104:9876
spring.cloud.stream.rocketmq.bindings.producer-out-0.producer.group=output_1
spring.cloud.stream.bindings.producer-out-0.destination=broadcast
logging.level.org.springframework.context.support=debug
import com.strongculture.rocketmqserver.entity.SimpleMsg;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * REST controller that wraps an incoming text in a {@link SimpleMsg} and publishes it
 * through Spring Cloud Stream's {@link StreamBridge}.
 *
 * <p>Created 4/3/2024 5:27 PM.
 *
 * @author lucky_wxn
 */
@Api(tags = "Rockmq测试服务")
@RestController
public class ProducerController {

    /** Name of the output binding handed to {@link StreamBridge#send(String, Object)}. */
    private static final String OUTPUT_BINDING = "producer-out-0";

    @Autowired
    private StreamBridge streamBridge;

    /**
     * Sends one message to the {@code producer-out-0} binding.
     *
     * <p>The message carries two headers: {@code MessageConst.PROPERTY_KEYS} set to
     * {@code "test"} and {@code MessageConst.PROPERTY_ORIGIN_MESSAGE_ID} set to a freshly
     * generated random UUID.
     *
     * @param message text placed into the {@link SimpleMsg} payload
     * @return {@code "成功"} when {@code StreamBridge.send} reports success,
     *         {@code "失败"} when it reports failure
     */
    @ApiOperation("发送实时消息")
    @PostMapping("/sendMessage")
    public String sendMessage(@RequestParam(value = "message") String message) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_KEYS, "test");
        headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, UUID.randomUUID());

        Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg(message), headers);

        // Do not claim success unconditionally: send() returns false when the message
        // was not accepted by the binding.
        boolean sent = streamBridge.send(OUTPUT_BINDING, msg);
        return sent ? "成功" : "失败";
    }
}
No route info of this topic: inMemorySwaggerResourcesProvider-out-0
Type must be one of Supplier, Function or Consumer
解决办法：去掉官方代码示例中的“public ApplicationRunner producer()”这部分代码。
spring.cloud.stream.function.definition=consumer
spring.cloud.stream.rocketmq.bindings.consumer-in-0.consumer.messageModel=BROADCASTING
spring.cloud.stream.bindings.consumer-in-0.destination=broadcast
spring.cloud.stream.bindings.consumer-in-0.group=broadcast-consumer
import com.strongculture.rocketmqserver.entity.SimpleMsg;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
/**
 * Declares the functional {@link Consumer} bean that handles incoming
 * {@link SimpleMsg} messages.
 *
 * <p>Created 4/3/2024 8:30 PM.
 *
 * @author lucky_wxn
 */
@Slf4j
@Component
public class RocketLister {

    /**
     * Creates the consumer bean named {@code consumer}.
     *
     * <p>For every message received, the current thread's name and the payload's
     * message text are logged at INFO level.
     *
     * @return a consumer that logs each received {@link SimpleMsg}
     */
    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        // Parameterized logging: the text is only assembled when INFO is enabled.
        return msg -> log.info("{} Consumer1 Receive New Messages: {}",
                Thread.currentThread().getName(), msg.getPayload().getMessage());
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。