Giraph源码分析启动ZooKeeper服务
说明: (1) 实验环境. 三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker. 测试例子:官网自带的SSSP程序,数据是自己模拟生成。 运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o
说明:
(1) 实验环境.
三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.
测试例子:官网自带的SSSP程序,数据是自己模拟生成。
运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5
(2). 为节约空间,下文中所有代码均为核心代码片段。
(3). core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp
(4).写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.
(5). 后续文章也遵循上述规则。
1. org.apache.giraph.graph.GraphMapper类
Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper
This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.
BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。
在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:
@Override public void setup(Context context) throws IOException, InterruptedException { // Execute all Giraph-related role(s) assigned to this compute node. // Roles can include "master," "worker," "zookeeper," or . . . ? graphTaskManager = new GraphTaskManager<I, V, E, M>(context); graphTaskManager.setup( DistributedCache.getLocalCacheArchives(context.getConfiguration())); }
@Override public void run(Context context) throws IOException, InterruptedException { // Notify the master quicker if there is worker failure rather than // waiting for ZooKeeper to timeout and delete the ephemeral znodes try { setup(context); while (context.nextKeyValue()) { graphTaskManager.execute(); } cleanup(context); // Checkstyle exception due to needing to dump ZooKeeper failure } catch (RuntimeException e) { graphTaskManager.zooKeeperCleanup(); graphTaskManager.workerFailureCleanup(); } }
2. org.apache.giraph.graph.GraphTaskManager 类
功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.
下面讲述setup()方法,代码如下。
/** * Called by owner of this GraphTaskManager on each compute node * @param zkPathList the path to the ZK jars we need to run the job */ public void setup(Path[] zkPathList) throws IOException, InterruptedException { context.setStatus("setup: Initializing Zookeeper services."); locateZookeeperClasspath(zkPathList); serverPortList = conf.getZookeeperList(); if (serverPortList == null && startZooKeeperManager()) { return; // ZK connect/startup failed } if (zkManager != null && zkManager.runsZooKeeper()) { LOG.info("setup: Chosen to run ZooKeeper..."); } context.setStatus("setup: Connected to Zookeeper service " +serverPortList); this.graphFunctions = determineGraphFunctions(conf, zkManager); instantiateBspService(serverPortList, sessionMsecTimeout); }
1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,
/** * Instantiate and configure ZooKeeperManager for this job. This will * result in a Giraph-owned Zookeeper instance, a connection to an * existing quorum as specified in the job configuration, or task failure * @return true if this task should terminate */ private boolean startZooKeeperManager() throws IOException, InterruptedException { zkManager = new ZooKeeperManager(context, conf); context.setStatus("setup: Setting up Zookeeper manager."); zkManager.setup(); if (zkManager.computationDone()) { done = true; return true; } zkManager.onlineZooKeeperServers(); serverPortList = zkManager.getZooKeeperServerPortString(); return false; }
org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.
ZooKeeperManager类的setup()定义如下:
/** * Create the candidate stamps and decide on the servers to start if * you are partition 0. */ public void setup() throws IOException, InterruptedException { createCandidateStamp(); getZooKeeperServerList(); }
运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。
getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。
createZooKeeperServerList核心代码如下:
/** * Task 0 will call this to create the ZooKeeper server list. The result is * a file that describes the ZooKeeper servers through the filename. */ private void createZooKeeperServerList() throws IOException, InterruptedException { Map<String, Integer> hostnameTaskMap = Maps.newTreeMap(); while (true) { FileStatus [] fileStatusArray = fs.listStatus(taskDirectory); hostnameTaskMap.clear(); if (fileStatusArray.length > 0) { for (FileStatus fileStatus : fileStatusArray) { String[] hostnameTaskArray = fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR); if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) { hostnameTaskMap.put(hostnameTaskArray[0], new Integer(hostnameTaskArray[1])); } } if (hostnameTaskMap.size() >= serverCount) { break; } Thread.sleep(pollMsecs); } } }
经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。
最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0
onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0,即worker等待直到master上的ZooKeeper服务已经启动完成。
启动ZooKeeper服务的命令如下:
3) determineGraphFunctions()。
GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:
a) If not split master, everyone does the everything and/or running ZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.
该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:
private static GraphFunctions determineGraphFunctions( ImmutableClassesGiraphConfiguration conf, ZooKeeperManager zkManager) { // What functions should this mapper do? if (!splitMasterWorker) { if ((zkManager != null) && zkManager.runsZooKeeper()) { functions = GraphFunctions.ALL; } else { functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER; } } else { if (zkAlreadyProvided) { int masterCount = conf.getZooKeeperServerCount(); if (taskPartition < masterCount) { functions = GraphFunctions.MASTER_ONLY; } else { functions = GraphFunctions.WORKER_ONLY; } } else { if ((zkManager != null) && zkManager.runsZooKeeper()) { functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY; } else { functions = GraphFunctions.WORKER_ONLY; } } } return functions; }
默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









WPS は、ドキュメント、フォーム、PPT など、非常に広く使用されているオフィス ソフトウェアであり、複数端末の同期をサポートしています。 wps編集時に「このオブジェクトのソースアプリケーションを起動できません」というプロンプトが表示された場合、どうすれば解決できますか?この問題は、リンクまたはファイルを開こうとしているが、そのソース アプリケーションが存在しないか削除されているために発生することがあります。解決策は次のとおりです。 1. WPS ソフトウェアを再インストールします。WPSOffice を再インストールして問題を解決し、最新バージョンを使用していることを確認してください。 2. デフォルトのプログラムを手動で変更する: デフォルトのプログラムを WPS に変更してみてください。開きたいファイルを右クリックし、[プログラムから開く] を選択してください。

壁紙エンジンが起動するとき、4 つの異なるオプションがあります。多くのユーザーは、壁紙エンジンを起動するときにどれを選択すればよいかわかりません。通常、壁紙エンジンが起動するときは、最初の 32 ビットを選択します。壁紙エンジンを開始するときにどれを選択しますか? 回答: 32 ビットを開始します。 1. 通常、壁紙エンジンが起動するときは、最初の「32 ビットの開始」を選択します。 2. 壁紙エンジンが開始されるとき、4 つの異なるオプションがあります: 32 ビットで開始、64 ビットで開始。 3. 32 ビットを開始: これは一般的に推奨されるオプションであり、ほとんどのユーザーに適しています。 4. 64 ビットの開始: システムが 64 ビットをサポートしている場合は、このオプションを選択できます。

テクノロジーが発展し続けるにつれて、さまざまなオペレーティング システムを使用する必要性がますます一般的になってきています。 Apple ユーザーの場合、macOS と Windows などの 2 つの異なるオペレーティング システムを 1 つのデバイスにインストールして使用する必要がある場合があります。この場合、特に二重化システムの起動シーケンスの設定が重要です。この記事では、Apple デバイスの電源を入れたときに最初にデュアル システムが起動するように設定する方法を紹介します。まず、両方のオペレーティング システムが Apple デバイスに正常にインストールされていることを確認する必要があります。この Apple では BootCamp を使用できます

Linux でサービスを再起動する正しい方法は何ですか? Linux システムを使用していると、特定のサービスを再起動する必要がある状況がよく発生しますが、サービスの再起動時に実際にサービスが停止しない、または開始しないなどの問題が発生することがあります。したがって、サービスを再起動する正しい方法を習得することが非常に重要です。 Linux では、通常、systemctl コマンドを使用してシステム サービスを管理できます。 systemctl コマンドは systemd システム マネージャーの一部です

タイトル: Ubuntu で PHP サービスが正常に起動できない問題を解決する方法と具体的なコード例 Ubuntu を使用して Web サイトやアプリケーションを構築する場合、PHP サービスが正常に起動できず、Web サイトが起動できないという問題がよく発生します。正常にアクセスできなくなるか、アプリケーションが正常に機能できなくなります。この記事では、Ubuntu で PHP サービスが正常に開始できない問題を解決する方法を紹介し、読者がそのような問題をすぐに解決できるように具体的なコード例を示します。 1. PHP 設定ファイルを確認する まず、PHP 設定ファイルを確認する必要があります。

PHPコードのソースコードを解釈・実行せずにブラウザ上に表示するにはどうすればよいでしょうか? PHP は、動的 Web ページの開発に一般的に使用されるサーバー側スクリプト言語です。サーバー上で PHP ファイルが要求されると、サーバーはそのファイル内の PHP コードを解釈して実行し、最終的な HTML コンテンツを表示のためにブラウザーに送信します。ただし、PHP ファイルのソース コードを実行するのではなく、ブラウザーに直接表示したい場合があります。この記事では、PHPコードのソースコードを解釈・実行せずにブラウザ上に表示する方法を紹介します。 PHPでは、次のように使用できます

タイトル: DreamWeaver CMS のセカンダリディレクトリを開けない原因と解決策の分析 Dreamweaver CMS (DedeCMS) は、さまざまな Web サイトの構築に広く使用されている強力なオープンソースのコンテンツ管理システムです。ただし、Web サイトの構築中に、セカンダリ ディレクトリを開けない状況が発生し、Web サイトの通常の動作に問題が発生することがあります。この記事では、セカンダリ ディレクトリを開けない考えられる理由を分析し、この問題を解決するための具体的なコード例を示します。 1. 考えられる原因分析: 疑似静的ルール構成の問題: 使用中

Linux でサービス再起動コマンドを実行するには、通常、Systemd サービス マネージャーを使用する必要があります。 Systemd は、Linux 上で広く使用されているサービス管理ツールであり、システム サービスを簡単に管理および制御できます。以下では、Linux で Systemd を介してサービス再起動コマンドを実行する方法と、具体的なコード例を紹介します。手順1:サービス名の確認 サービス再起動コマンドを実行する前に、再起動するサービス名を確認する必要があります。次のコマンドを使用して、システム上で実行されているサービスのリストを表示できます。
