当前位置:   article > 正文

linux 安装 RocketMQ_linux安装rocketmq

linux安装rocketmq

RocketMQ 因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景

目录

1、官网下载 RocketMQ

2、安装 RocketMQ

3、启动 RocketMQ

4、代码测试

5、RocketMQ Dashboard 安装

6、关闭 RocketMQ


1、官网下载 RocketMQ

RocketMQ 官网:https://rocketmq.apache.org/

 笔者这里选择 4.9.4版本

2、安装 RocketMQ

下载安装包后,将安装包上传到 linux

创建 RocketMQ 安装目录

mkdir -p /usr/local/rocketmq

将安装包解压到新建的安装目录

unzip -d /usr/local/rocketmq rocketmq-all-4.9.4-bin-release.zip

如果没有安装 unzip,可以执行下面命令安装

yum install -y unzip zip 

 进入 rocketmq 安装目类

cd /usr/local/rocketmq/rocketmq-all-4.9.4-bin-release

创建存放数据的目录

mkdir -p store store/commitlog store/consumequeue

进入 conf 目录

cd conf

编辑 broker.conf 文件

vi broker.conf

 在  broker.conf 文件中追加下面内容

  1. listenPort=10911
  2. namesrvAddr=localhost:9876
  3. storePathRootDir=/usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/store
  4. storePathCommitLog=/usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/store/commitlog
  5. storePathConsumerQueue=/usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/store/consumequeue

修改内存大小(这步不是必须的,如果你的机器内存足够,可以跳过这步,笔者机器内存只有1g,因此需要修改内存大小)

进入 bin 目录

cd /usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/bin

编辑 runbroker.sh 文件和 runserver.sh 文件,修改内存大小

编辑 runbroker.sh 文件,将 8g 修改为 512m

vi runbroker.sh

 修改后

编辑 runserver.sh 文件

vi runserver.sh

 这里对 java 版本进行了判断,上面 2 处配置笔者都修改为 512m

开放防火墙 9876 和 10911 端口

firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

查看防火墙所有开放的端口

firewall-cmd --list-port

3、启动 RocketMQ

在 bin 目录下,执行下面命令

启动nameserver

nohup sh mqnamesrv &

启动 broker

nohup sh mqbroker -c /usr/local/rocketmq/rocketmq-all-4.9.4-bin-release/conf/broker.conf &

4、代码测试

新建 maven 项目,添加 rocketmq-client 依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.9.4</version>
  5. </dependency>

官网示例

生产者代码

将 ip 地址修改为 linux 地址,笔者这里是 192.168.0.103

  1. package client.sample;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.client.producer.SendResult;
  4. import org.apache.rocketmq.common.message.Message;
  5. import org.apache.rocketmq.remoting.common.RemotingHelper;
  6. /**
  7. * SyncProducer
  8. *
  9. * @author wsjz
  10. * @date 2022/07/01
  11. */
  12. public class SyncProducer {
  13. public static void main(String[] args) throws Exception {
  14. //Instantiate with a producer group name.
  15. DefaultMQProducer producer = new
  16. DefaultMQProducer("please_rename_unique_group_name");
  17. // Specify name server addresses.
  18. producer.setNamesrvAddr("192.168.0.103:9876");
  19. //Launch the instance.
  20. producer.start();
  21. for (int i = 0; i < 100; i++) {
  22. //Create a message instance, specifying topic, tag and message body.
  23. Message msg = new Message("TopicTest" /* Topic */,
  24. "TagA" /* Tag */,
  25. ("Hello RocketMQ " +
  26. i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  27. );
  28. //Call send message to deliver message to one of brokers.
  29. SendResult sendResult = producer.send(msg);
  30. System.out.printf("%s%n", sendResult);
  31. }
  32. //Shut down once the producer instance is not longer in use.
  33. producer.shutdown();
  34. }
  35. }

消费者代码

  1. package client.sample;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.message.MessageExt;
  8. import java.util.List;
  9. /**
  10. * Consumer
  11. *
  12. * @author wsjz
  13. * @date 2022/07/01
  14. */
  15. public class Consumer {
  16. public static void main(String[] args) throws InterruptedException, MQClientException {
  17. // Instantiate with specified consumer group name.
  18. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
  19. // Specify name server addresses.
  20. consumer.setNamesrvAddr("192.168.0.103:9876");
  21. // Subscribe one more more topics to consume.
  22. consumer.subscribe("TopicTest", "*");
  23. // Register callback to execute on arrival of messages fetched from brokers.
  24. consumer.registerMessageListener(new MessageListenerConcurrently() {
  25. @Override
  26. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
  27. ConsumeConcurrentlyContext context) {
  28. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  29. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  30. }
  31. });
  32. //Launch the consumer instance.
  33. consumer.start();
  34. System.out.printf("Consumer Started.%n");
  35. }
  36. }

运行效果

5、RocketMQ Dashboard 安装

RocketMQ Dashboard安装可以选择使用 docker方式也可以使用源码方式安装,笔者下面介绍源码方式安装

官方文档:https://rocketmq.apache.org/zh/docs/deploymentOperations/17Dashboard/

先在github上下载源码

RocketMQ Dashboard 源码地址:https://github.com/apache/rocketmq-dashboard

将项目克隆到本地文件夹

使用eclipse等编辑器打开项目

项目是 springboot项目,找到 application.yml 文件,修改 rocketmq 地址

笔者的地址是192.168.0.103:9876

不使用 VIP 通道,改为 false

启动项目,浏览器访问 http://localhost:8080 

如果需要的话,可以将RocketMQ Dashboard打成 jar 包,进行部署

RocketMQ Dashboard安装成功

6、关闭 RocketMQ

进入bin目录

关闭 broker

sh mqshutdown broker

关闭 nameserver

sh mqshutdown namesrv

至此完

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/877407
推荐阅读
相关标签
  

闽ICP备14008679号