首頁 資料庫 mysql教程 Giraph源码分析启动ZooKeeper服务

Giraph源码分析启动ZooKeeper服务

Jun 07, 2016 pm 03:54 PM
zookeeper 分析 啟動 服務 原始碼

说明: (1) 实验环境. 三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker. 测试例子:官网自带的SSSP程序,数据是自己模拟生成。 运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o

说明:

(1) 实验环境.

三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.

测试例子:官网自带的SSSP程序,数据是自己模拟生成。

运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5

(2). 为节约空间,下文中所有代码均为核心代码片段。

(3). core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp

(4).写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.

(5). 后续文章也遵循上述规则。

1. org.apache.giraph.graph.GraphMapper类

Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper类,覆写了setup()、map()、cleanup()和run()方法。GraphMapper类的说明如下:

This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.

BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。

在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:

  @Override
  public void setup(Context context)
    throws IOException, InterruptedException {
    // Execute all Giraph-related role(s) assigned to this compute node.
    // Roles can include "master," "worker," "zookeeper," or . . . ?
    graphTaskManager = new GraphTaskManager<I, V, E, M>(context);
    graphTaskManager.setup(
      DistributedCache.getLocalCacheArchives(context.getConfiguration()));
  }
登入後複製
map()方法为空,因为所有操作都被封装在了GraphTaskManager类中。在run()方法中调用GraphTaskManager对象的execute()方法进行BSP迭代计算。
@Override
  public void run(Context context) throws IOException, InterruptedException {
    // Notify the master quicker if there is worker failure rather than
    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
    try {
      setup(context);
      while (context.nextKeyValue()) {
        graphTaskManager.execute();
      }
      cleanup(context);
      // Checkstyle exception due to needing to dump ZooKeeper failure
    } catch (RuntimeException e) {
      graphTaskManager.zooKeeperCleanup();
      graphTaskManager.workerFailureCleanup();
    }
  }
登入後複製

2. org.apache.giraph.graph.GraphTaskManager 类

功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.

下面讲述setup()方法,代码如下。

 /**
   * Called by owner of this GraphTaskManager on each compute node
   * @param zkPathList the path to the ZK jars we need to run the job
   */
  public void setup(Path[] zkPathList) throws IOException, InterruptedException {
    context.setStatus("setup: Initializing Zookeeper services.");
    locateZookeeperClasspath(zkPathList);
    serverPortList = conf.getZookeeperList();
    if (serverPortList == null && startZooKeeperManager()) {
      return; // ZK connect/startup failed
    }
    if (zkManager != null && zkManager.runsZooKeeper()) {
        LOG.info("setup: Chosen to run ZooKeeper...");
    }
    context.setStatus("setup: Connected to Zookeeper service " +serverPortList);
    this.graphFunctions = determineGraphFunctions(conf, zkManager);
    instantiateBspService(serverPortList, sessionMsecTimeout);
  }
登入後複製
依次介绍每个方法的功能:

1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,

 /**
   * Instantiate and configure ZooKeeperManager for this job. This will
   * result in a Giraph-owned Zookeeper instance, a connection to an
   * existing quorum as specified in the job configuration, or task failure
   * @return true if this task should terminate
   */
  private boolean startZooKeeperManager()
    throws IOException, InterruptedException {
    zkManager = new ZooKeeperManager(context, conf);
    context.setStatus("setup: Setting up Zookeeper manager.");
    zkManager.setup();
    if (zkManager.computationDone()) {
      done = true;
      return true;
    }
    zkManager.onlineZooKeeperServers();
    serverPortList = zkManager.getZooKeeperServerPortString();
    return false;
  }
登入後複製

org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager类的setup()定义如下:

/**
   * Create the candidate stamps and decide on the servers to start if
   * you are partition 0.
   */
  public void setup() throws IOException, InterruptedException {
    createCandidateStamp();
    getZooKeeperServerList();
  }
