当前位置:   article > 正文

SpringCloudAlibaba整合RocketMQ_springalibaba 集成 rocketmq

springalibaba 集成 rocketmq

1、下载RocketMQ,下载地址:下载 | RocketMQ

2、linux新建文件夹并解压下载的RocketMQ到文件夹中

使用命令 unzip -d /usr/local/rocketmq rocketmq-all-4.9.6-bin-release.zip 进行解压,如果不能使用unzip解压命令则可以输入命令 yum install -y unzip zip 进行安装

3、进入解压后的rocketmq目录,创建存放数据的目录

使用命令 mkdir -p store store/commitlog store/consumequeue

4、进入rocketmq解压目录中的conf目录,编辑 broker.conf 文件,在broker.conf文件内容最后加上如下配置:

listenPort=10911

namesrvAddr=127.0.0.1:9876

storePathRootDir=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store

storePathCommitLog=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store/commitlog

storePathConsumerQueue=/usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/store/consumequeue

5、修改配置文件

(1)修改内存大小(自行根据部署的机器硬件实际情况选择修改或跳过)

进入rocketmq解压目录中的bin目录,编辑 runbroker.sh 文件和 runserver.sh 文件,修改内存大小

runserver.sh文件

runbroker.sh文件

(2)修改jdk路径

进入到解压后rocketMQ的bin目录,修改文件runbroker.sh、runserver.sh中的jdk路径

6、启动RocketMQ,步骤4配置的端口防火墙需要先开启

(1)先使用命令 nohup sh mqnamesrv & 启动namesever服务

(2)再使用命令 nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.6-bin-release/conf/broker.conf & 启动broke服务(指定配置路径根据自己的安装路径来)

(3)输入命令jps查看nameserver服务和broke服务是否启动成功

(4)如果输入启动命令后jps没查看有对应的服务,可以在rocketmq的安装目录下的bin目录查看nohup.out文件的输出内容看看是否出现了错误异常

7、关闭RocketMQ

进入rocketmq的安装目录下的bin目录

(1)关闭 broker 命令 sh mqshutdown broker

(2)关闭 nameserver 命令 sh mqshutdown namesrv

8、安装RocketMQ控制台

(1)下载地址:https://github.com/apache/rocketmq-dashboard

(2)下载完后用idea打开项目

(3)修改配置文件application.yml,修改成部署的nameserver所在的ip地址

(4)启动项目,输入地址访问控制台

(5)可以打包成jar包部署在linux中

打包命令 mvn clean package -Dmaven.test.skip=true

9、spingboot项目pom中引入rocketmq依赖包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.6</version> //根据自己使用的版本来引入
</dependency>

(1)生产者application.yml配置文件

rocketmq:
  name-server: 192.168.200.100:9876 #nameserver的地址
  producer:
    # 生产组名
    group: demoGroup
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息体阈值,4k以上会压缩
    compress-message-body-threshold: 4096
    # 在同步模式下发送失败之前在内部执行的最大重试次数。
    retry-times-when-send-failed: 3
    # 在异步模式下发送失败之前在内部执行的最大重试次数。
    retry-times-when-send-async-failed: 3
    # 消息阈值,最大4MB,在 4KB 之内性能最佳
    max-message-size: 4096

(2)消费者application.yml配置文件

rocketmq:
  name-server: 192.168.200.100:9876 #nameserver的地址

(3)生产者发送消息代码

@Autowired
private RocketMQTemplate rocketMQTemplate;

@RequestMapping("/sendMq")
public String sendMq()
{
    TestRocketMq rocketMq = new TestRocketMq();
    rocketMq.setName("mq消息订单");
    rocketMq.setAge(56);
    rocketMQTemplate.convertAndSend("order_topic", rocketMq);
    return "success";
}

(4)消费者消费消息代码

/**
 *  consumerGroup = "demo-consumer-group",  // consumerGroup:消费者组名
 *  topic = "Demo",                         // topic:订阅的主题
 *  selectorExpression = "*",               // selectorExpression:控制可以选择的消息,可以使用SelectorType.SQL92语法。设置为 * 时,表示全部。
 *  messageModel = MessageModel.CLUSTERING  // messageModel: 控制消息模式。MessageModel.CLUSTERING:负载均衡;MessageModel.BROADCASTING:广播模式
 */
@Service
@RocketMQMessageListener(consumerGroup = "stockConsumer", topic = "order_topic")
public class RocketMqService implements RocketMQListener<TestRocketMq> {

