赞
踩
1、下载RocketMQ,下载地址:下载 | RocketMQ
2、linux新建文件夹并解压下载的RocketMQ到文件夹中
使用命令 unzip -d /usr/local/rocketmq rocketmq-all-4.9.6-bin-release.zip 进行解压,如果不能使用unzip解压命令则可以输入命令 yum install -y unzip zip 进行安装
3、进入解压后的rocketmq目录,创建存放数据的目录
使用命令 mkdir -p store store/commitlog store/consumequeue
4、进入rocketmq解压目录中的conf目录,编辑 broker.conf 文件,在broker.conf文件内容最后加上如下配置:
listenPort=10911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store
storePathCommitLog=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store/commitlog
storePathConsumerQueue=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store/consumequeue
5、修改配置文件
(1)修改内存大小(自行根据部署的机器硬件实际情况选择修改或跳过)
进入rocketmq解压目录中的bin目录,编辑 runbroker.sh 文件和 runserver.sh 文件,修改内存大小
runserver.sh文件
runbroker.sh文件
(2)修改jdk路径
进入到解压后rocketMQ的bin目录,修改文件runbroker.sh、runserver.sh中的jdk路径
6、启动RocketMQ,步骤4配置的端口防火墙需要先开启
(1)先使用命令 nohup sh mqnamesrv & 启动namesever服务
(2)再使用命令 nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/conf/broker.conf & 启动broke服务(指定配置路径根据自己的安装路径来)
(3)输入命令jps查看nameserver服务和broke服务是否启动成功
(4)如果输入启动命令后jps没查看有对应的服务,可以在rocketmq的安装目录下的bin目录查看nohup.out文件的输出内容看看是否出现了错误异常
7、关闭RocketMQ
进入rocketmq的安装目录下的bin目录
(1)关闭 broker 命令 sh mqshutdown broker
(2)关闭 nameserver 命令 sh mqshutdown namesrv
8、安装RocketMQ控制台
(1)下载地址:https://github.com/apache/rocketmq-dashboard
(2)下载完后用idea打开项目
(3)修改配置文件application.yml,修改成部署的nameserver所在的ip地址
(4)启动项目,输入地址访问控制台
(5)可以打包成jar包部署在linux中
打包命令 mvn clean package -Dmaven.test.skip=true
9、spingboot项目pom中引入rocketmq依赖包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.6</version> //根据自己使用的版本来引入 </dependency>
(1)生产者application.yml配置文件
rocketmq: name-server: 192.168.200.100:9876 #nameserver的地址 producer: # 生产组名 group: demoGroup # 消息发送超时时间 send-message-timeout: 3000 # 消息体阈值,4k以上会压缩 compress-message-body-threshold: 4096 # 在同步模式下发送失败之前在内部执行的最大重试次数。 retry-times-when-send-failed: 3 # 在异步模式下发送失败之前在内部执行的最大重试次数。 retry-times-when-send-async-failed: 3 # 消息阈值,最大4MB,在 4KB 之内性能最佳 max-message-size: 4096
(2)消费者application.yml配置文件
rocketmq: name-server: 192.168.200.100:9876 #nameserver的地址
(3)生产者发送消息代码
@Autowired private RocketMQTemplate rocketMQTemplate; @RequestMapping("/sendMq") public String sendMq() { TestRocketMq rocketMq = new TestRocketMq(); rocketMq.setName("mq消息订单"); rocketMq.setAge(56); rocketMQTemplate.convertAndSend("order_topic", rocketMq); return "success"; }
(4)消费者消费消息代码
/** * consumerGroup = "demo-consumer-group", // consumerGroup:消费者组名 * topic = "Demo", // topic:订阅的主题 * selectorExpression = "*", // selectorExpression:控制可以选择的消息,可以使用SelectorType.SQL92语法。设置为 * 时,表示全部。 * messageModel = MessageModel.CLUSTERING // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式 */ @Service @RocketMQMessageListener(consumerGroup = "stockConsumer", topic = "order_topic") public class RocketMqService implements RocketMQListener<TestRocketMq> { @Override public void onMessage(TestRocketMq testRocketMq) { System.out.println("消费消息-"+ testRocketMq); } }
10、测试消息发送和消费
11、rocketmq集群部署
因为我用的是一台虚拟机来搭建集群,所以是个伪集群,但是真正的分布式集群思路也是一样的
(1)创建集群和节点文件夹
(2)配置nameserver集群,在节点目录node9801和node9802下分别创建namesrv.properties配置文件,编辑加上对应的端口
listenPort=9801
(3)配置broker集群,在rocketmq的解压目录的conf目录中有3种类型的集群部署配置方式,想要用哪一种就进去对应的目录下拷贝出来对应的配置文件到步骤(1)的集群节点目录中,我这里使用的是主从同步双写的方式(2m-2s-sync)
(4)编辑broker集群对应的配置文件
主节点 broker-a.properties
从节点 broker-a-s.properties
注意:这里的listenPort参数主从节点的端口号尽量不要使用相邻的,有些端口号可能会被主节点占用导致从节点不能绑定
其他节点也按照以上的配置来配置好,可根据实际情况参考下面的可配置参数进行配置
可配置的参数说明:
# 所属集群名字
brokerClusterName=myRocketmqCluster
# broker名字,注意此处不同的配置文件填写的不一样(按配置文件文件名来匹配)
brokerName=broker-a
# 0 表示Master, > 0 表示slave
brokerId=0
# 注册中心,可使用集群模式
namesrvAddr=192.168.0.129:9876;192.168.0.130:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10933
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog每个文件的大小默认1G
# 附加:消息实际存储位置,和ConsumeQueue是mq的核心存储概念,之前搭建2m环境的时候创建在store下面,用于数据存储,consumequeue是一个逻辑的概念,消息过来之后,consumequeue并不是把消息所有保存起来,而是记录一个数据的位置,记录好之后再把消息存到commitlog文件里
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
storePathRootDir=/usr/local/rocketMQ/store
# commitLog存储路径
storePathCommitLog=/usr/local/rocketMQ/commitlog
# 消费队列存储路径
storePathConsumeQueue=/usr/local/rocketMQ/consumequeue
# 消息索引存储路径
storePathIndex=/usr/local/rocketMQ/index
# checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketMQ/checkpoint
# abort 文件存储路径
abortFile=/usr/local/rocketMQ/abort
# 限制的消息大小
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# ASYNC_MASTER 异步复制Master
# SYNC_MASTER 同步双写Master
# SLAVE
brokerRole=SYNC_MASTER
# 刷盘方式
# ASYNC_FLUSH 异步刷盘
# SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
checkTransactionMessageEnable=false
# 发消息线程池数量
sendMessageTreadPoolNums=128
# 拉消息线程池数量
pullMessageTreadPoolNums=128
(5)启动集群所有的nameserver和broker节点,进入到rocketmq解压目录的bin目录
启动nameserver 命令:
nohup sh mqnamesrv -c /usr/local/rocketmqcluster/node9801/namesrv.properties &
启动brokerServer命令:
nohup sh mqbroker -c /usr/local/rocketmqcluster/node9801/broker-a.properties &
nohup sh mqbroker -c /usr/local/rocketmqcluster/node9801/broker-a-s.properties &
2个nameserver服务,2个broker服务集群,1个broker服务集群由1个主节点和1个从节点组成
(6)使用rocketmq控制台查看集群情况,控制台项目的配置文件需要修改集群的nameserver地址
(7)客户端可配置多个nameserver地址
有问题和建议欢迎大家留言评论,谢谢~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。