登入後複製
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个task创建一个文件,文件内容为空。文件名为本机的Hostname+taskPartition,如下截图:

\

运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。

getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。

\

createZooKeeperServerList核心代码如下:

/**
   * Task 0 will call this to create the ZooKeeper server list.  The result is
   * a file that describes the ZooKeeper servers through the filename.
   */
  private void createZooKeeperServerList() throws IOException,
      InterruptedException {
    Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
    while (true) {
      FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
      hostnameTaskMap.clear();
      if (fileStatusArray.length > 0) {
        for (FileStatus fileStatus : fileStatusArray) {  
          String[] hostnameTaskArray =
              fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
   
          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
            hostnameTaskMap.put(hostnameTaskArray[0],
                new Integer(hostnameTaskArray[1]));
          }
        }
        if (hostnameTaskMap.size() >= serverCount) {
          break;
        }
        Thread.sleep(pollMsecs);
      }
    }
  }
登入後複製
首先获取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。扫描taskDirectory目录后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT变量,定义为1),就停止外层的循环。外层循环的目的是:因为taskDirectory下的文件每个task文件时多个task在分布式条件下创建的,有可能task 0在此创建server List时,别的task还没有生成后task文件。Giraph默认为每个Job启动一个ZooKeeper服务,也就是说只有一个task会启动ZooKeeper服务。

经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。

最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0

\

onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0即worker等待直到master上的ZooKeeper服务已经启动完成。

启动ZooKeeper服务的命令如下:

\

3) determineGraphFunctions()。

GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:

a) If not split master, everyone does the everything and/or running ZooKeeper.

b) If split master/worker, masters also run ZooKeeper

c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.

该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:

 private static GraphFunctions determineGraphFunctions(
      ImmutableClassesGiraphConfiguration conf,
      ZooKeeperManager zkManager) {
    // What functions should this mapper do?
    if (!splitMasterWorker) {
      if ((zkManager != null) && zkManager.runsZooKeeper()) {
        functions = GraphFunctions.ALL;
      } else {
        functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER;
      }
    } else {
      if (zkAlreadyProvided) {
        int masterCount = conf.getZooKeeperServerCount();
        if (taskPartition < masterCount) {
          functions = GraphFunctions.MASTER_ONLY;
        } else {
          functions = GraphFunctions.WORKER_ONLY;
        }
      } else {
        if ((zkManager != null) && zkManager.runsZooKeeper()) {
          functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY;
        } else {
          functions = GraphFunctions.WORKER_ONLY;
        }
      }
    }
    return functions;
  }
登入後複製

默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

wps不能啟動此物件的來源應用程式怎麼辦? wps不能啟動此物件的來源應用程式怎麼辦? Mar 13, 2024 pm 09:13 PM

  wps是使用非常廣泛的辦公室軟體,包括了文件、表格和PPT,並且支援多端同步。如果在編輯wps時出現提示“無法啟動此物件的來源應用程式”,則如何解決?出現這個問題可能是因為你正在嘗試打開一個連結或文件,但是它的來源應用程式已經不存在或被刪除了。  以下是一些修復方法:  1、重新安裝WPS軟體:嘗試重裝WPSOffice來修復該問題,確保您使用的是最新版本。  2、手動更改預設程式:試著將預設程式更改為WPS,可以按右鍵單擊需要打開的文件,選擇“打開方式”,然

設定蘋果雙系統啟動優先順序的方法 設定蘋果雙系統啟動優先順序的方法 Feb 19, 2024 pm 06:49 PM

隨著科技的不斷發展,使用不同作業系統的需求也越來越普遍。對於蘋果用戶來說,有時可能需要在一台裝置上安裝並使用兩個不同的作業系統,如macOS和Windows。在這種情況下,設定雙系統的啟動順序就顯得格外重要。本文將介紹如何設定蘋果設備在開機時優先啟動雙系統。首先,我們需要確保已經在蘋果設備上成功安裝了兩個作業系統。你可以使用BootCamp這個蘋

