데이터 베이스 MySQL 튜토리얼 Giraph源码分析启动ZooKeeper服务

Giraph源码分析启动ZooKeeper服务

Jun 07, 2016 pm 03:54 PM
zookeeper 분석하다 시작하다 제공하다 소스 코드

说明: (1) 实验环境. 三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker. 测试例子:官网自带的SSSP程序,数据是自己模拟生成。 运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o

说明:

(1) 实验环境.

三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.

测试例子:官网自带的SSSP程序,数据是自己模拟生成。

运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5

(2). 为节约空间,下文中所有代码均为核心代码片段。

(3). core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp

(4).写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.

(5). 后续文章也遵循上述规则。

1. org.apache.giraph.graph.GraphMapper类

Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper类,覆写了setup()、map()、cleanup()和run()方法。GraphMapper类的说明如下:

This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.

BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。

在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:

  @Override
  public void setup(Context context)
    throws IOException, InterruptedException {
    // Execute all Giraph-related role(s) assigned to this compute node.
    // Roles can include "master," "worker," "zookeeper," or . . . ?
    graphTaskManager = new GraphTaskManager<I, V, E, M>(context);
    graphTaskManager.setup(
      DistributedCache.getLocalCacheArchives(context.getConfiguration()));
  }
로그인 후 복사
map()方法为空,因为所有操作都被封装在了GraphTaskManager类中。在run()方法中调用GraphTaskManager对象的execute()方法进行BSP迭代计算。
@Override
  public void run(Context context) throws IOException, InterruptedException {
    // Notify the master quicker if there is worker failure rather than
    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
    try {
      setup(context);
      while (context.nextKeyValue()) {
        graphTaskManager.execute();
      }
      cleanup(context);
      // Checkstyle exception due to needing to dump ZooKeeper failure
    } catch (RuntimeException e) {
      graphTaskManager.zooKeeperCleanup();
      graphTaskManager.workerFailureCleanup();
    }
  }
로그인 후 복사

2. org.apache.giraph.graph.GraphTaskManager 类

功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.

下面讲述setup()方法,代码如下。

 /**
   * Called by owner of this GraphTaskManager on each compute node
   * @param zkPathList the path to the ZK jars we need to run the job
   */
  public void setup(Path[] zkPathList) throws IOException, InterruptedException {
    context.setStatus("setup: Initializing Zookeeper services.");
    locateZookeeperClasspath(zkPathList);
    serverPortList = conf.getZookeeperList();
    if (serverPortList == null && startZooKeeperManager()) {
      return; // ZK connect/startup failed
    }
    if (zkManager != null && zkManager.runsZooKeeper()) {
        LOG.info("setup: Chosen to run ZooKeeper...");
    }
    context.setStatus("setup: Connected to Zookeeper service " +serverPortList);
    this.graphFunctions = determineGraphFunctions(conf, zkManager);
    instantiateBspService(serverPortList, sessionMsecTimeout);
  }
로그인 후 복사
依次介绍每个方法的功能:

1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,

 /**
   * Instantiate and configure ZooKeeperManager for this job. This will
   * result in a Giraph-owned Zookeeper instance, a connection to an
   * existing quorum as specified in the job configuration, or task failure
   * @return true if this task should terminate
   */
  private boolean startZooKeeperManager()
    throws IOException, InterruptedException {
    zkManager = new ZooKeeperManager(context, conf);
    context.setStatus("setup: Setting up Zookeeper manager.");
    zkManager.setup();
    if (zkManager.computationDone()) {
      done = true;
      return true;
    }
    zkManager.onlineZooKeeperServers();
    serverPortList = zkManager.getZooKeeperServerPortString();
    return false;
  }
로그인 후 복사

org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager类的setup()定义如下:

/**
   * Create the candidate stamps and decide on the servers to start if
   * you are partition 0.
   */
  public void setup() throws IOException, InterruptedException {
    createCandidateStamp();
    getZooKeeperServerList();
  }
로그인 후 복사
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个task创建一个文件,文件内容为空。文件名为本机的Hostname+taskPartition,如下截图:

\

运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。

getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。

\

createZooKeeperServerList核心代码如下:

/**
   * Task 0 will call this to create the ZooKeeper server list.  The result is
   * a file that describes the ZooKeeper servers through the filename.
   */
  private void createZooKeeperServerList() throws IOException,
      InterruptedException {
    Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
    while (true) {
      FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
      hostnameTaskMap.clear();
      if (fileStatusArray.length > 0) {
        for (FileStatus fileStatus : fileStatusArray) {  
          String[] hostnameTaskArray =
              fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
   
          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
            hostnameTaskMap.put(hostnameTaskArray[0],
                new Integer(hostnameTaskArray[1]));
          }
        }
        if (hostnameTaskMap.size() >= serverCount) {
          break;
        }
        Thread.sleep(pollMsecs);
      }
    }
  }
