当前位置:   article > 正文

rocketmq学习笔记 五 源码之rocketmq-tools

rocketmq-tools

前面分析了rocketmq-namesrv的源码

其实很简单:用 lock + map 就完成了注册中心的工作。因为 nameSrv 各节点之间不需要保证数据一致性,每个节点都是独立的。


本文接着学习 rocketmq-tools, 这个包主要实现了对mqadmin的支持


1.admin

admin 也是一个client端


DefaultMQAdminExt 和 DefaultMQAdminExtImpl  封装了一些mqadmin命令实现


2.command

把命令行的操作转换成java类,基于commons-cli 来做的一些封装

  1. /**
  2. * @author shijia.wxr
  3. */
  4. public interface SubCommand {
  5. public String commandName();
  6. public String commandDesc();
  7. public Options buildCommandlineOptions(final Options options);
  8. public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook);
  9. }

MQAdminStartup

  1. public static void initCommand() {
  2. initCommand(new UpdateTopicSubCommand());
  3. initCommand(new DeleteTopicSubCommand());
  4. initCommand(new UpdateSubGroupSubCommand());
  5. initCommand(new DeleteSubscriptionGroupCommand());
  6. initCommand(new UpdateBrokerConfigSubCommand());
  7. initCommand(new UpdateTopicPermSubCommand());
  8. initCommand(new TopicRouteSubCommand());
  9. initCommand(new TopicStatusSubCommand());
  10. initCommand(new TopicClusterSubCommand());
  11. initCommand(new BrokerStatusSubCommand());
  12. initCommand(new QueryMsgByIdSubCommand());
  13. initCommand(new QueryMsgByKeySubCommand());
  14. initCommand(new QueryMsgByUniqueKeySubCommand());
  15. initCommand(new QueryMsgByOffsetSubCommand());
  16. initCommand(new QueryMsgByUniqueKeySubCommand());
  17. initCommand(new PrintMessageSubCommand());
  18. initCommand(new SendMsgStatusCommand());
  19. initCommand(new BrokerConsumeStatsSubCommad());
  20. initCommand(new ProducerConnectionSubCommand());
  21. initCommand(new ConsumerConnectionSubCommand());
  22. initCommand(new ConsumerProgressSubCommand());
  23. initCommand(new ConsumerStatusSubCommand());
  24. initCommand(new CloneGroupOffsetCommand());
  25. initCommand(new ClusterListSubCommand());
  26. initCommand(new TopicListSubCommand());
  27. initCommand(new UpdateKvConfigCommand());
  28. initCommand(new DeleteKvConfigCommand());
  29. initCommand(new WipeWritePermSubCommand());
  30. initCommand(new ResetOffsetByTimeCommand());
  31. initCommand(new UpdateOrderConfCommand());
  32. initCommand(new CleanExpiredCQSubCommand());
  33. initCommand(new CleanUnusedTopicCommand());
  34. initCommand(new StartMonitoringSubCommand());
  35. initCommand(new StatsAllSubCommand());
  36. initCommand(new SyncDocsToGithubSubCommand());
  37. initCommand(new AllocateMQSubCommand());
  38. initCommand(new CheckMsgSendRTCommand());
  39. initCommand(new CLusterSendMsgRTCommand());
  40. }



3.github

用于把 wiki 文档文件上传到 GitHub(对应上面注册的 SyncDocsToGithubSubCommand)。

4.monitor 

  1. public void start() throws MQClientException {
  2. this.defaultMQPullConsumer.start();
  3. this.defaultMQAdminExt.start();
  4. this.defaultMQPushConsumer.start();
  5. this.startScheduleTask();
  6. }


  1. private void startScheduleTask() {
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. MonitorService.this.doMonitorWork();
  7. } catch (Exception e) {
  8. log.error("doMonitorWork Exception", e);
  9. }
  10. }
  11. }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
  12. }
  13. public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException {
  14. long beginTime = System.currentTimeMillis();
  15. this.monitorListener.beginRound();
  16. TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
  17. for (String topic : topicList.getTopicList()) {
  18. if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  19. String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
  20. try {
  21. this.reportUndoneMsgs(consumerGroup);
  22. } catch (Exception e) {
  23. // log.error("reportUndoneMsgs Exception", e);
  24. }
  25. try {
  26. this.reportConsumerRunningInfo(consumerGroup);
  27. } catch (Exception e) {
  28. // log.error("reportConsumerRunningInfo Exception", e);
  29. }
  30. }
  31. }
  32. this.monitorListener.endRound();
  33. long spentTimeMills = System.currentTimeMillis() - beginTime;
  34. log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);
  35. }

reportUndoneMsgs

  1. private void reportUndoneMsgs(final String consumerGroup) {
  2. ConsumeStats cs = null;
  3. try {
  4. cs = defaultMQAdminExt.examineConsumeStats(consumerGroup);
  5. } catch (Exception e) {
  6. return;
  7. }
  8. ConsumerConnection cc = null;
  9. try {
  10. cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
  11. } catch (Exception e) {
  12. return;
  13. }
  14. if (cs != null) {
  15. HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>();
  16. {
  17. Iterator<Entry<MessageQueue, OffsetWrapper>> it = cs.getOffsetTable().entrySet().iterator();
  18. while (it.hasNext()) {
  19. Entry<MessageQueue, OffsetWrapper> next = it.next();
  20. MessageQueue mq = next.getKey();
  21. OffsetWrapper ow = next.getValue();
  22. ConsumeStats csTmp = csByTopic.get(mq.getTopic());
  23. if (null == csTmp) {
  24. csTmp = new ConsumeStats();
  25. csByTopic.put(mq.getTopic(), csTmp);
  26. }
  27. csTmp.getOffsetTable().put(mq, ow);
  28. }
  29. }
  30. {
  31. Iterator<Entry<String, ConsumeStats>> it = csByTopic.entrySet().iterator();
  32. while (it.hasNext()) {
  33. Entry<String, ConsumeStats> next = it.next();
  34. UndoneMsgs undoneMsgs = new UndoneMsgs();
  35. undoneMsgs.setConsumerGroup(consumerGroup);
  36. undoneMsgs.setTopic(next.getKey());
  37. this.computeUndoneMsgs(undoneMsgs, next.getValue());
  38. this.monitorListener.reportUndoneMsgs(undoneMsgs);
  39. this.reportFailedMsgs(consumerGroup, next.getKey());
  40. }
  41. }
  42. }
  43. }


reportConsumerRunningInfo

  1. public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
  2. MQBrokerException, RemotingException, MQClientException {
  3. ConsumerConnection cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
  4. TreeMap<String, ConsumerRunningInfo> infoMap = new TreeMap<String, ConsumerRunningInfo>();
  5. for (Connection c : cc.getConnectionSet()) {
  6. String clientId = c.getClientId();
  7. if (c.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
  8. continue;
  9. }
  10. try {
  11. ConsumerRunningInfo info =
  12. defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
  13. infoMap.put(clientId, info);
  14. } catch (Exception e) {
  15. }
  16. }
  17. if (!infoMap.isEmpty()) {
  18. this.monitorListener.reportConsumerRunningInfo(infoMap);
  19. }
  20. }





声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/603741
推荐阅读
相关标签
  

闽ICP备14008679号