当前位置:   article > 正文

RocketMq集群安装&整合Springboot_rocketmq springboot多个namesrv 怎么订阅

rocketmq springboot多个namesrv 怎么订阅

目录

一、RocketMQ介绍

二、名词解释

三、安装

安装方式

准备工作

传统方式搭建RocketMQ集群

四、整合Springboot


一、RocketMQ介绍

        RocketMQ是一款分布式消息中间件,最初是由阿里巴巴消息中间件团队研发并大规模应用于生产系统,满足线上海量消息堆积的需求, 在2016年底捐赠给Apache开源基金会成为孵化项目,经过不到一年时间正式成为了Apache顶级项目;早期阿里曾经基于ActiveMQ研发消息系统, 随着业务消息的规模增大,瓶颈逐渐显现,后来也考虑过Kafka,但因为在低延迟和高可靠性方面没有选择,最后才自主研发了RocketMQ, 各方面的性能都比目前已有的消息队列要好,RocketMQ和Kafka在概念和原理上都非常相似,所以也经常被拿来对比;RocketMQ默认采用长轮询的拉模式, 单机支持千万级别的消息堆积,可以非常好的应用在海量消息系统中。

二、名词解释

消息模型:RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

消息生产者(Producer):负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

消息消费者(Consumer):负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

代理服务器(Broker Server):消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

拉取式消费(Pull Consumer):Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

推动式消费(Push Consumer):Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

 消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

 集群消费(Clustering):集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。

广播消费(Broadcasting):广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

普通顺序消息(Normal Ordered Message):普通顺序消费模式下,消费者通过同一个消息队列( Topic 分区,称作 Message Queue) 收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

严格顺序消息(Strictly Ordered Message):严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

 消息(Message):消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

 标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

摘自官网 https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md

三、安装

安装方式

单Master模式:这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。

多Master模式:

一个集群无Slave,全是Master。

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

多Master多Slave模式-异步复制:

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级)。

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

多Master多Slave模式-同步双写:

每个Master配置一个Slave,有多对Master-Slave,HA(双机集群)采用同步双写方式,即只有主备都写成功,才向应用返回成功。:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT(响应时间)会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

本文以多Master多Slave模式-异步复制这种方式安装

准备工作

1、由于rocketmq是使用java语言编写,所以首先需要安装jdk环境,本文使用jdk17 安装参考 Linux系统下安装jdk17&jdk8安装_熟透的蜗牛的博客-CSDN博客_linux安装jdk17

2、集群示意图

两台服务器互为主从

传统方式搭建RocketMQ集群

1、下载安装包 版本4.9.2

Apache Downloads

2、上传到服务器,并解压,如果没有安装unzipan,安装指令

[root@localhost ~]# yum install unzip zip

3、解压文件

  1. #解压
  2. [root@localhost ~]# unzip rocketmq-all-4.9.2-bin-release.zip
  3. #移动到/usr/local 并重命名为rocketmq
  4. [root@localhost ~]# mv rocketmq-4.9.2 /usr/local/rocketmq

4、修改启动脚本 

