当前位置:   article > 正文

rocketmq 集群环境部署及与spring cloud 集成_rocketmq.name-server 集群配置 spring

rocketmq.name-server 集群配置 spring

1 下载zip 安装包

rocketmq-all-5.1.4-bin-release.zip

2 修改启动配置,防止默认内存配置过高

runserver.sh/runbroker.sh/tools.sh

3 启动namesrv

nohup sh bin/mqnamesrv >>namesrv.log &

4 启动broker+proxy

单点模式:

nohup sh bin/mqbroker -c conf/brock.conf -pc conf/rmq-proxy.json --enable-proxy >>broker.log&

集群模式:2m+2s-async 多主多从,异步复制

A1机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a.properties -pc conf/rmq-proxy-a.json --enable-proxy >>broker-a.log&

B2机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b.properties -pc conf/rmq-proxy-b.json --enable-proxy >>broker-b.log&

B3机器> nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-a-s.properties -pc conf/rmq-proxy-a-s.json --enable-proxy >>broker-a-s.log&

A4机器>nohup sh bin/mqbroker -n '192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876' -c conf/2m-2s-async/broker-b-s.properties -pc conf/rmq-proxy-b-s.json --enable-proxy >>broker-b-s.log&

5 配置控制面板 rocketmq-dashboard

nohup java -jar -Xms512m -Xmx1G rocketmq-dashboard-1.0.0.jar >> rocketmq-dashboard.log &

注意事项:

  • 所有ip配置均配置本机外网ip
  • 端口不重复,安全组开启相应端口
  • -n 多主机情况,需要增加引号或双引号
  • broker 监听端口,不重复
  • proxy 监听端口,不重复

6 集成到spring cloud 具体服务中:

0 通过rocketmq-dashboard 增加topic

   当然,除了使用RocketMQ Dashboard来管理Topic(包括创建、删除等操作)之外,还有其他几种方式可以实现增加Topic的操作。以下是几种常见的方法:

  • 命令行工具: RocketMQ提供了命令行工具mqadmin,通过这个工具可以直接在命令行下执行Topic的创建操作。例如,创建一个名为TestTopic的Topic,可以在命令行中输入如下命令:
   sh mqadmin updateTopic -n <nameserver-address> -c DefaultCluster -t TestTopic
   

     其中<nameserver-address>需要替换为你的NameServer地址。首次创建时可能需要使用createTopic命令而非updateTopic。

  • 编程方式: 你可以编写Java程序,利用RocketMQ提供的客户端API进行Topic的创建。这通常涉及到初始化一个DefaultMQAdminClient实例,然后调用其相应的方法来创建Topic。示例代码如下:

  1. import org.apache.rocketmq.client.exception.MQClientException;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.common.protocol.route.TopicRouteData;
  4. import org.apache.rocketmq.remoting.RPCHook;
  5. import org.apache.rocketmq.tools.admin.DefaultMQAdminClient;
  6. public class CreateTopic {
  7. public static void main(String[] args) throws MQClientException {
  8. DefaultMQAdminClient adminClient = new DefaultMQAdminClient(RPCHook.DEFAULT);
  9. adminClient.setNamesrvAddr("<nameserver-address>");
  10. adminClient.start();
  11. TopicConfig topicConfig = new TopicConfig();
  12. topicConfig.setTopicName("TestTopic");
  13. topicConfig.setReadQueueNums(16);
  14. topicConfig.setWriteQueueNums(16);
  15. topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
  16. adminClient.createAndUpdateTopicConfig(topicConfig);
  17. adminClient.shutdown();
  18. }
  19. }

请确保替换<nameserver-address>为实际的NameServer地址,并根据需要调整队列数和权限。

  • HTTP API: RocketMQ从4.7.0版本开始支持HTTP API,这意味着你可以通过发送HTTP请求的方式来管理Topic,包括创建Topic。具体API文档和使用方式可参考RocketMQ官方文档,通常涉及构造特定的HTTP POST请求到RocketMQ服务端。

   每种方法有其适用场景,选择最适合你当前环境和需求的方式即可。

1 增加配置

#NameServer地址

rocketmq.name-server: 192.168.1.1:9876

rocketmq.producer.group: sale-producer-group

2 增加依赖

implementation 'org.apache.rocketmq:rocketmq-spring-boot-starter:2.1.1'

3 消息生产者

  1. import com.alibaba.fastjson.JSONObject;
  2. import lombok.AllArgsConstructor;
  3. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.web.bind.annotation.RequestBody;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RequestMethod;
  9. import org.springframework.web.bind.annotation.RestController;
  10. import reactor.core.publisher.Mono;
  11. import javax.annotation.Resource;
  12. @RestController
  13. @AllArgsConstructor
  14. public class SaleMQProducer {
  15. private final Logger LOGGER = LoggerFactory.getLogger(SaleMQProducer.class);
  16. @Resource
  17. private RocketMQTemplate rocketMQTemplate;
  18. @RequestMapping(value = "/api/mq/create", method = RequestMethod.POST)
  19. Object create(@RequestBody JSONObject req) {
  20. return Mono.defer(() -> {
  21. String topic = req.getString("topic");
  22. String tag = req.getString("tag");
  23. String body = req.getString("body");
  24. LOGGER.info("topic={},tag={},body={}", topic, tag, body);
  25. //支持带tag发送
  26. String dest = String.format("%s:%s", topic, tag);
  27. rocketMQTemplate.convertAndSend(dest, body);
  28. return Mono.just(true);
  29. });
  30. }
  31. }

4 消息消费者

  1. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  2. import org.apache.rocketmq.spring.core.RocketMQListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. @RocketMQMessageListener(consumerGroup = "sale-consumer-group", topic = "TEST_FIRST")
  6. public class SaleMQConsumer implements RocketMQListener<String> {
  7. @Override
  8. public void onMessage(String message) {
  9. System.out.println("Received message : " + message);
  10. }
  11. }

5 消息消费者,增加tag匹配(默认* 全量匹配)

  1. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  2. import org.apache.rocketmq.spring.core.RocketMQListener;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. @RocketMQMessageListener(consumerGroup = "sale-consumer-group-tag-a", topic = "TEST_FIRST", selectorExpression = "TagA")
  6. public class SaleMQConsumerTagA implements RocketMQListener<String> {
  7. @Override
  8. public void onMessage(String message) {
  9. System.out.println("Received message : " + message);
  10. }
  11. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/713013
推荐阅读
相关标签
  

闽ICP备14008679号