wallpaperengine啟動選哪一個 wallpaperengine啟動選哪一個 Mar 19, 2024 am 08:49 AM

wallpaperengine啟動時,有4種不同的選項,有許多使用者不知道wallpaperengine啟動選哪一個,一般wallpaperengine啟動時選擇第一個:啟動32位元即可。 wallpaperengine啟動選哪一個答:啟動32位元。 1.一般wallpaperengine啟動時選擇第一個:啟動32位元即可。 2.wallpaperengine啟動時,有4種不同的選項:啟動32位元;啟動64位元。 3.啟動32位元:這是一般建議的選項,適用於大多數使用者。 4.啟動64位:如果系統支援64位,可以選擇此選

Linux 重啟服務的正確方式是什麼? Linux 重啟服務的正確方式是什麼? Mar 15, 2024 am 09:09 AM

Linux重啟服務的正確方式是什麼?在使用Linux系統時,經常會遇到需要重新啟動某個服務的情況,但是有時我們可能會在重新啟動服務時遇到一些問題,例如服務沒有真正停止或啟動等情況。因此,掌握正確的重啟服務的方式是非常重要的。在Linux中,通常可以使用systemctl指令來管理系統服務。 systemctl指令是systemd系統管理員的一部分

PHP程式碼在瀏覽器中如何顯示原始碼而不被解釋執行? PHP程式碼在瀏覽器中如何顯示原始碼而不被解釋執行? Mar 11, 2024 am 10:54 AM

PHP程式碼在瀏覽器中如何顯示原始碼而不被解釋執行? PHP是一種伺服器端腳本語言,通常用於開發動態網頁。當PHP檔案在伺服器上被要求時,伺服器會解釋執行其中的PHP程式碼,並將最終的HTML內容傳送到瀏覽器以供顯示。然而,有時我們希望在瀏覽器中直接展示PHP檔案的原始碼,而不是被執行。本文將介紹如何在瀏覽器中顯示PHP程式碼的源碼,而不被解釋執行。在PHP中,可以使

織夢CMS二級目錄打不開的原因分析 織夢CMS二級目錄打不開的原因分析 Mar 13, 2024 pm 06:24 PM

標題:解析織夢CMS二級目錄打不開的原因及解決方案織夢CMS(DedeCMS)是一款功能強大的開源內容管理系統,被廣泛應用於各類網站建設中。然而,有時在搭建網站過程中可能會遇到二級目錄無法開啟的情況,這給網站的正常運作帶來了困擾。在本文中,我們將分析二級目錄打不開的可能原因,並提供具體的程式碼範例來解決這個問題。一、可能的原因分析:偽靜態規則配置問題:在使用

Ubuntu PHP服務無法正常啟動的解決辦法 Ubuntu PHP服務無法正常啟動的解決辦法 Feb 28, 2024 am 10:48 AM

標題:解決Ubuntu下PHP服務無法正常啟動的方法及具體程式碼範例在使用Ubuntu建立網站或應用程式時,經常會遇到PHP服務無法正常啟動的問題,這會導致網站無法正常存取或應用程式無法正常運行。本文將介紹如何解決Ubuntu下PHP服務無法正常啟動的問題,同時提供具體的程式碼範例幫助讀者快速解決此類故障。一、檢查PHP設定檔首先,我們需要檢查PHP的設定文件

分析騰訊主要的程式語言是否為Go 分析騰訊主要的程式語言是否為Go Mar 27, 2024 pm 04:21 PM

標題:騰訊主要的程式語言是否為Go:一項深入分析騰訊作為中國領先的科技公司,在程式語言的選擇上一直備受關注。近年來,有人認為騰訊主要採用Go作為主要的程式語言。本文將對騰訊主要的程式語言是否為Go進行深入分析,並給出具體的程式碼範例來支持這一觀點。一、Go語言在騰訊的應用Go是一種由Google開發的開源程式語言,它的高效性、並發性和簡潔性受到眾多開發者的喜

See all articles