赞
踩
rocketmq文档:https://rocketmq.apache.org/docs/quick-start/.
下载链接https://rocketmq.apache.org/dowloading/releases/.
//下载
[root@liar local]# wget https://dist.apache.org/repos/dist/dev/rocketmq/5.0.0-ALPHA-rc2/rocketmq-all-5.0.0-ALPHA-bin-release.zip
解压
[root@liar local]# unzip rocketmq-all-5.0.0-ALPHA-bin-release.zip
配置rocketmq环境变量
[root@liar local]# vim /etc/profile
export rocketmq=/usr/local/rocketmq-all-5.0.0-ALPHA-bin-release/
export PATH=$PATH:$rocketmq/bin
修改一下RocketMQ启动的内存大小
[root@liar bin]# vim runbroker.sh
[root@liar bin]# [root@liar bin]# vim runserver.sh
进入rocket的bin目录执行命令命令:nohup sh mqnamesrv &
[root@liar bin]# nohup sh mqnamesrv &
[1] 146587
[root@liar bin]# nohup: ignoring input and appending output to 'nohup.out'
查看是否成功
[root@liar bin]# tail -1000f nohup.out
或者jps查看
[root@liar bin]# jps
146587 NamesrvStartup
146604 Jps
进入rocket的bin目录执行命令命令:nohup sh mqbroker -n localhost:9876 &
nohup sh mqbroker -n localhost:9876 &
发送消息报错指定自动创建topic
自动创建topic:启动broker时加上自动创建topic的参数,如下,其中autoCreateTopicEnable=true表示自动创建topic
nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ../broker.log &
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
序号 | 主机名/IP | IP | 功能 | BROKER角色 |
---|---|---|---|---|
1 | rocketmqOS1 | 92.168.182.147 | NameServer + Broker | Master1 + Slave2 |
2 | rocketmqOS2 | 92.168.182.148 | NameServer + Broker | Master2 + Slave1 |
相关属性解析
# broker所属集群的名称 brokerClusterName = DefaultCluster # broker的名称 brokerName = broker-a # broker的ID, 0表示Master,非0表示Slave brokerId = 0 # 删除文件时间点,默认是凌晨4点 deleteWhen = 04 # 文件保留时间,默认保留48小时 fileReservedTime = 48 # broker的角色 # ASYNC_MASTER: 异步复制Master # SYNC_MASTER: 同步双写Master # SLAVE: slave brokerRole = ASYNC_MASTER # 刷盘方式 # ASYNC_FLUSH: 异步刷盘 # SYNC_FLUSH: 同步刷盘 flushDiskType = ASYNC_FLUSH # NameServer的地址,如果有多个的话,使用分号分隔开 namesrvAddr=10.0.90.59:9876 # 当前broker监听的IP地址 brokerIP1=10.0.90.59 # 在发送消息时,自动创建服务器不存在的topic,默认创建4个队列 defaultTopicQueueNums=4 # 是否允许broker自动创建Topic autoCreateTopicEnable=true #是否允许broker自动创建订阅组 autoCreateSubscriptionGroup=true # broker对外服务的监听端口 listenPort=10911 # 每个commitLog文件的大小默认是1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每个文件默认存30W条 mapedFileSizeConsumeQueue=300000 # store的存储路径 storePathRootDir=/rocketmq/rocketmq-4.9.2/store # commitLog的存储路径 storePathCommitLog=/rocketmq/rocketmq-4.9.2/store/commitlog # 消费队列的存储路径 storePathConsumeQueue=/rocketmq/rocketmq-4.9.2/store/consumequeue # 消息索引的存储路径 storePathIndex=/rocketmq/rocketmq-4.9.2/store/index # checkpoint文件的存储路径 storeCheckpoint=/rocketmq/rocketmq-4.9.2/store/checkpoint # abort文件的存储路径 abortFile=/rocketmq/rocketmq-4.9.2/store/abort # 限制的消息大小,默认为4M maxMessageSize=65536 # 检测物理文件磁盘空间 diskMaxUsedSpaceRatio=75
修改broker-a.properties
namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
修改broker-b-s.properties
namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
listenPort=11911
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
listenPort=11911
storePathRootDir=~/store-s
storePathCommitLog=~/store-s/commitlog
storePathConsumeQueue=~/store-s/consumequeue
storePathIndex=~/store-s/index
storeCheckpoint=~/store-s/checkpoint
abortFile=~/store-s/abort
broker-b.properties
namesrvAddr=192.168.182.147:9876;192.168.182.148:9876
启动NameServer集群
分别启动rocketmqOS1与rocketmqOS2两个主机中的NameServer。启动命令完全相同。
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动两个Master
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker master。注意,它们指定所要加载的配置
文件是不同的。
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
启动两个Slave
分别启动rocketmqOS1与rocketmqOS2两个主机中的broker slave。注意,它们指定所要加载的配置文
件是不同的。
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log
确定启动正常
下载地址:https://github.com/apache/rocketmq-externals/releases
pom.xml添加依赖
<dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-core</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>javax.activation</groupId> <artifactId>activation</artifactId> <version>1.1.1</version> </dependency>
mvn clean package -Dmaven.test.skip=ture
启动
访问:http://localhost:7000.
public class SyncProducer { public static void main(String[] args) throws Exception { // 创建一个producer,参数为Producer Group名称 DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定nameServer地址 producer.setNamesrvAddr("rocketmqOS:9876"); // 设置同步发送失败时重试发送的次数,默认为2次 producer.setRetryTimesWhenSendFailed(3); // 设置发送超时时限为5s,默认3s producer.setSendMsgTimeout(5000); // 开启生产者 producer.start(); // 生产并发送100条消息 for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); Message msg = new Message("someTopic", "someTag", body); // 为消息指定key msg.setKeys("key-" + i); // 同步发送消息 SendResult sendResult = producer.send(msg); System.out.println(sendResult); } // 关闭producer producer.shutdown(); } }
在linux种查看
控制台查看
public class AsyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr("rocketmqOS:9876"); // 指定异步发送失败后不进行重试发送 producer.setRetryTimesWhenSendAsyncFailed(0); // 指定新创建的Topic的Queue数量为2,默认为4 producer.setDefaultTopicQueueNums(2); producer.start(); for (int i = 0; i < 100; i++) { byte[] body = ("Hi," + i).getBytes(); try { Message msg = new Message("myTopicA", "myTag", body); // 异步发送。指定回调 producer.send(msg, new SendCallback() { // 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } // end-for // sleep一会儿 // 由于采用的是异步发送,所以若这里不sleep, // 则消息还未发送就会将producer给关闭,报错 TimeUnit.SECONDS.sleep(3); producer.shutdown(); } }
public class SomeConsumer { public static void main(String[] args) throws MQClientException { // 定义一个pull消费者 // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg"); // 定义一个push消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); // 指定nameServer consumer.setNamesrvAddr("rocketmqOS:9876"); // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费topic与tag consumer.subscribe("someTopic", "*"); // 指定采用“广播模式”进行消费,默认为“集群模式” // consumer.setMessageModel(MessageModel.BROADCASTING); // 顺序消息消费失败的消费重试时间间隔,默认为1000毫秒,其取值范围为[10, 30000]毫秒 consumer.setSuspendCurrentQueueTimeMillis(100); // 修改消费重试次数 consumer.setMaxReconsumeTimes(20); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { // 一旦broker中有了其订阅的消息就会触发该方法的执行, // 其返回值为当前consumer消费的状态 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 逐条消费消息 for (MessageExt msg : msgs) { System.out.println(msg); } // 返回消费状态:消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费 consumer.start(); System.out.println("Consumer Started"); } }
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 10; i++) {
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("single", "someTag", body);
// 单向发送 producer.sendOneway(msg); }
producer.shutdown();
System.out.println("producer shutdown");
}
}
}
public class RetryConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); consumer.setNamesrvAddr("mqOS:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("someTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { // .... } catch (Throwable e) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // 返回消费状态:消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 开启消费者消费 consumer.start(); System.out.printf("Consumer Started.%n"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。