赞
踩
rocketmq-all-5.1.4-bin-release.zip
runserver.sh/runbroker.sh/tools.sh
nohup sh bin/mqnamesrv >>namesrv.log &
单点模式:
nohup sh bin/mqbroker -c conf/brock.conf -pc conf/rmq-proxy.json --enable-proxy >>broker.log&
集群模式:2m+2s-async 多主多从,异步复制
A1机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a.properties -pc conf/rmq-proxy-a.json --enable-proxy >>broker-a.log&
B2机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b.properties -pc conf/rmq-proxy-b.json --enable-proxy >>broker-b.log&
B3机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a-s.properties -pc conf/rmq-proxy-a-s.json --enable-proxy >>broker-a-s.log&
A4机器>nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b-s.properties -pc conf/rmq-proxy-b-s.json --enable-proxy >>broker-b-s.log&
nohup java -jar -Xms512m -Xmx1G rocketmq-dashboard-1.0.0.jar >> rocketmq-dashboard.log &
注意事项:
0 通过rocketmq-dashboard 增加topic
当然,除了使用RocketMQ Dashboard来管理Topic(包括创建、删除等操作)之外,还有其他几种方式可以实现增加Topic的操作。以下是几种常见的方法:
sh mqadmin updateTopic -n <nameserver-address> -c DefaultCluster -t TestTopic
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.common.protocol.route.TopicRouteData;
- import org.apache.rocketmq.remoting.RPCHook;
- import org.apache.rocketmq.tools.admin.DefaultMQAdminClient;
-
- public class CreateTopic {
- public static void main(String[] args) throws MQClientException {
- DefaultMQAdminClient adminClient = new DefaultMQAdminClient(RPCHook.DEFAULT);
- adminClient.setNamesrvAddr("<nameserver-address>");
- adminClient.start();
-
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setTopicName("TestTopic");
- topicConfig.setReadQueueNums(16);
- topicConfig.setWriteQueueNums(16);
- topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
-
- adminClient.createAndUpdateTopicConfig(topicConfig);
-
- adminClient.shutdown();
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
#NameServer地址
rocketmq.name-server: 192.168.1.1:9876
rocketmq.producer.group: sale-producer-group
implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1'
- import com.alibaba.fastjson.JSONObject;
- import lombok.AllArgsConstructor;
- import org.apache.rocketmq.spring.core.RocketMQTemplate;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.web.bind.annotation.RequestBody;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RequestMethod;
- import org.springframework.web.bind.annotation.RestController;
- import reactor.core.publisher.Mono;
-
- import javax.annotation.Resource;
-
- @RestController
- @AllArgsConstructor
- public class SaleMQProducer {
- private final Logger LOGGER = LoggerFactory.getLogger(SaleMQProducer.class);
- @Resource
- private RocketMQTemplate rocketMQTemplate;
-
- @RequestMapping(value = "/api/mq/create", method = RequestMethod.POST)
- Object create(@RequestBody JSONObject req) {
- return Mono.defer(() -> {
- String topic = req.getString("topic");
- String tag = req.getString("tag");
- String body = req.getString("body");
- LOGGER.info("topic={},tag={},body={}", topic, tag, body);
- //支持带tag发送
- String dest = String.format("%s:%s", topic, tag);
- rocketMQTemplate.convertAndSend(dest, body);
- return Mono.just(true);
- });
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- @RocketMQMessageListener(consumerGroup = "sale-consumer-group", topic = "TEST_FIRST")
- public class SaleMQConsumer implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("Received message : " + message);
- }
- }
- import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
- import org.apache.rocketmq.spring.core.RocketMQListener;
- import org.springframework.stereotype.Component;
-
-
- @Component
- @RocketMQMessageListener(consumerGroup = "sale-consumer-group-tag-a", topic = "TEST_FIRST", selectorExpression = "TagA")
- public class SaleMQConsumerTagA implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.println("Received message : " + message);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。