로그인 후 복사
首先获取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。扫描taskDirectory目录后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT变量,定义为1),就停止外层的循环。外层循环的目的是:因为taskDirectory下的文件每个task文件时多个task在分布式条件下创建的,有可能task 0在此创建server List时,别的task还没有生成后task文件。Giraph默认为每个Job启动一个ZooKeeper服务,也就是说只有一个task会启动ZooKeeper服务。

经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。

最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0

\

onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0即worker等待直到master上的ZooKeeper服务已经启动完成。

启动ZooKeeper服务的命令如下:

\

3) determineGraphFunctions()。

GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:

a) If not split master, everyone does the everything and/or running ZooKeeper.

b) If split master/worker, masters also run ZooKeeper

c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.

该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:

 private static GraphFunctions determineGraphFunctions(
      ImmutableClassesGiraphConfiguration conf,
      ZooKeeperManager zkManager) {
    // What functions should this mapper do?
    if (!splitMasterWorker) {
      if ((zkManager != null) && zkManager.runsZooKeeper()) {
        functions = GraphFunctions.ALL;
      } else {
        functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
      }
    } else {
      if (zkAlreadyProvided) {
        int masterCount = conf.getZooKeeperServerCount();
        if (taskPartition < masterCount) {
          functions = GraphFunctions.MASTER_ONLY;
        } else {
          functions = GraphFunctions.WORKER_ONLY;
        }
      } else {
        if ((zkManager != null) && zkManager.runsZooKeeper()) {
          functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
        } else {
          functions = GraphFunctions.WORKER_ONLY;
        }
      }
    }
    return functions;
  }
로그인 후 복사

默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

wps가 이 객체의 소스 애플리케이션을 시작할 수 없으면 어떻게 해야 합니까? wps가 이 객체의 소스 애플리케이션을 시작할 수 없으면 어떻게 해야 합니까? Mar 13, 2024 pm 09:13 PM

WPS는 문서, 양식, PPT 등 매우 널리 사용되는 사무용 소프트웨어이며 다중 터미널 동기화를 지원합니다. wps 편집 시 "이 개체에 대한 소스 응용 프로그램을 시작할 수 없습니다"라는 메시지가 나타나는 경우 어떻게 해결합니까? 이 문제는 링크나 파일을 열려고 하지만 해당 소스 응용 프로그램이 더 이상 존재하지 않거나 삭제되었기 때문에 발생할 수 있습니다. 1. WPS 소프트웨어 재설치: WPSOffice를 재설치하여 문제를 해결하고 최신 버전을 사용하고 있는지 확인하십시오. 2. 기본 프로그램 수동 변경: 기본 프로그램을 WPS로 변경해 보십시오. 열려는 파일을 마우스 오른쪽 버튼으로 클릭하고 "다음으로 열기"를 선택한 다음

Apple 듀얼 시스템의 부팅 우선순위를 설정하는 방법 Apple 듀얼 시스템의 부팅 우선순위를 설정하는 방법 Feb 19, 2024 pm 06:49 PM

기술이 계속 발전함에 따라 다양한 운영 체제를 사용해야 하는 필요성이 점점 더 일반화되고 있습니다. Apple 사용자의 경우 때로는 macOS 및 Windows와 같이 하나의 장치에 두 가지 다른 운영 체제를 설치하고 사용해야 할 수도 있습니다. 이 경우 이중 시스템의 시작 순서를 설정하는 것이 특히 중요합니다. 이 문서에서는 장치를 켤 때 먼저 듀얼 시스템을 시작하도록 Apple 장치를 설정하는 방법을 소개합니다. 먼저, 두 운영 체제가 모두 Apple 장치에 성공적으로 설치되었는지 확인해야 합니다. 이 Apple에서는 BootCamp를 사용할 수 있습니다.

WallpaperEngine을 시작할 때 무엇을 선택해야 합니까? WallpaperEngine을 시작할 때 무엇을 선택해야 합니까? Mar 19, 2024 am 08:49 AM

wallpaperengine이 시작되면 4가지 옵션이 있습니다. 많은 사용자는 wallpaperengine을 시작할 때 무엇을 선택해야 할지 모릅니다. 일반적으로 wallpaperengine이 시작되면 첫 번째 옵션인 32비트 시작을 선택합니다. wallpaperengine을 시작할 때 무엇을 선택해야 합니까? 답변: 32비트로 시작하세요. 1. 일반적으로 wallpaperengine이 시작되면 첫 번째 항목인 32비트 시작을 선택합니다. 2. wallpaperengine이 시작되면 4가지 옵션이 있습니다: 32비트 시작, 64비트 시작. 3. 32비트 시작: 일반적으로 권장되는 옵션이며 대부분의 사용자에게 적합합니다. 4. 64비트 시작: 시스템이 64비트를 지원하는 경우 이 옵션을 선택할 수 있습니다.

Linux에서 서비스를 다시 시작하는 올바른 방법은 무엇입니까? Linux에서 서비스를 다시 시작하는 올바른 방법은 무엇입니까? Mar 15, 2024 am 09:09 AM