    @Override
    public void onMessage(TestRocketMq testRocketMq) {
        System.out.println("消费消息-"+ testRocketMq);
    }
}

10、测试消息发送和消费

11、rocketmq集群部署

因为我用的是一台虚拟机来搭建集群,所以是个伪集群,但是真正的分布式集群思路也是一样的

(1)创建集群和节点文件夹

(2)配置nameserver集群,在节点目录node9801和node9802下分别创建namesrv.properties配置文件,编辑加上对应的端口

listenPort=9801

(3)配置broker集群,在rocketmq的解压目录的conf目录中有3种类型的集群部署配置方式,想要用哪一种就进去对应的目录下拷贝出来对应的配置文件到步骤(1)的集群节点目录中,我这里使用的是主从同步双写的方式(2m-2s-sync)

(4)编辑broker集群对应的配置文件

主节点 broker-a.properties

从节点 broker-a-s.properties

注意:这里的listenPort参数主从节点的端口号尽量不要使用相邻的,有些端口号可能会被主节点占用导致从节点不能绑定

其他节点也按照以上的配置来配置好,可根据实际情况参考下面的可配置参数进行配置

可配置的参数说明:

# 所属集群名字

brokerClusterName=myRocketmqCluster

# broker名字,注意此处不同的配置文件填写的不一样(按配置文件文件名来匹配)

brokerName=broker-a

# 0 表示Master, > 0 表示slave

brokerId=0

# 注册中心,可使用集群模式

namesrvAddr=192.168.0.129:9876;192.168.0.130:9876

# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数

defaultTopicQueueNums=4

# 是否允许Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

# 是否允许Broker自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口

listenPort=10933

# 删除文件时间点,默认是凌晨4点

deleteWhen=04

# 文件保留时间,默认48小时

fileReservedTime=120

# commitLog每个文件的大小默认1G

# 附加:消息实际存储位置,和ConsumeQueue是mq的核心存储概念,之前搭建2m环境的时候创建在store下面,用于数据存储,consumequeue是一个逻辑的概念,消息过来之后,consumequeue并不是把消息所有保存起来,而是记录一个数据的位置,记录好之后再把消息存到commitlog文件里

mapedFileSizeCommitLog=1073741824

# ConsumeQueue每个文件默认存30W条,根据业务情况调整

mapedFileSizeConsumeQueue=300000

destroyMapedFileIntervalForcibly=120000

redeleteHangedFileInterval=120000

# 检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

# 存储路径

storePathRootDir=/usr/local/rocketMQ/store

# commitLog存储路径

storePathCommitLog=/usr/local/rocketMQ/commitlog

# 消费队列存储路径

storePathConsumeQueue=/usr/local/rocketMQ/consumequeue

# 消息索引存储路径

storePathIndex=/usr/local/rocketMQ/index

# checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketMQ/checkpoint

# abort 文件存储路径

abortFile=/usr/local/rocketMQ/abort

# 限制的消息大小

maxMessageSize=65536

flushCommitLogLeastPages=4

flushConsumeQueueLeastPages=2

flushCommitLogThoroughInterval=10000

flushConsumeQueueThoroughInterval=60000

# Broker 的角色

# ASYNC_MASTER 异步复制Master

# SYNC_MASTER 同步双写Master

# SLAVE

brokerRole=SYNC_MASTER

# 刷盘方式

# ASYNC_FLUSH 异步刷盘

# SYNC_FLUSH 同步刷盘

flushDiskType=ASYNC_FLUSH

checkTransactionMessageEnable=false

# 发消息线程池数量

sendMessageTreadPoolNums=128

# 拉消息线程池数量

pullMessageTreadPoolNums=128

(5)启动集群所有的nameserver和broker节点,进入到rocketmq解压目录的bin目录

启动nameserver 命令:

nohup sh mqnamesrv -c /usr/local/rocketmqcluster/node9801/namesrv.properties &

启动brokerServer命令:

nohup sh mqbroker -c /usr/local/rocketmqcluster/node9801/broker-a.properties &

nohup sh mqbroker -c /usr/local/rocketmqcluster/node9801/broker-a-s.properties &

2个nameserver服务,2个broker服务集群,1个broker服务集群由1个主节点和1个从节点组成

(6)使用rocketmq控制台查看集群情况,控制台项目的配置文件需要修改集群的nameserver地址

(7)客户端可配置多个nameserver地址

有问题和建议欢迎大家留言评论,谢谢~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/712945
推荐阅读
相关标签
  

闽ICP备14008679号