你好,游客 登录 注册 搜索
背景:
阅读新闻

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——JT篇

[日期:2014-06-15] 来源:Linux社区  作者:vickyway [字体: ]

上一篇浅析了Hadoop心跳机制的TT(TaskTracker)方面(http://www.linuxidc.com/Linux/2014-06/103216.htm),这一篇浅析下JT(JobTracker)方面。
 
我们知道心跳是TT通过RPC请求调用JT的heartbeat()方法的,TT在调用JT的heartbeat回收集自身的状态信息封装到TaskTrackerStatus对象中,传递给JT。下面看看JT如何处理来自TT的心跳。

目录

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——TT篇 http://www.linuxidc.com/Linux/2014-06/103216.htm

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——JT篇 http://www.linuxidc.com/Linux/2014-06/103217.htm

Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制——命令篇 http://www.linuxidc.com/Linux/2014-06/103218.htm
 
1.JobTracker.heartbeat():

 // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }

第一步是检查发送心跳请求的TT是否属于可允许的TT,这个是根据一个HostsFileReader对象进行判断的,该对象是在实例化JT的时候创建的,这个类保存了两个队列,分别是includes和excludes队列,includes表示可以访问的host列表,excludes表示不可访问的host列表,这两个列表的内容根据两个mapred.hosts和mapred.hosts.exclude(mapred-site,xml中,默认是null)这两个参数指定的文件名读取的。具体可参考JT源码1956行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

第一步是检查发送心跳请求的TT是否属于可允许的TT,这个是根据一个HostsFileReader对象进行判断的,该对象是在实例化JT的时候创建的,这个类保存了两个队列,分别是includes和excludes队列,includes表示可以访问的host列表,excludes表示不可访问的host列表,这两个列表的内容根据两个mapred.hosts和mapred.hosts.exclude(mapred-site,xml中,默认是null)这两个参数指定的文件名读取的。具体可参考JT源码1956行。
 
2.JobTracker.heartbeat():

String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

这一步是检查TT是否重启,是重启的话标识该TT的状态为健康的,否则检查TT的健康状态。faultyTrackers.markTrackerHealthy(status.getHost())内部将该TT所在的Host上所有的TT(从这里可以看出hadoop考虑到一个Host上可能存在多个TT的可能)从黑名单,灰名单和可能存在错误的列表上删除,也就是从potentiallyFaultyTrackers队列中移除该Host,通过更新JT的numGraylistedTrackers/numBlacklistedTrackers数量以及JT的totalMapTaskCapacity和totalReduceTaskCapacity数量。至于如何检查TT健康状态,具体是根据JT上记录的关于TT执行任务失败的次数来判断的(具体不是太理解)。

--------------------------------------分割线 --------------------------------------

Ubuntu 13.04上搭建Hadoop环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu下Hadoop环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建Hadoop环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

--------------------------------------分割线 --------------------------------------
 
3.JobTracker.heartbeat():

HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
    boolean addRestartInfo = false;

    if (initialContact != true) {
      // If this isn't the 'initial contact' from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the 'previous heartbeat'; if so, ask the
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn("Serious problem, cannot find record of 'previous' " +
                  "heartbeat for '" + trackerName +
                  "'; reinitializing the tasktracker");
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      } else {
               
        // It is completely safe to not process a 'duplicate' heartbeat from a
        // {@link TaskTracker} since it resends the heartbeat when rpcs are
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the
        // {@link TaskTracker} go forward.
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" +
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }
      }
    }

此处第一句从JT记录的HeartbeatResponse队列中获取该TT的HeartbeatResponse信息,即判断JT之前是否收到过该TT的心跳请求。如果initialContact!=true,表示TT不是首次连接JT,同时如果prevHeartbeatResponse==null,根据注释可以知道如果TT不是首次连接JT,而且JT中并没有该TT之前的心跳请求信息,表明This is the first heartbeat from the old tracker to the newly started JobTracker。判断hasRestarted是否为true,hasRestarted是在JT初始化(initialize()方法)时,根据recoveryManager的shouldRecover来决定的,hasRestarted=shouldRecover,所以当需要进行job恢复时,addRestartInfo会被设置为true,即需要TT进行job恢复操作,同时从recoveryManager的recoveredTrackers队列中移除该TT。如果不需要进行任务恢复,则直接返回HeartbeatResponse,并对TT下重新初始化指令(后期介绍),注意此处返回的responseId还是原来的responseId,即responseId不变。上面说的都是prevHeartbeatResponse==null时的情况,下面说说prevHeartbeatResponse!=null时如何处理,当prevHeartbeatResponse!=null时会直接返回prevHeartbeatResponse,而忽略本次心跳请求。

更多详情见请继续阅读下一页的精彩内容http://www.linuxidc.com/Linux/2014-06/103217p2.htm

linux
相关资讯       Hadoop1.2.1  Hadoop源码解析 
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款