当前位置:   article > 正文

linux安装rocketmq及简单使用

linux安装rocketmq

rocketmq文档:https://rocketmq.apache.org/docs/quick-start/.

一、安装rocketMQ

下载链接https://rocketmq.apache.org/dowloading/releases/.
在这里插入图片描述

//下载
[root@liar local]# wget https://dist.apache.org/repos/dist/dev/rocketmq/5.0.0-ALPHA-rc2/rocketmq-all-5.0.0-ALPHA-bin-release.zip
解压
[root@liar local]# unzip rocketmq-all-5.0.0-ALPHA-bin-release.zip
  • 1
  • 2
  • 3
  • 4
配置rocketmq环境变量
[root@liar local]# vim /etc/profile

export rocketmq=/usr/local/rocketmq-all-5.0.0-ALPHA-bin-release/
export PATH=$PATH:$rocketmq/bin
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

二、启动RocketMQ

修改一下RocketMQ启动的内存大小

[root@liar bin]# vim runbroker.sh
  • 1

在这里插入图片描述

[root@liar bin]# [root@liar bin]# vim runserver.sh
  • 1

在这里插入图片描述

启动mqnameserver(&表示后台启动,不能少)

进入rocket的bin目录执行命令命令:nohup sh mqnamesrv &

[root@liar bin]# nohup sh mqnamesrv &
[1] 146587
[root@liar bin]# nohup: ignoring input and appending output to 'nohup.out'
  • 1
  • 2
  • 3

查看是否成功

[root@liar bin]# tail -1000f nohup.out
  • 1

在这里插入图片描述
或者jps查看

[root@liar bin]# jps
146587 NamesrvStartup
146604 Jps
  • 1
  • 2
  • 3

启动broker

进入rocket的bin目录执行命令命令:nohup sh mqbroker -n localhost:9876 &

nohup sh mqbroker -n localhost:9876 &
  • 1

发送消息报错指定自动创建topic
自动创建topic:启动broker时加上自动创建topic的参数,如下,其中autoCreateTopicEnable=true表示自动创建topic

nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ../broker.log &
  • 1

命令行发送消息

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  • 1
  • 2
  • 3

在这里插入图片描述

命令行接收消息

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
  • 1

在这里插入图片描述

三、集群搭建

配置相关属性

序号主机名/IPIP功能BROKER角色
1rocketmqOS192.168.182.147NameServer + BrokerMaster1 + Slave2
2rocketmqOS292.168.182.148NameServer + BrokerMaster2 + Slave1

修改rocketmqOS1配置文件

在这里插入图片描述
相关属性解析

# broker所属集群的名称
brokerClusterName = DefaultCluster
 
# broker的名称
brokerName = broker-a
 
# broker的ID, 0表示Master,非0表示Slave
brokerId = 0
 
# 删除文件时间点,默认是凌晨4点
deleteWhen = 04
 
# 文件保留时间,默认保留48小时
fileReservedTime = 48
 
# broker的角色
# ASYNC_MASTER: 异步复制Master
# SYNC_MASTER: 同步双写Master
# SLAVE: slave
brokerRole = ASYNC_MASTER
 
# 刷盘方式
# ASYNC_FLUSH: 异步刷盘
# SYNC_FLUSH: 同步刷盘
flushDiskType = ASYNC_FLUSH
 
# NameServer的地址,如果有多个的话,使用分号分隔开
namesrvAddr=10.0.90.59:9876
 
# 当前broker监听的IP地址
brokerIP1=10.0.90.59
 
# 在发送消息时,自动创建服务器不存在的topic,默认创建4个队列 
defaultTopicQueueNums=4
 
# 是否允许broker自动创建Topic
autoCreateTopicEnable=true
 
#是否允许broker自动创建订阅组
autoCreateSubscriptionGroup=true
 
# broker对外服务的监听端口 
listenPort=10911
 
# 每个commitLog文件的大小默认是1G
mapedFileSizeCommitLog=1073741824
 
# ConsumeQueue每个文件默认存30W条
mapedFileSizeConsumeQueue=300000
 
# store的存储路径
storePathRootDir=/rocketmq/rocketmq-4.9.2/store
 
# commitLog的存储路径 
storePathCommitLog=/rocketmq/rocketmq-4.9.2/store/commitlog
 
# 消费队列的存储路径
storePathConsumeQueue=/rocketmq/rocketmq-4.9.2/store/consumequeue
 
# 消息索引的存储路径
storePathIndex=/rocketmq/rocketmq-4.9.2/store/index
 
# checkpoint文件的存储路径
storeCheckpoint=/rocketmq/rocketmq-4.9.2/store/checkpoint
 
# abort文件的存储路径
abortFile=/rocketmq/rocketmq-4.9.2/store/abort
 
# 限制的消息大小,默认为4M 
maxMessageSize=65536
 
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=75
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

修改broker-a.properties

namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
  • 1

在这里插入图片描述
修改broker-b-s.properties

namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
listenPort=11911
storePathRootDir=~/store-s 
storePathCommitLog=~/store-s/commitlog 
storePathConsumeQueue=~/store-s/consumequeue 
storePathIndex=~/store-s/index 
storeCheckpoint=~/store-s/checkpoint 
abortFile=~/store-s/abort
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述

修改rocketmqOS2配置文件

在这里插入图片描述
在这里插入图片描述

namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
listenPort=11911
storePathRootDir=~/store-s 
storePathCommitLog=~/store-s/commitlog 
storePathConsumeQueue=~/store-s/consumequeue 
storePathIndex=~/store-s/index 
storeCheckpoint=~/store-s/checkpoint 
abortFile=~/store-s/abort
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

在这里插入图片描述
broker-b.properties

namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
  • 1

启动服务器

启动NameServer集群
分别启动rocketmqOS1与rocketmqOS2两个主机中的NameServer。启动命令完全相同。

nohup sh bin/mqnamesrv & 
tail -f ~/logs/rocketmqlogs/namesrv.log
  • 1
  • 2

启动两个Master
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker master。注意,它们指定所要加载的配置
文件是不同的。

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties & 
tail -f ~/logs/rocketmqlogs/broker.log
  • 1
  • 2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties & 
tail -f ~/logs/rocketmqlogs/broker.log
  • 1
  • 2

启动两个Slave
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker slave。注意,它们指定所要加载的配置文
件是不同的。

nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties & 
tail -f ~/logs/rocketmqlogs/broker.log
  • 1
  • 2
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties & 
tail -f ~/logs/rocketmqlogs/broker.log
  • 1
  • 2

确定启动正常
在这里插入图片描述

四、控制台安装

下载地址:https://github.com/apache/rocketmq-externals/releases
在这里插入图片描述
在这里插入图片描述
pom.xml添加依赖

        <dependency>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-impl</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-core</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>activation</artifactId>
            <version>1.1.1</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在console下打包

在这里插入图片描述

mvn clean package -Dmaven.test.skip=ture
  • 1

启动在这里插入图片描述
访问:http://localhost:7000.
在这里插入图片描述

同步

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建一个producer,参数为Producer Group名称
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        // 指定nameServer地址
        producer.setNamesrvAddr("rocketmqOS:9876");
        // 设置同步发送失败时重试发送的次数,默认为2次
        producer.setRetryTimesWhenSendFailed(3);
        // 设置发送超时时限为5s,默认3s
        producer.setSendMsgTimeout(5000);

        // 开启生产者
        producer.start();

        // 生产并发送100条消息
        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("someTopic", "someTag", body);
            // 为消息指定key
            msg.setKeys("key-" + i);
            // 同步发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        // 关闭producer
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

在linux种查看
在这里插入图片描述
控制台查看
在这里插入图片描述

定义异步消息发送生产者

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        // 指定异步发送失败后不进行重试发送
        producer.setRetryTimesWhenSendAsyncFailed(0);
        // 指定新创建的Topic的Queue数量为2,默认为4
        producer.setDefaultTopicQueueNums(2);

        producer.start();

        for (int i = 0; i < 100; i++) {
            byte[] body = ("Hi," + i).getBytes();
            try {
                Message msg = new Message("myTopicA", "myTag", body);
                // 异步发送。指定回调
                producer.send(msg, new SendCallback() {
                    // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        } // end-for

        // sleep一会儿
        // 由于采用的是异步发送,所以若这里不sleep,
        // 则消息还未发送就会将producer给关闭,报错
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
public class SomeConsumer {

    public static void main(String[] args) throws MQClientException {
        // 定义一个pull消费者
        // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
        // 定义一个push消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        // 指定nameServer
        consumer.setNamesrvAddr("rocketmqOS:9876");
        // 指定从第一条消息开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 指定消费topic与tag
        consumer.subscribe("someTopic", "*");
        // 指定采用“广播模式”进行消费,默认为“集群模式”
        // consumer.setMessageModel(MessageModel.BROADCASTING);

        // 顺序消息消费失败的消费重试时间间隔,默认为1000毫秒,其取值范围为[10, 30000]毫秒
        consumer.setSuspendCurrentQueueTimeMillis(100);

        // 修改消费重试次数
        consumer.setMaxReconsumeTimes(20);

        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            // 一旦broker中有了其订阅的消息就会触发该方法的执行,
            // 其返回值为当前consumer消费的状态
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // 逐条消费消息
                for (MessageExt msg : msgs) {
                    System.out.println(msg);
                }
                // 返回消费状态:消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者消费
        consumer.start();
        System.out.println("Consumer Started");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

定义单向消息发送生产者

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("pg");
        producer.setNamesrvAddr("rocketmqOS:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            byte[] body = ("Hi," + i).getBytes();
            Message msg = new Message("single", "someTag", body);
            // 单向发送 producer.sendOneway(msg); }
            producer.shutdown();
            System.out.println("producer shutdown");
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

消费

public class RetryConsumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
        consumer.setNamesrvAddr("mqOS:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("someTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
               try {
                   // ....
               } catch (Throwable e) {
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }

                // 返回消费状态:消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 开启消费者消费
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/877493
推荐阅读
相关标签
  

闽ICP备14008679号