赞
踩
目录
8.4.3 创建(修订)订阅组:updateSubGroup
8.4.5 更新Broker 配置⽂件:updateBrokerConfig
8.4.9 查看Broker 统计信息:brokerStats
8.4.10 根据消息ID 查询消息:queryMsgById
8.4.11 根据消息Key 查询消息:queryMsgByKey
8.4.12 根据Offset 查询消息:queryMsgByOffset
8.4.13 查询Producer 的⽹络连接:producerConnection
8.4.14 查询Consumer 的⽹络连接:consumerConnection
8.4.15 查看订阅组消费状态:consumerProgress
8.4.17 添加(更新)KV 配置信息:updateKvConfig
8.4.18 删除KV 配置信息:deleteKvConfig
8.4.19 添加(更新)Project group 配置信息:updateProjectGroup
8.4.20 删除Project group 配置信息:deleteProjectGroup
8.4.21 取得Project group 配置信息:getProjectGroup
8.4.22 设置消费进度:resetOffsetByTime
8.4.23 清除特定Broker权限:wipeWritePerm
8.4.24 获取Consumer消费进度:getConsumerStatus
不会的可以去网上找教程,也可以看我写的这两篇文章:
安装VMware虚拟机、Linux系统(CentOS7)_何苏三月的博客-CSDN博客
Linux环境下安装JDK1.8_何苏三月的博客-CSDN博客
官网下载地址:
https://rocketmq.apache.org/download
这里,我选择的版本是4.7.1。你也可以选择其他版本。
下载完成后,我们上传安装的压缩包到虚拟机上。
开始解压,由于是zip格式的压缩包,所以得输入如下命令解压
unzip -d /opt/software/rocketmq/ rocketmq-all-4.7.1-bin-release.zip
-d 用来指定解压路径。
ok,解压完成。你可以选择删除掉刚才的压缩包了。
其实,你在window下使用也是ok的,而且它的bin目录下也带有window的可执行命令。
jdk我们需要配置环境变量,同样rocketmq也需要!
我们进入/etc/profile文件,添加配置:
让环境变量生效,执行source /etc/profile
ok,到这里安装就搞定了。
首先要启动NameServer,它是服务注册中心,相当于zookeeper,nacos。
然后启动Borker。启动Borker的时候,我们先不启动集群,使用单机版启动即可。
当然了,rocketmq正常使用都是集群模式。
ps:如果你的虚拟机内存很小,或者可使用的内存不多了。我们可以修改一下rocketmq的内存。
首先修改nameserver的内存:
然后修改broker的内存:
nohup ./mqnamesrv -n 192.168.17.160:9876 &
启动之前,在 conf/broker.conf ⽂件中加⼊如下配置,开启⾃动创建Topic功能。
这是为了方便我们后面测试发送消息演示。它会自动创建一个topic,不用我们自己创建了。
启动broker
nohup ./mqbroker -n 192.168.17.160:9876 &
首先配置一下nameserver的环境变量
使⽤bin/tools.sh⼯具验证消息的发送,默认会发1000条消息
./tools.sh org.apache.rocketmq.example.quickstart.Producer
使⽤bin/tools.sh⼯具验证消息的接收
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
- ./mqshutdown broker
- ./mqshutdown namesrv
ok,至此单机版本的rocketmq就安装完成了。
当然为了高可用,我们都会使用集群版,这个后面有时间也会再补充~
遇到的问题:
broker使用的IP一般是本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,导致无法识别到正确的本地IP地址,从而导致broker启动是使用了内网IP。
比如172.17.0.1,这就导致我们本地运行Java项目时,找不到这台服务器。
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:...
因此,我们需要手动修改一下broker.conf配置文件
步骤
编辑"/rocketmq-all-4.4.0-bin-release/conf/broker.conf" 文件
brokerIP1=xx.xx.xx.xx # 你的公网IP,这个写你当前linux机器的ip地址
然后重启 BrokerStartup,记得先杀死你的BrokerStartup进程
为了追求更好的性能,RocketMQ的最佳实践方式都是在集群模式下完成。
RocketMQ官方提供了三种集群搭建方式:
2主2从异步通信方式
使用异步方式进行主从之间的数据复制,吞吐量大,但可能会丢消息。
使用 conf/2m-2s-async 文件夹内的配置文件做集群配置。
2主2从同步通信方式
使用同步方式进行主从之间的数据复制,保证消息安全投递,不会丢失,但影响吞吐量
使用 conf/2m-2s-sync 文件夹内的配置⽂件做集群配置。
2主无从方式
会存在单点故障,且读的性能没有前两种方式好。
使用 conf/2m-noslave 文件夹内的配置文件做集群配置。
上述三种官方提供的集群没办法实现高可用,即在master节点挂掉后,slave节点没办法自动被选举为新的master,而需要人工实现。RocketMQ在4.5版本之后引入了第三方的Dleger高可用集群。
三台Linux服务器中nameserver和broker之间的关系如下:
三台服务器都需要安装jdk和rocketmq,安装步骤参考上⼀章节。
我们现在有一台了,所以可以直接克隆两台~
不会克隆的可以参考这篇文章:克隆Linux系统(centos)_何苏三月的博客-CSDN博客
nameserver是⼀个轻量级的注册中心,broker把自己的信息注册到nameserver上。
而且,nameserver是无状态的,直接启动即可。三台nameserver之间不需要通信,而是被请求方来关联三台nameserver的地址。
修改三台服务器的的runserver.sh⽂件
修改JVM内存默认的4g为512m。
- JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -
- XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
在每台服务器的bin⽬录下执⾏如下命令
服务器1
nohup ./mqnamesrv -n 192.168.17.100:9876 &
服务器2
nohup ./mqnamesrv -n 192.168.17.101:9876 &
服务器3
nohup ./mqnamesrv -n 192.168.17.102:9876 &
启动之后,通过cat nohup.out 如果每台都显示成功则表示启动成功
ps:这三台nameserver是无状态,互相间没有关联,各自启动各自的,互相间不需要通信。
broker-a,broker-b-s这两台broker是配置在服务器2上,broker-b,broker-a-s这两台broker是配置在服务器3上。这两对主从节点在不同的服务器上,服务器1上没有部署broker。
需要修改每台broker的配置⽂件。注意,同⼀台服务器上的两个broker保存路径不能⼀样。
broker-a的master节点
在服务器2上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-a.properties⽂件。
- # 所属集群名称
- brokerClusterName=DefaultCluster
- # broker名字
- brokerName=broker-a
- # broker所在服务器的ip
- brokerIP1=192.168.17.101
- # broker的id,0表示master,>0表示slave
- brokerId=0
- # 删除⽂件时间点,默认在凌晨4点
- deleteWhen=04
- # ⽂件保留时间为48⼩时
- fileReservedTime=48
- # broker的⻆⾊为master
- brokerRole=ASYNC_MASTER
- # 使⽤异步刷盘的⽅式
- flushDiskType=ASYNC_FLUSH
- # 名称服务器的地址列表
- namesrvAddr=192.168.17.100:9876;192.168.17.101:9876;192.168.17.102
- :9876
- # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
- defaultTopicQueueNums=4
- # 是否允许 Broker ⾃动创建Topic,建议线下开启,线上关闭
- autoCreateTopicEnable=true
- # 是否允许 Broker ⾃动创建订阅组,建议线下开启,线上关闭
- autoCreateSubscriptionGroup=true
- # broker对外服务的监听端⼝
- listenPort=10911
- # abort⽂件存储路径
- abortFile=/usr/local/rocketmq/store/abort
- # 消息存储路径
- storePathRootDir=/usr/local/rocketmq/store
- # commitLog存储路径
- storePathCommitLog=/usr/local/rocketmq/store/commitlog
- # 消费队列存储路径
- storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
- # 消息索引存储路径
- storePathIndex=/usr/local/rocketmq/store/index
- # checkpoint⽂件存储路径
- storeCheckpoint=/usr/local/rocketmq/store/checkpoint
- # 限制的消息⼤⼩
- maxMessageSize=65536
- # commitLog每个⽂件的⼤⼩默认1G
- mapedFileSizeCommitLog=1073741824
- # ConsumeQueue每个⽂件默认存30W条,根据业务情况调整
- mapedFileSizeConsumeQueue=300000
broker-a的slave节点
在服务器3上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-a-s.properties⽂件。
- brokerClusterName=DefaultCluster
- brokerName=broker-a
- brokerIP1=192.168.17.102
- brokerId=1
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SLAVE
- flushDiskType=ASYNC_FLUSH
- namesrvAddr=192.168.17.100:9876;192.168.17.101:9876;192.168.17.102:9876
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=11011
- abortFile=/usr/local/rocketmq/store-slave/abort
- storePathRootDir=/usr/local/rocketmq/store-slave
- storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
- storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
- storePathIndex=/usr/local/rocketmq/store-slave/index
- storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
- maxMessageSize=65536
broker-b的master节点
在服务器3上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-b.properties⽂件。
- brokerClusterName=DefaultCluster
- brokerName=broker-b
- brokerIP1=192.168.17.102
- brokerId=0
- deleteWhen=04
- fileReservedTime=48
- brokerRole=ASYNC_MASTER
- flushDiskType=ASYNC_FLUSH
- namesrvAddr=192.168.17.100:9876;192.168.17.101:9876;192.168.17.102:9876
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=10911
- abortFile=/usr/local/rocketmq/store/abort
- storePathRootDir=/usr/local/rocketmq/store
- storePathCommitLog=/usr/local/rocketmq/store/commitlog
- storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
- storePathIndex=/usr/local/rocketmq/store/index
- storeCheckpoint=/usr/local/rocketmq/store/checkpoint
- maxMessageSize=65536
broker-b的slave节点
在服务器2上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-b-s.properties⽂件。
- brokerClusterName=DefaultCluster
- brokerName=broker-b
- brokerIP1=192.168.17.101
- brokerId=1
- deleteWhen=04
- fileReservedTime=48
- brokerRole=SLAVE
- flushDiskType=ASYNC_FLUSH
- namesrvAddr=192.168.17.100:9876;192.168.17.101:9876;192.168.17.102:9876
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=11011
- abortFile=/usr/local/rocketmq/store-slave/abort
- storePathRootDir=/usr/local/rocketmq/store-slave
- storePathCommitLog=/usr/local/rocketmq/store-slave/commitlog
- storePathConsumeQueue=/usr/local/rocketmq/store-slave/consumequeue
- storePathIndex=/usr/local/rocketmq/store-slave/index
- storeCheckpoint=/usr/local/rocketmq/store-slave/checkpoint
- maxMessageSize=65536
修改服务器2和服务器3的runbroker.sh文件
修改JVM内存默认的8g为512m。
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
在服务器2中启动broker-a(master)和broker-b-s(slave)
- nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
- nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
在服务器3中启动broker-b(master),broker-a-s(slave)
- nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
- nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
使用RocketMQ提供的tools工具验证集群是否正常工作。
在服务器2上配置环境变量用于被tools中的生产者和消费者程序读取该变量。
export NAMESRV_ADDR='192.168.241.100:9876;192.168.241.101:9876;192.168.241.102:9876'
启动生产者
./tools.sh org.apache.rocketmq.example.quickstart.Producer
RocketMQ提供了命令工具用于管理topic、broker、集群、消息等。比如可以使用mqadmin创建topic:
./mqadmin updateTopic -n 192.168.17.101:9876 -c DefaultCluster -t myTopic1
下面提供了mqadmin工具的各种命令。
根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,方可生效。
该命令只打印当前与cluster 连接的consumer 的消费进度
以上列举了mqadmin的常见的一些命令,有需要的可以对照着使用。
这里再举例说明,比如我想查看一下mq的集群。
通过上面提供的mqadmin工具我们发现,记住这些命令还是蛮复杂的。操作起来不是很方便,那么有没有可视化界面供我们直接操作呢,答案是有的!
但是RocketMQ没有提供可视化管理控制平台,可以使用第三方管理控制平台:
https://github.com/apache/rocketmq-externals/tree/rocketmq-console-1.0.0/rocketmq-console
可以安装在服务器1上
由于下载下来的是一个springboot项目,所以我们需要先修改一下它里面的设置,然后打成jar包再发送到服务器上去。
先用idea打开该项目,然后修改application.properties文件。
将nameserver的地址填上去,如下图所示
然后我们用maven把它达成jar包,并上传到服务器1上,当然这个你上传到哪台都行
然后运行该jar包
java -jar rocketmq-console-ng-1.0.0.jar
测试一下,发现没有数据!
出现这种问题的原因可能是防火墙打开了。
请确保nameserver和broker的端口5777/10911(默认)能够被访问。如果防火墙挡住了请求,您将无法查看集群的任何信息。
我们可以检查一下防火墙,然后将每台服务器防火墙都关闭,再看看。就ok了。
rocketmq的单机部署和集群部署就讲到这里了。感谢你的耐心观看~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。