
Hadoop 2.x ResourceManager Startup: Starting the Services

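YARN ResourceManager Startup: Starting the Services
The ResourceManager (RM) is a composite service: it contains multiple child services, all of which are kept in a list and started one by one in a loop. The full list of these services is given at the end of this article.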

Every service starts through the same sequence of calls:
1. serviceStart in ResourceManager.java calls the parent class's serviceStart

    // ResourceManager.java
    protected void serviceStart() throws Exception {
        ......
        super.serviceStart();
    }
2. The parent class's CompositeService.serviceStart

    protected void serviceStart() throws Exception {
        // get the list of child services
        List<Service> services = getServices();
        if (LOG.isDebugEnabled()) {
            LOG.debug(getName() + ": starting services, size=" + services.size());
        }
        // start the services in a loop; every start() call eventually
        // reaches the serviceStart method of the service itself
        for (Service service : services) {
            // start the service. If this fails that service
            // will be stopped and an exception raised
            service.start();
        }
        super.serviceStart();
    }
3. Each start() call enters AbstractService, the parent class of CompositeService

    public void start() {
        // has the service already been started?
        if (isInState(STATE.STARTED)) {
            return;
        }
        //enter the started state
        synchronized (stateChangeLock) {
            if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
                try {
                    // record the service start time
                    startTime = System.currentTimeMillis();
                    // start the service; this call lands in the subclass's method
                    serviceStart();
                    // check whether the service started successfully
                    if (isInState(STATE.STARTED)) {
                        //if the service started (and isn't now in a later state), notify
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Service " + getName() + " is started");
                        }
                        notifyListeners();
                    }
                } catch (Exception e) {
                    noteFailure(e);
                    ServiceOperations.stopQuietly(LOG, this);
                    throw ServiceStateException.convert(e);
                }
            }
        }
    }
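4. Finally, control reaches the subclass's serviceStart method, which starts the actual service
As this shows, abstracting the service lifecycle makes unified management straightforward: a service added later only needs to follow the same inheritance chain to be managed in the same way.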
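The four steps above can be condensed into a small, self-contained sketch. It is not Hadoop code: SketchService, SketchCompositeService and ServiceChainDemo are hypothetical stand-ins for AbstractService, CompositeService and the RM, with only three states and no listeners, and the failure handling is simplified to "mark as stopped and rethrow".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified lifecycle base class (stands in for AbstractService). */
abstract class SketchService {
    enum State { INITED, STARTED, STOPPED }

    private final String name;
    private final Object stateChangeLock = new Object();
    private State state = State.INITED;

    SketchService(String name) { this.name = name; }

    String getName() { return name; }

    State getState() {
        synchronized (stateChangeLock) { return state; }
    }

    /** Hook for subclasses; the base implementation does nothing. */
    protected void serviceStart() throws Exception { }

    /** Public entry point: guards the state, then calls the subclass hook. */
    public final void start() {
        synchronized (stateChangeLock) {
            if (state == State.STARTED) {
                return; // starting twice is a no-op
            }
            if (state != State.INITED) {
                throw new IllegalStateException(name + " cannot start from " + state);
            }
            state = State.STARTED;
            try {
                serviceStart();
            } catch (Exception e) {
                state = State.STOPPED; // a failed start leaves the service stopped
                throw new IllegalStateException(name + " failed to start", e);
            }
        }
    }
}

/** Hypothetical composite (stands in for CompositeService): starts children in order. */
class SketchCompositeService extends SketchService {
    private final List<SketchService> children = new ArrayList<>();

    SketchCompositeService(String name) { super(name); }

    void addService(SketchService child) { children.add(child); }

    @Override
    protected void serviceStart() throws Exception {
        for (SketchService child : children) {
            child.start(); // ends up in the child's own serviceStart()
        }
        super.serviceStart();
    }
}

class ServiceChainDemo {
    /** Builds a two-child composite, starts it twice, and returns the start log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchCompositeService rm = new SketchCompositeService("rm");
        for (String childName : new String[] {"tokenManagers", "clientRpc"}) {
            rm.addService(new SketchService(childName) {
                @Override protected void serviceStart() { log.add(getName()); }
            });
        }
        rm.start();
        rm.start(); // second call returns early, nothing is logged again
        return log;
    }

    /** Returns the composite's state after one of its children fails to start. */
    static SketchService.State stateAfterChildFailure() {
        SketchCompositeService rm = new SketchCompositeService("rm");
        rm.addService(new SketchService("broken") {
            @Override protected void serviceStart() throws Exception {
                throw new Exception("cannot bind port");
            }
        });
        try {
            rm.start();
        } catch (IllegalStateException expected) {
            // the child's failure propagates up through the composite
        }
        return rm.getState();
    }
}
```

Adding another service in this sketch takes one more addService call; the composite and the base class stay unchanged, which is the convenience the paragraph above describes.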
Starting the token manager service threads (RMSecretManagerService)

    @Override
    public void serviceStart() throws Exception {
        // start the token managers; what each one does will be analyzed later,
        // each manager is driven by a Timer
        amRmTokenSecretManager.start();
        containerTokenSecretManager.start();
        nmTokenSecretManager.start();
        try {
            // thread that removes expired tokens
            rmDTSecretManager.startThreads();
        } catch(IOException ie) {
            throw new YarnRuntimeException("Failed to start secret manager threads", ie);
        }
        super.serviceStart();
    }
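The comment in the listing above says that each token manager is driven by a Timer. As a rough illustration of that pattern only, a Timer-driven component might look like the sketch below. Everything in it is an assumption made for the example: the class TimerDrivenKeyRoller, its key counter and the 20 ms period are hypothetical, and what the real managers do on each tick is not covered here.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical manager: a Timer periodically "rolls" a key id. */
class TimerDrivenKeyRoller {
    private final Timer timer = new Timer("key-roller", true); // daemon timer thread
    private final AtomicInteger currentKeyId = new AtomicInteger(0);

    /** Schedules the periodic roll; onRoll lets callers observe each tick. */
    void start(long periodMs, Runnable onRoll) {
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                currentKeyId.incrementAndGet();
                onRoll.run();
            }
        }, periodMs, periodMs);
    }

    /** Cancels the timer; no further rolls are scheduled. */
    void stop() { timer.cancel(); }

    int getCurrentKeyId() { return currentKeyId.get(); }
}

class KeyRollerDemo {
    /** Waits for two rolls (or at most five seconds) and returns the key id seen. */
    static int rollTwice() {
        TimerDrivenKeyRoller roller = new TimerDrivenKeyRoller();
        CountDownLatch twoRolls = new CountDownLatch(2);
        roller.start(20, twoRolls::countDown);
        try {
            twoRolls.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        roller.stop();
        return roller.getCurrentKeyId();
    }
}
```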
Ping Checker service: an inner class of AbstractLivelinessMonitor. It loops over the recorded entries (the NodeManager list, in the case of NMLivelinessMonitor); when an entry has not reported for longer than the expiry interval, it is considered dead and removed from the list.

    private class PingChecker implements Runnable {
        @Override
        public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                synchronized (AbstractLivelinessMonitor.this) {
                    // iterator over the list of live NMs
                    Iterator<Map.Entry<O, Long>> iterator =
                        running.entrySet().iterator();
                    //avoid calculating current time everytime in loop
                    long currentTime = clock.getTime();
                    // check every node; a node that has been silent for longer than
                    // expireInterval (set by yarn.nm.liveness-monitor.expiry-interval-ms,
                    // default 10 minutes) is considered dead and removed
                    while (iterator.hasNext()) {
                        Map.Entry<O, Long> entry = iterator.next();
                        if (currentTime > entry.getValue() + expireInterval) {
                            iterator.remove();
                            expire(entry.getKey());
                            LOG.info("Expired:" + entry.getKey().toString() +
                                " Timed out after " + expireInterval/1000 + " secs");
                        }
                    }
                }
                try {
                    // sleep for monitorInterval (expireInterval / 3)
                    Thread.sleep(monitorInterval);
                } catch (InterruptedException e) {
                    LOG.info(getName() + " thread interrupted");
                    break;
                }
            }
        }
    }
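The expiry rule in the listing above is easiest to see with concrete timestamps. The following sketch isolates one pass of the scan and takes the current time as a parameter so the result is deterministic. MiniLivelinessMonitor and LivelinessDemo are hypothetical names; the background thread, the clock object and the expire callback of the real class are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical reduction of the expiry scan; time is passed in explicitly. */
class MiniLivelinessMonitor<O> {
    private final long expireIntervalMs;
    private final Map<O, Long> running = new HashMap<>();

    MiniLivelinessMonitor(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    /** Records a registration or heartbeat at the given time. */
    synchronized void receivedPing(O key, long nowMs) {
        running.put(key, nowMs);
    }

    /** One pass of the checker loop: removes and returns every timed-out entry. */
    synchronized List<O> checkOnce(long nowMs) {
        List<O> expired = new ArrayList<>();
        Iterator<Map.Entry<O, Long>> iterator = running.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<O, Long> entry = iterator.next();
            if (nowMs > entry.getValue() + expireIntervalMs) {
                O key = entry.getKey(); // read the key before removing the entry
                iterator.remove();
                expired.add(key);
            }
        }
        return expired;
    }

    synchronized int size() { return running.size(); }
}

class LivelinessDemo {
    /** node1 goes silent after t=0; node2 pings again at t=500s. */
    static List<String> expiredAfterTenMinutes() {
        MiniLivelinessMonitor<String> monitor = new MiniLivelinessMonitor<>(600_000L);
        monitor.receivedPing("node1", 0L);
        monitor.receivedPing("node2", 0L);
        monitor.receivedPing("node2", 500_000L);
        return monitor.checkOnce(600_001L); // just past the 10-minute interval
    }
}
```

With a 10-minute interval, the check at 600,001 ms expires only node1, because node2's last ping at 500,000 ms is still within the window.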
ResourceManager Event Processor service: a thread that takes scheduler events off a queue and hands them to the scheduler.

    private final class EventProcessor implements Runnable {
        @Override
        public void run() {
            SchedulerEvent event;
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                try {
                    // take an event from the queue
                    event = eventQueue.take();
                } catch (InterruptedException e) {
                    LOG.error("Returning, interrupted : " + e);
                    return; // TODO: Kill RM.
                }
                try {
                    // handle the event
                    scheduler.handle(event);
                } catch (Throwable t) {
                    .....
                }
            }
        }
    }
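The same take-then-handle loop can be tried out in isolation. The sketch below is a minimal single-consumer processor built on a BlockingQueue; MiniEventProcessor and EventProcessorDemo are hypothetical names, events are plain strings, and a handler failure is simply ignored, which is an assumption of the sketch because the original catch block is elided.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical single-consumer event processor. */
class MiniEventProcessor<E> {
    private final BlockingQueue<E> eventQueue = new LinkedBlockingQueue<>();
    private final Consumer<E> handler;
    private final Thread worker;
    private volatile boolean stopped = false;

    MiniEventProcessor(Consumer<E> handler) {
        this.handler = handler;
        this.worker = new Thread(this::loop, "mini-event-processor");
        this.worker.setDaemon(true);
    }

    void start() { worker.start(); }

    /** Producers only enqueue; the handler always runs on the worker thread. */
    void handle(E event) { eventQueue.add(event); }

    void stop() throws InterruptedException {
        stopped = true;
        worker.interrupt(); // wakes the worker if it is blocked in take()
        worker.join(1000);
    }

    private void loop() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            E event;
            try {
                event = eventQueue.take();
            } catch (InterruptedException e) {
                return;
            }
            try {
                handler.accept(event);
            } catch (RuntimeException e) {
                // sketch only: a failing handler must not kill the loop
            }
        }
    }
}

class EventProcessorDemo {
    /** Sends three events and returns them in the order they were handled. */
    static List<String> run() {
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        MiniEventProcessor<String> processor = new MiniEventProcessor<>(event -> {
            handled.add(event);
            done.countDown();
        });
        processor.start();
        processor.handle("NODE_ADDED");
        processor.handle("APP_ADDED");
        processor.handle("NODE_UPDATE");
        try {
            done.await(5, TimeUnit.SECONDS);
            processor.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(handled);
    }
}
```

Because there is a single consumer on a FIFO queue, events are handled in the order they were enqueued, and a slow handler delays every event behind it.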
ResourceTrackerService: an RPC server that implements the ResourceTracker interface and provides registration and heartbeat services for NodeManagers (NMs)

    // ResourceTrackerService.java
    @Override
    protected void serviceStart() throws Exception {
        super.serviceStart();
        // ResourceTrackerServer authenticates NodeManager via Kerberos if
        // security is enabled, so no secretManager.
        // create the RPC server, which implements the ResourceTracker interface; the number of
        // handlers is set by yarn.resourcemanager.resource-tracker.client.thread-count
        Configuration conf = getConfig();
        YarnRPC rpc = YarnRPC.create(conf);
        this.server =
            rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
                conf, null,
                conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
                    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
        // Enable service authorization?
        if (conf.getBoolean(
                CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
                false)) {
            refreshServiceAcls(conf, new RMPolicyProvider());
        }
        // start the server
        this.server.start();
        conf.updateConnectAddr(YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
            server.getListenerAddress());
    }
Starting the RPC server components, mainly the responder, the listener and the handlers

    public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Handler(i);
            handlers[i].start();
        }
    }
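To make the role of handlerCount concrete, here is a minimal sketch of a fixed set of handler threads sharing one call queue. It models only the handler part of the start() method above; SketchRpcServer and HandlerPoolDemo are hypothetical names, calls are plain Runnables, and the listener and responder are replaced by an enqueue method that callers invoke directly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reduction of an RPC server to a call queue plus handler threads. */
class SketchRpcServer {
    private final BlockingQueue<Runnable> callQueue = new LinkedBlockingQueue<>();
    private final int handlerCount;
    private Thread[] handlers;
    private volatile boolean running = false;

    SketchRpcServer(int handlerCount) { this.handlerCount = handlerCount; }

    /** Allocates the handler array, then starts each handler thread. */
    synchronized void start() {
        running = true;
        handlers = new Thread[handlerCount];
        for (int i = 0; i < handlerCount; i++) {
            handlers[i] = new Thread(this::handlerLoop, "handler-" + i);
            handlers[i].setDaemon(true);
            handlers[i].start();
        }
    }

    /** Stand-in for the listener: puts a decoded call on the shared queue. */
    void enqueue(Runnable call) { callQueue.add(call); }

    synchronized void stop() {
        running = false;
        if (handlers != null) {
            for (Thread handler : handlers) {
                handler.interrupt(); // wake handlers blocked in take()
            }
        }
    }

    private void handlerLoop() {
        while (running) {
            try {
                callQueue.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

class HandlerPoolDemo {
    /** Pushes callCount calls through handlerCount handlers; returns how many ran. */
    static int processCalls(int handlerCount, int callCount) {
        SketchRpcServer server = new SketchRpcServer(handlerCount);
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(callCount);
        server.start();
        for (int i = 0; i < callCount; i++) {
            server.enqueue(() -> {
                processed.incrementAndGet();
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        server.stop();
        return processed.get();
    }
}
```

The handler count bounds how many calls run concurrently, so a larger value helps when individual calls block and costs one thread each.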
ClientRMService, the RPC server that serves clients: like ResourceTrackerService it is an RPC server, but it implements the ApplicationClientProtocol interface. All RPC servers start the same way; only the protocol they implement differs.

    @Override
    protected void serviceStart() throws Exception {
        Configuration conf = getConfig();
        YarnRPC rpc = YarnRPC.create(conf);
        // the number of handlers is set by yarn.resourcemanager.client.thread-count
        this.server =
            rpc.getServer(ApplicationClientProtocol.class, this,
                clientBindAddress,
                conf, this.rmDTSecretManager,
                conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
                    YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));
        // Enable service authorization?
        if (conf.getBoolean(
                CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
                false)) {
            refreshServiceAcls(conf, new RMPolicyProvider());
        }
        this.server.start();
        clientBindAddress = conf.updateConnectAddr(YarnConfiguration.RM_ADDRESS,
            server.getListenerAddress());
        super.serviceStart();
    }
AdminService, the RPC server that serves administrators: the number of handlers is set by yarn.resourcemanager.admin.client.thread-count, and the service implements the ResourceManagerAdministrationProtocol interface

    protected void startServer() throws Exception {
        Configuration conf = getConfig();
        YarnRPC rpc = YarnRPC.create(conf);
        this.server = (Server) rpc.getServer(
            ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
            conf, null,
            conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
                YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
        .......
        this.server.start();
        conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
            server.getListenerAddress());
    }
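Starting the AsyncDispatcher event handler service:
The call hierarchy is fairly deep, so only the key parts are shown, beginning at the top of the call stack:
AsyncDispatcher directly extends AbstractService; when the service starts, it first calls the parent method of the same name

    @Override
    protected void serviceStart() throws Exception {
        // call the parent method of the same name; it currently does nothing,
        // but global initialization could be added there later
        super.serviceStart();
        // create and start a new thread; the main logic lives in the
        // Runnable returned by createThread
        eventHandlingThread = new Thread(createThread());
        eventHandlingThread.setName("AsyncDispatcher event handler");
        eventHandlingThread.start();
    }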
Next comes the body of the AsyncDispatcher thread, created by createThread above. The thread enters its main loop and blocks on the event queue; as soon as a new event arrives, it calls dispatch(event) to hand the event out.

    Runnable createThread() {
        return new Runnable() {
            @Override
            public void run() {
                // check the stop flag and the thread's interrupt status
                while (!stopped && !Thread.currentThread().isInterrupted()) {
                    drained = eventQueue.isEmpty();
                    // blockNewEvents is only set when dispatcher is draining to stop,
                    // adding this check is to avoid the overhead of acquiring the lock
                    // and calling notify every time in the normal run of the loop.
                    if (blockNewEvents) {
                        synchronized (waitForDrained) {
                            if (drained) {
                                waitForDrained.notify();
                            }
                        }
                    }
                    Event event;
                    try {
                        // take an event from the queue
                        event = eventQueue.take();
                    } catch(InterruptedException ie) {
                        if (!stopped) {
                            LOG.warn("AsyncDispatcher thread interrupted", ie);
                        }
                        return;
                    }
                    if (event != null) {
                        // dispatch the event
                        dispatch(event);
                    }
                }
            }
        };
    }
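The listing above stops at dispatch(event) and does not show how an event finds its handler. The sketch below illustrates one plausible routing scheme, a handler table keyed by the enum class of the event type. All names in it (SketchEvent, SketchDispatcher, NodeEventType, AppEventType, DispatchDemo) are hypothetical, and the dispatcher thread is replaced by a drain method that empties the queue on the calling thread so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical event: its enum type decides where it is routed. */
class SketchEvent {
    private final Enum<?> type;

    SketchEvent(Enum<?> type) { this.type = type; }

    Enum<?> getType() { return type; }
}

enum NodeEventType { NODE_ADDED, NODE_LOST }

enum AppEventType { APP_SUBMITTED }

/** Hypothetical dispatcher: one handler per enum class of event types. */
class SketchDispatcher {
    private final BlockingQueue<SketchEvent> eventQueue = new LinkedBlockingQueue<>();
    private final Map<Class<?>, Consumer<SketchEvent>> handlers = new HashMap<>();

    void register(Class<? extends Enum<?>> eventTypeClass, Consumer<SketchEvent> handler) {
        handlers.put(eventTypeClass, handler);
    }

    void enqueue(SketchEvent event) { eventQueue.add(event); }

    /** Looks up the handler by the enum class of the event's type and calls it. */
    void dispatch(SketchEvent event) {
        Class<?> typeClass = event.getType().getDeclaringClass();
        Consumer<SketchEvent> handler = handlers.get(typeClass);
        if (handler == null) {
            throw new IllegalStateException("No handler registered for " + typeClass);
        }
        handler.accept(event);
    }

    /** Stand-in for the thread loop: dispatches everything queued right now. */
    void drain() {
        SketchEvent event;
        while ((event = eventQueue.poll()) != null) {
            dispatch(event);
        }
    }
}

class DispatchDemo {
    /** Routes three events of two kinds and returns what each handler saw. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchDispatcher dispatcher = new SketchDispatcher();
        dispatcher.register(NodeEventType.class, e -> log.add("node:" + e.getType()));
        dispatcher.register(AppEventType.class, e -> log.add("app:" + e.getType()));
        dispatcher.enqueue(new SketchEvent(NodeEventType.NODE_ADDED));
        dispatcher.enqueue(new SketchEvent(AppEventType.APP_SUBMITTED));
        dispatcher.enqueue(new SketchEvent(NodeEventType.NODE_LOST));
        dispatcher.drain();
        return log;
    }
}
```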
The RM runs quite a few kinds of services, and many of them are multi-threaded. An RPC server, for example, can have 50 handlers by default, and there are several RPC servers. The complete list of services in the RM is as follows:

    Service org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService
    Service org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer
    Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
    Service AMLivelinessMonitor in state AMLivelinessMonitor: INITED
    Service org.apache.hadoop.yarn.server.resourcemanager.NodesListManager
    Service org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher
    Service NMLivelinessMonitor in state NMLivelinessMonitor: INITED
    Service org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService
    Service org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
    Service org.apache.hadoop.yarn.server.resourcemanager.ClientRMService
    Service org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher
    Service Dispatcher in state Dispatcher: INITED
    Service org.apache.hadoop.yarn.server.resourcemanager.AdminService

