赞
踩
前面分析了rocketmq-namesrv的源码
其实很简单:用 lock + map 就实现了注册中心的功能。因为各个 NameServer 节点之间不需要保证数据一致性,每个节点都是独立的。
本文接着学习 rocketmq-tools, 这个包主要实现了对mqadmin的支持
admin 也是一个client端
DefaultMQAdminExt 和 DefaultMQAdminExtImpl 封装了一些mqadmin命令实现
每个命令行操作都对应一个 Java 类,这些类是基于 commons-cli 做的一层封装。
- /**
-  * Contract for a single mqadmin sub-command.
-  *
-  * NOTE(review): {@code Options} and {@code CommandLine} are presumably the
-  * Apache Commons CLI types - confirm against the file's imports.
-  *
-  * @author shijia.wxr
-  */
- public interface SubCommand {
-     /**
-      * @return the name of this sub-command
-      */
-     String commandName();
-
-     /**
-      * @return the description of this sub-command
-      */
-     String commandDesc();
-
-     /**
-      * Builds the command-line options of this sub-command.
-      *
-      * @param options the options object handed in by the caller
-      * @return the options to use for this sub-command
-      */
-     Options buildCommandlineOptions(Options options);
-
-     /**
-      * Runs this sub-command.
-      *
-      * @param commandLine the command line handed in by the caller
-      * @param options     the options handed in by the caller
-      * @param rpcHook     RPC hook handed in by the caller
-      *                    (NOTE(review): whether null is accepted is not visible here)
-      */
-     void execute(CommandLine commandLine, Options options, RPCHook rpcHook);
- }
MQAdminStartup
- /**
-  * Registers every built-in sub-command by passing a fresh instance of each
-  * one to the one-argument {@code initCommand} overload.
-  *
-  * Each command class is registered exactly once; an earlier revision passed
-  * {@code QueryMsgByUniqueKeySubCommand} twice.
-  * NOTE(review): the one-argument overload is not visible here - confirm that
-  * nothing relied on the duplicate registration.
-  */
- public static void initCommand() {
-     initCommand(new UpdateTopicSubCommand());
-     initCommand(new DeleteTopicSubCommand());
-     initCommand(new UpdateSubGroupSubCommand());
-     initCommand(new DeleteSubscriptionGroupCommand());
-     initCommand(new UpdateBrokerConfigSubCommand());
-     initCommand(new UpdateTopicPermSubCommand());
-
-     initCommand(new TopicRouteSubCommand());
-     initCommand(new TopicStatusSubCommand());
-     initCommand(new TopicClusterSubCommand());
-
-     initCommand(new BrokerStatusSubCommand());
-     initCommand(new QueryMsgByIdSubCommand());
-     initCommand(new QueryMsgByKeySubCommand());
-     initCommand(new QueryMsgByUniqueKeySubCommand());
-     initCommand(new QueryMsgByOffsetSubCommand());
-     initCommand(new PrintMessageSubCommand());
-     initCommand(new SendMsgStatusCommand());
-     initCommand(new BrokerConsumeStatsSubCommad());
-
-     initCommand(new ProducerConnectionSubCommand());
-     initCommand(new ConsumerConnectionSubCommand());
-     initCommand(new ConsumerProgressSubCommand());
-     initCommand(new ConsumerStatusSubCommand());
-     initCommand(new CloneGroupOffsetCommand());
-
-     initCommand(new ClusterListSubCommand());
-     initCommand(new TopicListSubCommand());
-
-     initCommand(new UpdateKvConfigCommand());
-     initCommand(new DeleteKvConfigCommand());
-
-     initCommand(new WipeWritePermSubCommand());
-     initCommand(new ResetOffsetByTimeCommand());
-
-     initCommand(new UpdateOrderConfCommand());
-     initCommand(new CleanExpiredCQSubCommand());
-     initCommand(new CleanUnusedTopicCommand());
-
-     initCommand(new StartMonitoringSubCommand());
-     initCommand(new StatsAllSubCommand());
-
-     initCommand(new SyncDocsToGithubSubCommand());
-     initCommand(new AllocateMQSubCommand());
-
-     initCommand(new CheckMsgSendRTCommand());
-     initCommand(new CLusterSendMsgRTCommand());
- }
上传wiki文件
- /**
-  * Starts the pull consumer, the admin client and the push consumer, in that
-  * order, and then schedules the recurring monitor task.
-  *
-  * @throws MQClientException declared by this method; presumably raised by
-  *                           one of the {@code start()} calls
-  */
- public void start() throws MQClientException {
-     defaultMQPullConsumer.start();
-     defaultMQAdminExt.start();
-     defaultMQPushConsumer.start();
-     startScheduleTask();
- }
- private void startScheduleTask() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- MonitorService.this.doMonitorWork();
- } catch (Exception e) {
- log.error("doMonitorWork Exception", e);
- }
- }
- }, 1000 * 20, this.monitorConfig.getRoundInterval(), TimeUnit.MILLISECONDS);
- }
-
- public void doMonitorWork() throws RemotingException, MQClientException, InterruptedException {
- long beginTime = System.currentTimeMillis();
- this.monitorListener.beginRound();
-
- TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
- for (String topic : topicList.getTopicList()) {
- if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String consumerGroup = topic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
-
- try {
- this.reportUndoneMsgs(consumerGroup);
- } catch (Exception e) {
- // log.error("reportUndoneMsgs Exception", e);
- }
-
-
- try {
- this.reportConsumerRunningInfo(consumerGroup);
- } catch (Exception e) {
- // log.error("reportConsumerRunningInfo Exception", e);
- }
- }
- }
- this.monitorListener.endRound();
- long spentTimeMills = System.currentTimeMillis() - beginTime;
- log.info("Execute one round monitor work, spent timemills: {}", spentTimeMills);
- }
- /**
-  * Reports the undone messages of a consumer group, one report per topic.
-  *
-  * The group's consume offsets are regrouped by topic. For each topic an
-  * {@code UndoneMsgs} record is computed and handed to the monitor listener,
-  * and then {@code reportFailedMsgs} is called for the same group and topic.
-  * The method returns without reporting when either admin query throws or
-  * when the consume stats are null.
-  *
-  * @param consumerGroup the consumer group to report on
-  */
- private void reportUndoneMsgs(final String consumerGroup) {
-     ConsumeStats cs;
-     try {
-         cs = defaultMQAdminExt.examineConsumeStats(consumerGroup);
-     } catch (Exception e) {
-         return;
-     }
-
-     // Only the success of this lookup matters; the returned connection info is not used.
-     try {
-         defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
-     } catch (Exception e) {
-         return;
-     }
-
-     if (cs == null) {
-         return;
-     }
-
-     // Step 1: split the group's offset table into one ConsumeStats per topic.
-     HashMap<String/* Topic */, ConsumeStats> csByTopic = new HashMap<String, ConsumeStats>();
-     for (Entry<MessageQueue, OffsetWrapper> offsetEntry : cs.getOffsetTable().entrySet()) {
-         MessageQueue mq = offsetEntry.getKey();
-         OffsetWrapper ow = offsetEntry.getValue();
-         String topic = mq.getTopic();
-
-         ConsumeStats topicStats = csByTopic.get(topic);
-         if (topicStats == null) {
-             topicStats = new ConsumeStats();
-             csByTopic.put(topic, topicStats);
-         }
-         topicStats.getOffsetTable().put(mq, ow);
-     }
-
-     // Step 2: compute and report the undone messages for each topic.
-     for (Entry<String, ConsumeStats> topicEntry : csByTopic.entrySet()) {
-         String topic = topicEntry.getKey();
-
-         UndoneMsgs undoneMsgs = new UndoneMsgs();
-         undoneMsgs.setConsumerGroup(consumerGroup);
-         undoneMsgs.setTopic(topic);
-
-         this.computeUndoneMsgs(undoneMsgs, topicEntry.getValue());
-         this.monitorListener.reportUndoneMsgs(undoneMsgs);
-         this.reportFailedMsgs(consumerGroup, topic);
-     }
- }
- /**
-  * Collects the running info of every connected client of a consumer group
-  * and hands the result, keyed by client id, to the monitor listener.
-  *
-  * Connections whose version is below the ordinal of
-  * {@code MQVersion.Version.V3_1_8_SNAPSHOT} are skipped. A client whose
-  * running info cannot be fetched is left out of the report. The listener is
-  * not called when no running info was collected.
-  *
-  * @param consumerGroup the consumer group to report on
-  * @throws InterruptedException declared by this method; presumably raised by the connection lookup
-  * @throws MQBrokerException    declared by this method; presumably raised by the connection lookup
-  * @throws RemotingException    declared by this method; presumably raised by the connection lookup
-  * @throws MQClientException    declared by this method; presumably raised by the connection lookup
-  */
- public void reportConsumerRunningInfo(final String consumerGroup) throws InterruptedException,
-         MQBrokerException, RemotingException, MQClientException {
-     ConsumerConnection connections = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
-     TreeMap<String, ConsumerRunningInfo> runningInfoByClient = new TreeMap<String, ConsumerRunningInfo>();
-
-     for (Connection connection : connections.getConnectionSet()) {
-         String clientId = connection.getClientId();
-
-         if (connection.getVersion() >= MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
-             try {
-                 ConsumerRunningInfo runningInfo =
-                     defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false);
-                 runningInfoByClient.put(clientId, runningInfo);
-             } catch (Exception e) {
-                 // Best effort: a client that cannot be queried is simply left out.
-             }
-         }
-     }
-
-     if (runningInfoByClient.isEmpty()) {
-         return;
-     }
-     this.monitorListener.reportConsumerRunningInfo(runningInfoByClient);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。