Linux에서 서비스를 다시 시작하는 올바른 방법은 무엇입니까? Linux 시스템을 사용하다 보면 서비스를 다시 시작해야 하는 상황이 자주 발생하지만, 서비스를 다시 시작할 때 서비스가 실제로 중지되지 않거나 시작되지 않는 등의 문제가 발생할 수도 있습니다. 따라서 서비스를 다시 시작하는 올바른 방법을 익히는 것이 매우 중요합니다. Linux에서는 일반적으로 systemctl 명령을 사용하여 시스템 서비스를 관리할 수 있습니다. systemctl 명령은 systemd 시스템 관리자의 일부입니다.

Ubuntu PHP 서비스가 정상적으로 시작되지 않는 문제에 대한 솔루션 Ubuntu PHP 서비스가 정상적으로 시작되지 않는 문제에 대한 솔루션 Feb 28, 2024 am 10:48 AM

제목: Ubuntu에서 PHP 서비스가 정상적으로 시작되지 않는 문제를 해결하기 위한 방법 및 구체적인 코드 예제 Ubuntu를 사용하여 웹사이트나 애플리케이션을 구축할 때 PHP 서비스가 정상적으로 시작되지 않아 웹사이트가 작동하지 않는 문제가 자주 발생합니다. 정상적으로 접속할 수 없거나 응용프로그램이 정상적으로 실행되지 않습니다. 이 글에서는 Ubuntu에서 PHP 서비스가 정상적으로 시작되지 않는 문제를 해결하는 방법을 소개하고, 이러한 오류를 독자가 빠르게 해결할 수 있도록 구체적인 코드 예제를 제공합니다. 1. PHP 구성 파일 확인 먼저 PHP 구성 파일을 확인해야 합니다.

PHP 코드의 소스 코드를 해석 및 실행하지 않고 브라우저에 표시하는 방법은 무엇입니까? PHP 코드의 소스 코드를 해석 및 실행하지 않고 브라우저에 표시하는 방법은 무엇입니까? Mar 11, 2024 am 10:54 AM

PHP 코드의 소스 코드를 해석 및 실행하지 않고 브라우저에 표시하는 방법은 무엇입니까? PHP는 동적 웹 페이지를 개발하는 데 일반적으로 사용되는 서버 측 스크립팅 언어입니다. 서버에서 PHP 파일이 요청되면 서버는 그 안에 있는 PHP 코드를 해석하고 실행한 후 최종 HTML 콘텐츠를 브라우저에 보내 표시합니다. 그러나 때때로 PHP 파일의 소스 코드를 실행하는 대신 브라우저에 직접 표시하고 싶을 때가 있습니다. 이 기사에서는 PHP 코드의 소스 코드를 해석 및 실행하지 않고 브라우저에 표시하는 방법을 소개합니다. PHP에서는 다음을 사용할 수 있습니다.

DreamWeaver CMS의 보조 디렉토리를 열 수 없는 이유 분석 DreamWeaver CMS의 보조 디렉토리를 열 수 없는 이유 분석 Mar 13, 2024 pm 06:24 PM

제목: DreamWeaver CMS의 보조 디렉터리를 열 수 없는 이유와 해결 방법 분석 Dreamweaver CMS(DedeCMS)는 다양한 웹 사이트 구축에 널리 사용되는 강력한 오픈 소스 콘텐츠 관리 시스템입니다. 그러나 때로는 웹사이트를 구축하는 과정에서 보조 디렉토리를 열 수 없는 상황이 발생할 수 있으며, 이로 인해 웹사이트의 정상적인 작동에 문제가 발생할 수 있습니다. 이 기사에서는 보조 디렉터리를 열 수 없는 가능한 이유를 분석하고 이 문제를 해결하기 위한 구체적인 코드 예제를 제공합니다. 1. 예상 원인 분석: 의사 정적 규칙 구성 문제: 사용 중

Linux에서 서비스 다시 시작 명령을 실행하는 방법은 무엇입니까? Linux에서 서비스 다시 시작 명령을 실행하는 방법은 무엇입니까? Mar 14, 2024 am 11:06 AM

Linux에서 서비스 재시작 명령을 실행하려면 일반적으로 Systemd 서비스 관리자를 사용해야 합니다. Systemd는 Linux에서 널리 사용되는 서비스 관리 도구로, 시스템 서비스를 쉽게 관리하고 제어할 수 있습니다. 다음은 Linux에서 Systemd를 통해 서비스 재시작 명령을 실행하는 방법을 소개하고 구체적인 코드 예제를 제공합니다. 1단계: 서비스 이름 확인 서비스 다시 시작 명령을 실행하기 전에 먼저 다시 시작할 서비스 이름을 확인해야 합니다. 다음 명령을 사용하여 시스템에서 실행 중인 서비스 목록을 볼 수 있습니다.

See all articles