赞
踩
生产者是消息的发送方。它将准备好的消息发送到指定的 Topic(主题)。生产者可以将消息发送给一个或多个 Broker,Broker 负责将消息存储起来,并确保消息可被消费者订阅和接收。生产者创建所需的消息,并将其发送到指定的 Topic,可以根据业务需求选择同步发送或异步发送的方式。生产者还可以设置消息的重试、延迟等特性。
消费者是消息的接收方,它从 Broker 订阅 Topic,即选择订阅感兴趣的消息。消费者可以以不同的方式处理接收到的消息,例如进行数据处理、更新状态、发送通知等。消费者可以以 PUSH 或 PULL 的方式从 Broker 获取消息。PUSH 方式是 Broker 主动将消息推送给消费者,而 PULL 方式则是消费者主动从 Broker 拉取消息。
NameServer 是一个轻量级的路由服务,用于管理 Topic 和 Consumer Group 的注册信息。它充当消息队列的路由和寻址服务,生产者发送消息时,需要向 NameServer 查询 Topic 所在的 Broker 地址列表;消费者订阅 Topic 时,也需要向 NameServer 注册消费者组并获取订阅的 Topic 信息。NameServer 还负责处理动态变更,例如新的 Broker 上线或下线时的注册更新。
Broker 是实际存储和传输消息的节点。它接收来自生产者的消息并将其存储到合适的 Topic 中,同时负责将消息传递给订阅了该 Topic 的消费者。Broker 管理着一部分 Topic 和它们的消息队列。在 RocketMQ 中,Broker 通过主从模式实现高可用性,其中每个 Broker 可能具有多个角色,如 Master 和 Slave。
项目结构如下:
首先一个父目录:SpringBoot-RocketMQ
然后下面分别是三个子目录:
生产者:springboot-dubbo-provider
接口:springboot-dubbo-interface
消费者:springboot-dubbo-consumer
(由于复用的之前dubbo项目,目录名称没有改过来)
项目源码我挂在github上面了,直接拉取master分支即可:https://github.com/shengwanping/SpringBoot-RocketMQ
1、SpringBoot-RocketMQ中pom.xml加入:
<!--dependencyManagement 依赖管理,子项目不会继承父依赖,需要重新声明--> <dependencyManagement> <dependencies> <!-- spring-boot依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.3.12.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!-- rocketmq-apache --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
2、springboot-dubbo-provider中pom.xml加入:
<dependencies> <!--引入 springboot-dubbo-interface 接口服务--> <dependency> <groupId>org.example</groupId> <artifactId>springboot-dubbo-interface</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!-- springboot 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- rocketmq-apache --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency> </dependencies>
3、springboot-dubbo-consumer中pom.xml加入:
<dependencies> <!--引入 springboot-dubbo-interface 接口服务--> <dependency> <groupId>org.example</groupId> <artifactId>springboot-dubbo-interface</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <!-- springboot 依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- rocketmq-apache --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency> </dependencies>
4、springboot-dubbo-interface 只是一个接口工具包,pom.xml中不需要额外配置
1、生产者application.yml配置如下
server:
port: 8010
rocketmq:
name-server: localhost:9876 # 连接Rocketmq Name Server服务注册中心
producer: # 生产者
group: producer-one # 生产者组.随意取名
2、消费者application.yml配置如下
server:
port: 8011
rocketmq:
name-server: localhost:9876 # 连接Rocketmq Name Server服务注册中心
producer: # 消费者者
group: consumer-one # 消费者组.随意取名
1、接口工具包下面只有一个接口:
public interface DemoService {
void sendHello();
}
2、生产者包下有两个类:
这里用启动类直接调用了接口
package org.dubbo.provider; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.dubbo.DemoService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * 模拟 消息生产者 */ @Service("demoService") public class DemoServiceImpl implements DemoService { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public void sendHello() { // 向Proder发送消息 topic 发送的消息 rocketMQTemplate.convertAndSend("topic_001", "Hello RocketMQ"); } }
package org.dubbo.provider; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class ProviderApplication { public static void main(String[] args) { // 获取上下文 ConfigurableApplicationContext context = SpringApplication.run(ProviderApplication.class, args); // 模拟Controller调用这个接口(启动后直接调用sendHello()方法,发送消息) DemoServiceImpl demoService = (DemoServiceImpl) context.getBean("demoService"); demoService.sendHello(); } }
3、消费者包下有两个类:
package org.dubbo.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
package org.dubbo.consumer; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; /** * 消息 消费者 */ @Component // 指定topic 和 消费者组 @RocketMQMessageListener(topic = "topic_001", consumerGroup = "${rocketmq.producer.group}") public class ConsumerMode implements RocketMQListener<String> { // 继承RocketMQListener接口 @Override public void onMessage(String s) { System.out.println("收到的消息是:"+s); } }
完成了如上配置,生产者和消费者的代码就完成了,然后需要启动rocketmq的NameServer(路由注册中心)和BrokerServer
如何启动NameServer和Broker请参考下面这篇文章:
RocketMQ下载,RocketMQ可视化控制台下载
启动 NameServer和Broker 之后我们启动生产者和消费者 这时如果在消费者下面打印“收到的消息是:Hello RocketMQ”,则说明成功了。
如果有安装启动RocketMQ可视化管理平台,我们还能在上面清晰看到生产者发送的消息
本人也是刚刚学习RocketMQ,有什么不足的欢迎大家留言交流!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。