注意:因为我的jdk是17,所以需要修改启动脚本,jdk8不需要修改,修改的地方为垃圾回收器,和启动参数(-Xms256m -Xmx256m -Xmn128m

runbroker.sh

  1. #!/bin/sh
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #===========================================================================================
  17. # Java Environment Setting
  18. #===========================================================================================
  19. error_exit ()
  20. {
  21. echo "ERROR: $1 !!"
  22. exit 1
  23. }
  24. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
  25. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
  26. [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOMEvariable in your environment, We need java(x64)!"
  27. export JAVA_HOME
  28. export JAVA="$JAVA_HOME/bin/java"
  29. export BASE_DIR=$(dirname $0)/..
  30. export CLASSPATH=.${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
  31. #export CLASSPATH=${BASE_DIR}/lib/rocketmq-broker-4.5.0.jar:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
  32. #===========================================================================================
  33. # JVM Configuration
  34. #===========================================================================================
  35. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
  36. JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
  37. JAVA_OPT="${JAVA_OPT} -verbose:gc -Xlog:gc:/dev/shm/mq_gc_%p.log -XX:+PrintGCDetails"
  38. JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
  39. JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
  40. JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
  41. JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
  42. #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
  43. JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
  44. JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
  45. numactl --interleave=all pwd > /dev/null 2>&1
  46. if [ $? -eq 0 ]
  47. then
  48. if [ -z "$RMQ_NUMA_NODE" ] ; then
  49. numactl --interleave=all $JAVA ${JAVA_OPT} $@
  50. else
  51. numactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA${JAVA_OPT} $@
  52. fi
  53. else
  54. $JAVA ${JAVA_OPT} --add-exports=java.base/jdk.internal.ref=ALL-UNNAMED $@
  55. fi

runserver.sh

  1. #!/bin/sh
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #===========================================================================================
  17. # Java Environment Setting
  18. #===========================================================================================
  19. error_exit ()
  20. {
  21. echo "ERROR: $1 !!"
  22. exit 1
  23. }
  24. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
  25. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
  26. [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOMEvariable in your environment, We need java(x64)!"
  27. export JAVA_HOME
  28. export JAVA="$JAVA_HOME/bin/java"
  29. export BASE_DIR=$(dirname $0)/..
  30. export CLASSPATH=.:${BASE_DIR}/conf:${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*
  31. #===========================================================================================
  32. # JVM Configuration
  33. #===========================================================================================
  34. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
  35. # jdk17 可能丢弃了CMS垃圾回收器,需要使用G1收集器
  36. JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
  37. # JAVA_OPT="${JAVA_OPT} -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8"
  38. JAVA_OPT="${JAVA_OPT} -verbose:gc -Xlog:gc:/dev/shm/rmq_srv_gc.log -XX:+PrintGCDetails"
  39. JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
  40. JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
  41. # JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib"
  42. #JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
  43. JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
  44. JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
  45. $JAVA ${JAVA_OPT} $@

tools.sh 

  1. #!/bin/sh
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements. See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. #===========================================================================================
  17. # Java Environment Setting
  18. #===========================================================================================
  19. error_exit ()
  20. {
  21. echo "ERROR: $1 !!"
  22. exit 1
  23. }
  24. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
  25. [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
  26. [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
  27. export JAVA_HOME
  28. export JAVA="$JAVA_HOME/bin/java"
  29. export BASE_DIR=$(dirname $0)/..
  30. export CLASSPATH=${BASE_DIR}/lib/*:${BASE_DIR}/conf:.:${CLASSPATH}
  31. export CLASSPATH=.${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
  32. #export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
  33. #echo "BASE_DIR:$BASE_DIR"
  34. #echo "CLASSPATH:$CLASSPATH"
  35. #===========================================================================================
  36. # JVM Configuration
  37. #===========================================================================================
  38. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:PermSize=128m-XX:MaxPermSize=128m"
  39. JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
  40. $JAVA ${JAVA_OPT} $@

 5、修改服务器1上的配置文件

broker-a.properties

  1. # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License. You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #所属集群名字,同一个集群名字相同
  16. brokerClusterName=rocketmq-cluster
  17. #broker名字
  18. brokerName=broker-a
  19. #0表示master >0 表示slave
  20. brokerId=0
  21. #删除文件的时间点,凌晨4
  22. deleteWhen=04
  23. #文件保留时间 默认是48小时
  24. fileReservedTime=168
  25. #异步复制Master
  26. brokerRole=ASYNC_MASTER
  27. #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
  28. flushDiskType=ASYNC_FLUSH
  29. #Broker 对外服务的监听端口
  30. listenPort=10911
  31. #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3
  32. namesrvAddr=192.168.6.145:9876;192.168.6.146:9876
  33. #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
  34. defaultTopicQueueNums=8
  35. #是否允许 Broker 自动创建Topic,生产建议关闭
  36. autoCreateTopicEnable=true
  37. #是否允许 Broker 自动创建订阅组,生产建议关闭
  38. autoCreateSubscriptionGroup=true
  39. #设置BrokerIP 服务器1的ip地址
  40. brokerIP1=192.168.6.145
  41. #存储路径
  42. storePathRootDir=/data/rocketmq/store-a
  43. #commitLog 存储路径
  44. storePathCommitLog=/data/rocketmq/store-a/commitlog
  45. #消费队列存储路径存储路径
  46. storePathConsumerQueue=/data/rocketmq/store-a/consumequeue
  47. #消息索引存储路径
  48. storePathIndex=/data/rocketmq/store-a/index
  49. #checkpoint 文件存储路径
  50. storeCheckpoint=/data/rocketmq/store-a/checkpoint
  51. #abort 文件存储路径
  52. abortFile=/data/rocketmq/store-a/abort
  53. #commitLog每个文件的大小默认1G
  54. mapedFileSizeCommitLog=1073741824
  55. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  56. mapedFileSizeConsumeQueue=300000

broker-b-s.properties

  1. # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License. You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. brokerClusterName=rocketmq-cluster
  16. brokerName=broker-b
  17. #slave
  18. brokerId=1
  19. deleteWhen=04
  20. fileReservedTime=168
  21. brokerRole=SLAVE
  22. flushDiskType=ASYNC_FLUSH
  23. #Broker 对外服务的监听端口
  24. listenPort=11011
  25. #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3
  26. namesrvAddr=192.168.6.145:9876;192.168.6.146:9876
  27. #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
  28. defaultTopicQueueNums=8
  29. #是否允许 Broker 自动创建Topic,生产建议关闭
  30. autoCreateTopicEnable=true
  31. #是否允许 Broker 自动创建订阅组,生产建议关闭
  32. autoCreateSubscriptionGroup=true
  33. #设置BrokerIP
  34. brokerIP1=192.168.6.145
  35. #存储路径
  36. storePathRootDir=/data/rocketmq/store-b
  37. #commitLog 存储路径
  38. storePathCommitLog=/data/rocketmq/store-b/commitlog
  39. #消费队列存储路径存储路径
  40. storePathConsumerQueue=/data/rocketmq/store-b/consumequeue
  41. #消息索引存储路径
  42. storePathIndex=/data/rocketmq/store-b/index
  43. #checkpoint 文件存储路径
  44. storeCheckpoint=/data/rocketmq/store-b/checkpoint
  45. #abort 文件存储路径
  46. abortFile=/data/rocketmq/store-b/abort
  47. #commitLog每个文件的大小默认1G
  48. mapedFileSizeCommitLog=1073741824
  49. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  50. mapedFileSizeConsumeQueue=300000

6、修改服务器2上的配置文件

broker-b.properties

  1. # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License. You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #所属集群名字
  16. brokerClusterName=rocketmq-cluster
  17. #broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b
  18. brokerName=broker-b
  19. #0 表示 Master,>0 表示 Slave
  20. brokerId=0
  21. #删除文件时间点,默认凌晨 4
  22. deleteWhen=04
  23. #文件保留时间,默认 48 小时
  24. fileReservedTime=168
  25. #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
  26. brokerRole=ASYNC_MASTER
  27. #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
  28. flushDiskType=SYNC_FLUSH
  29. #Broker 对外服务的监听端口
  30. listenPort=10911
  31. #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3
  32. namesrvAddr=192.168.6.145:9876;192.168.6.146:9876
  33. #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
  34. defaultTopicQueueNums=8
  35. #是否允许 Broker 自动创建Topic,生产建议关闭
  36. autoCreateTopicEnable=true
  37. #是否允许 Broker 自动创建订阅组,生产建议关闭
  38. autoCreateSubscriptionGroup=true
  39. #设置BrokerIP
  40. brokerIP1=192.168.6.146
  41. #存储路径
  42. storePathRootDir=/data/rocketmq/store-b
  43. #commitLog 存储路径
  44. storePathCommitLog=/data/rocketmq/store-b/commitlog
  45. #消费队列存储路径存储路径
  46. storePathConsumerQueue=/data/rocketmq/store-b/consumequeue
  47. #消息索引存储路径
  48. storePathIndex=/data/rocketmq/store-b/index
  49. #checkpoint 文件存储路径
  50. storeCheckpoint=/data/rocketmq/store-b/checkpoint
  51. #abort 文件存储路径
  52. abortFile=/data/rocketmq/store-b/abort
  53. #commitLog每个文件的大小默认1G
  54. mapedFileSizeCommitLog=1073741824
  55. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  56. mapedFileSizeConsumeQueue=300000

broker-a-s.properties

  1. # Licensed to the Apache Software Foundation (ASF) under one or more
  2. # contributor license agreements. See the NOTICE file distributed with
  3. # this work for additional information regarding copyright ownership.
  4. # The ASF licenses this file to You under the Apache License, Version 2.0
  5. # (the "License"); you may not use this file except in compliance with
  6. # the License. You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #所属集群名字
  16. brokerClusterName=rocketmq-cluster
  17. #broker名字,注意此处不同的配置文件填写的不一样 例如:在a.properties 文件中写 broker-a 在b.properties 文件中写 broker-b
  18. brokerName=broker-a
  19. #0 表示 Master,>0 表示 Slave
  20. brokerId=1
  21. #删除文件时间点,默认凌晨 4
  22. deleteWhen=04
  23. #文件保留时间,默认 48 小时
  24. fileReservedTime=168
  25. #Broker 的角色,ASYNC_MASTER=异步复制Master,SYNC_MASTER=同步双写Master,SLAVE=slave节点
  26. brokerRole=SLAVE
  27. #刷盘方式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
  28. flushDiskType=SYNC_FLUSH
  29. #Broker 对外服务的监听端口
  30. listenPort=11011
  31. #nameServer地址,这里nameserver是单台,如果nameserver是多台集群的话,就用分号分割(即namesrvAddr=ip1:port1;ip2:port2;ip3:port3
  32. namesrvAddr=192.168.6.145:9876;192.168.6.146:9876
  33. #每个topic对应队列的数量,默认为4,实际应参考consumer实例的数量,值过小不利于consumer负载均衡
  34. defaultTopicQueueNums=8
  35. #是否允许 Broker 自动创建Topic,生产建议关闭
  36. autoCreateTopicEnable=true
  37. #是否允许 Broker 自动创建订阅组,生产建议关闭
  38. autoCreateSubscriptionGroup=true
  39. #设置BrokerIP
  40. brokerIP1=192.168.6.146
  41. #存储路径
  42. storePathRootDir=/data/rocketmq/store-a
  43. #commitLog 存储路径
  44. storePathCommitLog=/data/rocketmq/store-a/commitlog
  45. #消费队列存储路径存储路径
  46. storePathConsumerQueue=/data/rocketmq/store-a/consumequeue
  47. #消息索引存储路径
  48. storePathIndex=/data/rocketmq/store-a/index
  49. #checkpoint 文件存储路径
  50. storeCheckpoint=/data/rocketmq/store-a/checkpoint
  51. #abort 文件存储路径
  52. abortFile=/data/rocketmq/store-a/abort
  53. #commitLog每个文件的大小默认1G
  54. mapedFileSizeCommitLog=1073741824
  55. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  56. mapedFileSizeConsumeQueue=300000

7、启动nameserver

进入 [root@localhost ~]# cd /usr/local/rocketmq/bin/ 目录 

启动 nohup sh mqnamesrv &

检验是否安装成功

tail -f ~/logs/rocketmqlogs/namesrv.log

如果此时报错

/usr/local/rocketmq/bin/runserver.sh: line 19: syntax error near unexpected token `$'\r''
'usr/local/rocketmq/bin/runserver.sh: line 19: `error_exit ()

解决方法,拷贝的文件格式错乱使用下面命令乎看到文件后面多了^M
[root@bogon bin]# vim -b runserver.sh 

去掉^M 再重新启动
[root@bogon bin]# sed -i 's/\r//g' runserver.sh  

8、启动broker

启动 服务器1上的master

​​​​​​​[root@bogon rocketmq]# nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &

启动服务器2上的master

[root@localhost rocketmq]# nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &

 启动服务器1上的slave

[root@bogon rocketmq]# nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &

启动服务器2上的slave

  1. [root@localhost rocketmq]# nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &

9、搭建可视化管理界面

maven打包安装不细说

mvn clean package -Dmaven.test.skip=true

java -jar target/rocketmq-console-ng-1.0.1.jar

下载安装包  Rocketmq可视化工具-Web服务器文档类资源-CSDN下载

​​​​​​​

四、整合Springboot

maven依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.1</version>
  5. </dependency>

application.yml

  1. server:
  2. port: 8090
  3. rocketmq:
  4. name-server: 192.168.6.145:9876;192.168.6.146:9876
  5. producer:
  6. group: rocketmq-producer

 生产者

  1. package com.xiaojie.rocket.rocket.producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.client.producer.SendCallback;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @Description: 发送消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答
  10. * @author: yan
  11. * @date: 2021.11.08
  12. */
  13. @Service
  14. @Slf4j
  15. public class MqProducer {
  16. @Autowired
  17. private RocketMQTemplate rocketMQTemplate;
  18. /**
  19. * @description: 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
  20. * @param:
  21. * @return: void
  22. * @author xiaojie
  23. * @date: 2021/11/9 23:39
  24. */
  25. public void sendMq() {
  26. for (int i = 0; i < 10; i++) {
  27. rocketMQTemplate.convertAndSend("xiaojie-test", "测试发送消息》》》》》》》》》" + i);
  28. }
  29. }
  30. /**
  31. * @description: 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  32. * @param:
  33. * @return: void
  34. * @author xiaojie
  35. * @date: 2021/11/10 22:25
  36. */
  37. public void sync() {
  38. SendResult sendResult = rocketMQTemplate.syncSend("xiaojie-test", "sync发送消息。。。。。。。。。。");
  39. log.info("发送结果{}", sendResult);
  40. }
  41. /**
  42. * @description: 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  43. * @param:
  44. * @return: void
  45. * @author xiaojie
  46. * @date: 2021/11/10 22:29
  47. */
  48. public void async() {
  49. String msg = "异步发送消息。。。。。。。。。。";
  50. log.info(">msg:<<" + msg);
  51. rocketMQTemplate.asyncSend("xiaojie-test", msg, new SendCallback() {
  52. @Override
  53. public void onSuccess(SendResult var1) {
  54. log.info("异步发送成功{}", var1);
  55. }
  56. @Override
  57. public void onException(Throwable var1) {
  58. //发送失败可以执行重试
  59. log.info("异步发送失败{}", var1);
  60. }
  61. });
  62. }
  63. }

消费者

  1. package com.xiaojie.rocket.rocket.consumer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @Description:
  8. * @author: yan
  9. * @date: 2021.11.08
  10. */
  11. @RocketMQMessageListener(consumerGroup = "test-group", topic = "xiaojie-test")
  12. @Slf4j
  13. @Component
  14. public class MqConsumer implements RocketMQListener<String> {
  15. @Override
  16. public void onMessage(String message) {
  17. log.info("接收到的数据是:{}", message);
  18. }
  19. }

参考:https://blog.csdn.net/javahongxi/article/details/84931747

完整代码参考: spring-boot: Springboot整合redis、消息中间件等相关代码

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/713017
推荐阅读
相关标签
  

闽ICP备14008679号