赞
踩
本教程使⽤的是RocketMQ4.7.1版本,建议使⽤该版本进⾏之后的demo训练。
运⾏版:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
源码:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmqall-4.7.1-source-release.zip
- 准备⼀台装有Linux系统的虚拟机。本教程使⽤的是Ubuntu16.04版本。
- 安装jdk,上传jdk-8u191安装包并解压缩在 /usr/local/java ⽬录下。
- 安装rocketmq,上传rocketmq安装包并使⽤unzip命令解压缩在 /usr/local/rocketmq ⽬录下。
- 配置jdk和rocketmq的环境变量
export JAVA_HOME=/usr/local/java/jdk1.8.0_191
export JRE_HOME=/usr/local/java/jdk1.8.0_191/jre
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.7.1-binrelease
export CLASSPATH=$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$ROCKETMQ_HOME/bin:$PATH:$HOME/bin
注意,RocketMQ的环境变量⽤来加载 ROCKETMQ_HOME/conf 下的配置⽂件,如果不配置则⽆法启动NameServer和Broker。
完成后执⾏命令,让环境变量生效
source /etc/profile 1
修改bin/runserver.sh⽂件,由于RocketMQ默认设置的JVM内存为4G,但虚拟机⼀般没有这么4G内存,因此调整为512mb。
JAVA_OPT=“${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”
在runserver.sh⽂件中找到上⾯这段内容,改为下⾯的参数。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
在上⼀章中介绍了RocketMQ的架构,启动RocketMQ服务需要先启动NameServer。 在bin⽬录内使⽤静默⽅式启动。
nohup ./mqnamesrv &
查看bin/nohup.out显示如下内容表示启动成功
root@ubuntu:/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin#cat nohup.out
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning:UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
修改broker的JVM参数配置,将默认8G内存修改为512m。修改
bin/runbroker.sh
⽂件
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
在conf/broker.conf
⽂件中加⼊如下配置,开启⾃动创建Topic功能
autoCreateTopicEnable=true
以静默⽅式启动broker
nohup ./mqbroker -n localhost:9876 &
查看 bin/nohup.out ⽇志,显示如下内容表示启动成功
The broker[ubuntu, 172.17.0.1:10911] boot success.serializeType=JSON
配置nameserver的环境变量
在发送/接收消息之前,需要告诉客户端nameserver的位置。配置环境变量
NAMESRV_ADDR
:
export NAMESRV_ADDR=localhost:9876
使⽤bin/tools.sh⼯具验证消息的发送,默认会发1000条消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer
发送的消息⽇志:
...
SendResult [sendStatus=SEND_OK,msgId=FD154BA55A2B1008020C29FFFED6A0855CFC12A3A380885CB70A0235,offsetMsgId=AC11000100002A9F000000000001F491,messageQueue=MessageQueue [topic=TopicTest, brokerName=ubuntu,queueId=0], queueOffset=141]
使⽤bin/tools.sh⼯具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
看到接收到的消息:
...
ConsumeMessageThread_12 Receive New Messages: [MessageExt[brokerName=ubuntu, queueId=1, storeSize=227, queueOffset=245,sysFlag=0, bornTimestamp=1658892578234,bornHost=/172.16.253.100:48524, storeTimestamp=1658892578235,storeHost=/172.17.0.1:10911,msgId=AC11000100002A9F0000000000036654, commitLogOffset=222804,bodyCRC=683694034, reconsumeTimes=0, preparedTransactionOffset=0,toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1658892813497,UNIQ_KEY=FD154BA55A2B1008020C29FFFED6A0855CFC12A3A380885CB9BA03D6,CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 50],transactionId='null'}]]
关闭broker
./mqshutdown broker
关闭nameserver
./mqshutdown namesrv
为了追求更好的性能,RocketMQ的最佳实践⽅式都是在集群模式下完成。RocketMQ官⽅提供了三种集群搭建⽅式。
2主2从异步通信⽅式
使⽤异步⽅式进⾏主从之间的数据复制,吞吐量⼤,但可能会丢消息。使⽤conf/2m-2s-async
⽂件夹内的配置⽂件做集群配置。
2主2从同步通信⽅式
使⽤同步⽅式进⾏主从之间的数据复制,保证消息安全投递,不会丢失,但影响吞吐量
使⽤ conf/2m-2s-sync
⽂件夹内的配置⽂件做集群配置。
2主⽆从⽅式
会存在单点故障,且读的性能没有前两种⽅式好。使⽤ conf/2m-noslave
⽂件夹内的配置⽂件做集群配置。
上述三种官⽅提供的集群没办法实现⾼可⽤,即在master节点挂掉后,slave节点没办法⾃动被选举为新的master,⽽需要⼈⼯实现。
Dledger⾼可⽤集群:RocketMQ在4.5版本之后引⼊了第三⽅的Dleger⾼可⽤集群。
三台Linux服务器中nameserver和broker之间的关系如下:
服务器 | 服务器ip | NameServer | broker节点部署 |
---|---|---|---|
服务器1 | 172.16.253.103 | 172.16.253.103:9876 | |
服务器2 | 172.16.253.101 | 172.16.253.101:9876 | broker-a(master), broker-b-s(slave) |
服务器3 | 172.16.253.102 | 172.16.253.102:9876 | broker-b(master), broker-b-a(slave) |
三台服务器都需要安装jdk和rocketmq,安装步骤参考上⼀章节。
nameserver是⼀个轻量级的注册中⼼,broker把⾃⼰的信息注册到nameserver上。⽽且,nameserver是⽆状态的,直接启动即可。三台nameserver之间不需要通信,⽽是被请求⽅来关联三台nameserver的地址。
修改三台服务器的的runserver.sh⽂件,修改JVM内存默认的4g为512m。
在每台服务器的bin⽬录下执⾏如下命令:
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
服务器1:
nohup ./mqnamesrv -n 172.16.253.103:9876 &
服务器2:
nohup ./mqnamesrv -n 172.16.253.101:9876 &
服务器3:
nohup ./mqnamesrv -n 172.16.253.102:9876 &
broker-a,broker-b-s这两台broker是配置在服务器2上,broker-b,broker-a-s这两台broker是配置在服务器3上。这两对主从节点在不同的服务器上,服务器1上没有部署broker。
需要修改每台broker的配置⽂件。注意,同⼀台服务器上的两个broker保存路径不能⼀样。
broker-a的master节点
在服务器2上,进⼊到conf/2m-2s-async
⽂件夹内,修改broker-a.properties
⽂件。
# 所属集群名称 brokerClusterName=DefaultCluster # broker名字 brokerName=broker-a # broker所在服务器的ip brokerIP1=172.16.253.101 # broker的id,0表示master,>0表示slave brokerId=0 # 删除⽂件时间点,默认在凌晨4点 deleteWhen=04 # ⽂件保留时间为48⼩时 fileReservedTime=48 # broker的⻆⾊为master brokerRole=ASYNC_MASTER # 使⽤异步刷盘的⽅式 flushDiskType=ASYNC_FLUSH # 名称服务器的地址列表 namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876 # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个 defaultTopicQueueNums=4 # 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true # 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true # broker对外服务的监听端⼝ listenPort=10911 # abort⽂件存储路径 abortFile=/usr/local/rocketmq/store/abort # 消息存储路径 storePathRootDir=/usr/local/rocketmq/store # commitLog存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消费队列存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index # checkpoint⽂件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # 限制的消息⼤⼩ maxMessageSize=65536 # commitLog每个⽂件的⼤⼩默认1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个⽂件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000
broker-a的slave节点
在服务器3上,进⼊到conf/2m-2s-async
⽂件夹内,修改broker-a-s.properties
⽂件。
brokerClusterName=DefaultCluster brokerName=broker-a brokerIP1=172.16.253.102 brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=11011 abortFile=/usr/local/rocketmq/store-slave/abort storePathRootDir=/usr/local/rocketmq/store-slave storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue storePathIndex=/usr/local/rocketmq/store-slave/index storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint maxMessageSize=65536
broker-b的master节点
在服务器3上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-b.properties⽂件。
brokerClusterName=DefaultCluster brokerName=broker-b brokerIP1=172.16.253.102 brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102 :9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=10911 abortFile=/usr/local/rocketmq/store/abort storePathRootDir=/usr/local/rocketmq/store storePathCommitLog=/usr/local/rocketmq/store/commitlog storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue storePathIndex=/usr/local/rocketmq/store/index storeCheckpoint=/usr/local/rocketmq/store/checkpoint maxMessageSize=65536
broker-b的slave节点
在服务器2上,进⼊到conf/2m-2s-async
⽂件夹内,修改broker-b-s.properties
⽂件。
brokerClusterName=DefaultCluster brokerName=broker-b brokerIP1=172.16.253.101 brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=11011 abortFile=/usr/local/rocketmq/store-slave/abort storePathRootDir=/usr/local/rocketmq/store-slave storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue storePathIndex=/usr/local/rocketmq/store-slave/index storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint maxMessageSize=65536
修改服务器2和服务器3的runbroker.sh⽂件
修改JVM内存默认的8g为512m。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
在服务器2中启动broker-a(master)和broker-b-s(slave)
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在服务器3中启动broker-b(master),broker-a-s(slave)
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
使⽤RocketMQ提供的tools⼯具验证集群是否正常⼯作。
在服务器2上配置环境变量⽤于被tools中的⽣产者和消费者程序读取该变量。
export NAMESRV_ADDR='172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876'
启动⽣产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer
启动消费者
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
RocketMQ提供了命令⼯具⽤于管理topic、broker、集群、消息等。⽐如可以使⽤mqadmin创建topic:
./mqadmin updateTopic -n 172.16.253.101:9876 -c DefaultCluster -t myTopic1
下⾯提供了mqadmin⼯具的各种命令。
参数 | 是否必填 | 说明 |
---|---|---|
-b | 如果-c为空,则必填 | broker地址,表示topic建在该broker |
-c | 如果-b为空,则必填 | cluster名称,表示topic建在该集群(集群可通过clusterList查询) |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
-p | 否 | 指定新topic的权限控制(W|R|WR) |
-r | 否 | 可读队列数,默认为8 |
-w | 否 | 可写队列数,默认为8 |
-t | 是 | topic名称,注意名称格式:字下美人数骆驼 |
参数 | 是否必填 | 说明 |
---|---|---|
-c | 是 | cluster名称,表示删除某集群下的某个topic(集群可通过clusterList查询) |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
-t | 是 | topic名称,注意名称格式:字下美人数骆驼 |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 如果-c为空,则必填 | broker地址,表示订阅组建在该broker |
-c | 如果-b为空,则必填 | cluster名称,表示topic建在该集群(集群可通过clusterList查询) |
-d | 否 | 是否容许广播方式消费 |
-g | 是 | 订阅组名 |
-i | 否 | 从哪个broker开始消费 |
-m | 否 | 是否容许从队列的最小位置开始消费,默认会设置为false |
-q | 否 | 消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列 |
-r | 否 | 重试消费最大次数,超过则投递到死信队列,不再投递并报警 |
-s | 否 | 消费功能是否开启 |
-w | 否 | 发现消息堆积后,将Consumer的消费请求重定向到另一台slave机器 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 如果-c为空,则必填 | broker地址,表示订阅组建在该broker |
-c | 如果-b为空,则必填 | cluster地址,表示topic建在该集群(集群可通过clusterList查询) |
-g | 是 | 订阅组名 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 如果-c为空,则必填 | broker地址,表示订阅组建在该broker |
-c | 如果-b为空,则必填 | cluster地址,表示topic建在该集群(集群可通过clusterList查询) |
-k | 是 | key值 |
-v | 否 | value值 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 是 | broker地址 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-i | 是 | 消息id |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-f | 否 | 被查询消息的截止时间 |
-k | 是 | msgKey |
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 是 | broker名称,表示订阅组建在该broker(这里需要注意填写的是boker的名称,而不是broker地址,broker名称可以再clusterList查询) |
-i | 是 | query队列id |
-o | 是 | offset值 |
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-g | 是 | 生产者所属组名 |
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-g | 是 | 生产者所属组名 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-g | 是 | 生产者所属组名 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-m | 否 | 打印更多信息 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-k | 是 | key值 |
-v | 是 | value值 |
-s | 是 | namespace值 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-k | 是 | key值 |
-s | 是 | namespace值 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-p | 是 | project group名 |
-i | 否 | 服务器ip |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-p | 是 | project group名 |
-i | 否 | 服务器ip |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-p | 是 | project group名 |
-i | 否 | 服务器ip |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,⽅可⽣效
参数 | 是否必填 | 说明 |
---|---|---|
-f | 否 | 通过时间戳强制回滚(true|false),默认为true |
-s | 是 | 时间戳 |
-g | 是 | 消费者所属组名 |
-t | 是 | topic名称 |
-h | 否 | 打印帮助 |
-n | 是 | nameserver服务地址列表,格式ip:port;ip:port;… |
参数 | 是否必填 | 说明 |
---|---|---|
-b | 是 | broker地址 |
-h | 否 | 打印帮助 |
-n | 是 | nameserve服务地址列表,格式ip:port;ip:port |
该命令只打印当前与cluster 连接的consumer 的消费进度
参数 | 是否必填 | 说明 |
---|---|---|
-g | 是 | 消费者所属组名 |
-t | 是 | 查询主题 |
-i | 否 | Consumer客户端-p |
-h | 否 | 打印帮助 |
-n | 是 | nameserve服务地址列表,格式为ip:port;ip:port |
RocketMQ没有提供可视化管理控制平台,可以使⽤第三⽅管理控制平台:https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console
下载管理控制平台,解压缩在linux服务器上,可以安装在服务器1上,给服务器安装maven环境
apt install maven
修改
rocketmq-externals/rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties
配置⽂件中的nameserver地址
rocketmq.config.namesrvAddr=172.16.253.103:9876;172.16.253.101:9876;172.16.253.102:9876
回到
rocketmq-externals/rocketmq-externals-master/rocketmq-console
路径下执⾏maven命令进⾏打包
mvn clean package -Dmaven.test.skip=true
运⾏jar包。进⼊到
rocketmq-externals/rocketmq-externals-master/rocketmq-console/target
⽬录内执⾏如下命令:
nohup java -jar rocketmq-console-ng-1.0.1.jar
访问所在服务器的8080端⼝,查看集群界⾯,可以看到之前部署的集群
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。