赞
踩
RocketMQ是阿里开发的消息中间件,吸取了RabbitMQ和Kafka的优点:并发高,功能丰富,适用场景广。双十一里可以承受数亿级的高并发,主要是它的功劳。
上图就是rocketMQ的架构图,需要注意的是:每一个组件都是集群形式,因为它被开发创造的时候,就是为了解决大规模数据的生产环境下的消息发送。所以集群形式部署才是它最适合的用法。
以上4个概念,可以理解为是物理上有区别的概念,还有一些逻辑概念:
Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息
订阅的基本单位。
Group
Producer Group具有相同角色的生产者组合在一起 或者 Consumer Group具有相同角色的消费者组合在一起。组与组之间可以是不同的业务逻辑,彼此不影响。
MessageQueue
同一个Topic下的数据,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。
Message
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。
因为RocketMQ的源码是Java编写的,所以它是一个Java项目,在启动它之前,我们要求必须有jdk的环境。具体怎么安装jdk我就不在这里讲解了。我这里用的CentOS7的Linux系统:
执行安装命令:
wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
解压命令
unzip rocketmq-all-4.7.1-bin-release.zip
cd到解压的文件后,输入pwd查看文件目录
然后把目录配置到环境变量中:
# 编辑环境变量文件
vi ~/.bash_profile
修改完环境变量文件后,执行刷新命令,让环境变量生效。
source ~/.bash_profile
用静默启动的方式启动NameServer服务
nohup bin/mqnamesrv &
启动成功后,用jps命令,查看是否有 NameservStartup程序,表示启动成功。
也以静默启动的方式启动runbroker.sh
nohup ./mqbroker &
启动后,jps指令可以看到一个BrokerStartup进程。
至此,我们启动RocketMQ就完成了,其实就是启动了两个项目,对应我们上面将的组件:一个是NameServer 和 Broker
创建一个空的springBoot项目。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> </exclusion> <exclusion> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.1.6.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.1.6.RELEASE</version> </dependency>
在这个配置文件:application.properties 配置RocketMQ的ID地址和端口 以及 分组名称
rocketmq.name-server=192.168.69.128:9876
rocketmq.producer.group=springBootGroup
创建MQTestController文件,用户对外提供的接口,可以调取发送消息。
@RestController
@RequestMapping("/MQTest")
public class MQTestController {
private final String topic = "TestTopic";
@Resource
private SpringProducer producer;
@RequestMapping("/sendMessage")
public String sendMessage(String message){
producer.sendMessage(topic,message);
return "消息发送完成";
}
}
创建 生产者类 SpringProducer
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class SpringProducer { @Resource private RocketMQTemplate rocketMQTemplate; public void sendMessage(String topic,String msg){ this.rocketMQTemplate.convertAndSend(topic,msg); } }
创建消费者类SpringConsumer,去监听消息。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode= ConsumeMode.CONCURRENTLY)
public class SpringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("===我是消费者===收到了消息 : "+ message);
}
}
启动main方法,我们的生产者类和消费者类都会加载到spring容器里,并启动。
@SpringBootApplication
public class RocketMQSBApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQSBApplication.class,args);
}
}
启动项目后,默认是8080端口。我们调用Controller里配好的接口地址,并加上message消息。
可以看到控制台里,消费者已经把这条消息消费并打印出来了:
这就是非常简单的一次Java通过springBoot使用RocketMQ的实战操作。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。