Giraph源码分析启动ZooKeeper服务
说明: (1) 实验环境. 三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker. 测试例子:官网自带的SSSP程序,数据是自己模拟生成。 运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar o
说明:
(1) 实验环境.
三台服务器:test165、test62、test63。test165同时是JobTracker和TaskTracker.
测试例子:官网自带的SSSP程序,数据是自己模拟生成。
运行命令:hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5
(2). 为节约空间,下文中所有代码均为核心代码片段。
(3). core-site.xml中hadoop.tmp.dir的路径设为:/home/hadoop/hadooptmp
(4).写本文是多次调试完成的,故文中的JobID不一样,读者可理解为同一JobID.
(5). 后续文章也遵循上述规则。
1. org.apache.giraph.graph.GraphMapper类
Giraph中自定义org.apache.giraph.graph.GraphMapper类来继承Hadoop中的 org.apache.hadoop.mapreduce.Mapper
This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.
BSP的运算逻辑被封装在GraphMapper类中,其拥有一GraphTaskManager对象,用来管理Job的tasks。每个GraphMapper对象都相当于BSP中的一个计算节点(compute node)。
在GraphMapper类中的setup()方法中,创建GraphTaskManager对象并调用其setup()方法进行一些初始化工作。如下:
@Override public void setup(Context context) throws IOException, InterruptedException { // Execute all Giraph-related role(s) assigned to this compute node. // Roles can include "master," "worker," "zookeeper," or . . . ? graphTaskManager = new GraphTaskManager<I, V, E, M>(context); graphTaskManager.setup( DistributedCache.getLocalCacheArchives(context.getConfiguration())); }
@Override public void run(Context context) throws IOException, InterruptedException { // Notify the master quicker if there is worker failure rather than // waiting for ZooKeeper to timeout and delete the ephemeral znodes try { setup(context); while (context.nextKeyValue()) { graphTaskManager.execute(); } cleanup(context); // Checkstyle exception due to needing to dump ZooKeeper failure } catch (RuntimeException e) { graphTaskManager.zooKeeperCleanup(); graphTaskManager.workerFailureCleanup(); } }
2. org.apache.giraph.graph.GraphTaskManager 类
功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.
下面讲述setup()方法,代码如下。
/** * Called by owner of this GraphTaskManager on each compute node * @param zkPathList the path to the ZK jars we need to run the job */ public void setup(Path[] zkPathList) throws IOException, InterruptedException { context.setStatus("setup: Initializing Zookeeper services."); locateZookeeperClasspath(zkPathList); serverPortList = conf.getZookeeperList(); if (serverPortList == null && startZooKeeperManager()) { return; // ZK connect/startup failed } if (zkManager != null && zkManager.runsZooKeeper()) { LOG.info("setup: Chosen to run ZooKeeper..."); } context.setStatus("setup: Connected to Zookeeper service " +serverPortList); this.graphFunctions = determineGraphFunctions(conf, zkManager); instantiateBspService(serverPortList, sessionMsecTimeout); }
1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,
/** * Instantiate and configure ZooKeeperManager for this job. This will * result in a Giraph-owned Zookeeper instance, a connection to an * existing quorum as specified in the job configuration, or task failure * @return true if this task should terminate */ private boolean startZooKeeperManager() throws IOException, InterruptedException { zkManager = new ZooKeeperManager(context, conf); context.setStatus("setup: Setting up Zookeeper manager."); zkManager.setup(); if (zkManager.computationDone()) { done = true; return true; } zkManager.onlineZooKeeperServers(); serverPortList = zkManager.getZooKeeperServerPortString(); return false; }
org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.
ZooKeeperManager类的setup()定义如下:
/** * Create the candidate stamps and decide on the servers to start if * you are partition 0. */ public void setup() throws IOException, InterruptedException { createCandidateStamp(); getZooKeeperServerList(); }
运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。
getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。
createZooKeeperServerList核心代码如下:
/** * Task 0 will call this to create the ZooKeeper server list. The result is * a file that describes the ZooKeeper servers through the filename. */ private void createZooKeeperServerList() throws IOException, InterruptedException { Map<String, Integer> hostnameTaskMap = Maps.newTreeMap(); while (true) { FileStatus [] fileStatusArray = fs.listStatus(taskDirectory); hostnameTaskMap.clear(); if (fileStatusArray.length > 0) { for (FileStatus fileStatus : fileStatusArray) { String[] hostnameTaskArray = fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR); if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) { hostnameTaskMap.put(hostnameTaskArray[0], new Integer(hostnameTaskArray[1])); } } if (hostnameTaskMap.size() >= serverCount) { break; } Thread.sleep(pollMsecs); } } }
经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好),然后退出for循环,发现hostNameTaskMap的size等于1,直接退出while循环。那么此处就选了test162 0。
最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0
onlineZooKeeperServers(),根据zkServerList_test162 0文件,Task 0 先生成zoo.cfg配置文件,使用ProcessBuilder来创建ZooKeeper服务进程,然后Task 0 再通过socket连接到ZooKeeper服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记master任务已完成。worker一直在进行循环检测master是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0,即worker等待直到master上的ZooKeeper服务已经启动完成。
启动ZooKeeper服务的命令如下:
3) determineGraphFunctions()。
GraphTaskManager类中有CentralizedServiceMaster对象和CentralizedServiceWorker 对象,分别对应于master和worker。每个BSP compute node扮演的角色判定逻辑如下:
a) If not split master, everyone does the everything and/or running ZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.
该判定在GraphTaskManager 类中的静态方法determineGraphFunctions()中定义,片段代码如下:
private static GraphFunctions determineGraphFunctions( ImmutableClassesGiraphConfiguration conf, ZooKeeperManager zkManager) { // What functions should this mapper do? if (!splitMasterWorker) { if ((zkManager != null) && zkManager.runsZooKeeper()) { functions = GraphFunctions.ALL; } else { functions = GraphFunctions.ALL_EXCEPT_ZOOKEEPER; } } else { if (zkAlreadyProvided) { int masterCount = conf.getZooKeeperServerCount(); if (taskPartition < masterCount) { functions = GraphFunctions.MASTER_ONLY; } else { functions = GraphFunctions.WORKER_ONLY; } } else { if ((zkManager != null) && zkManager.runsZooKeeper()) { functions = GraphFunctions.MASTER_ZOOKEEPER_ONLY; } else { functions = GraphFunctions.WORKER_ONLY; } } } return functions; }
默认的,Giraph会区分master和worker。会在master上面启动zookeeper服务,不会在worker上启动ZooKeeper服务。那么Task 0 就是master+ZooKeeper,其他Tasks就是workers。

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



WPS ialah perisian pejabat yang digunakan secara meluas, termasuk dokumen, borang dan PPT, serta menyokong penyegerakan berbilang terminal. Jika gesaan "Aplikasi sumber untuk objek ini tidak boleh dilancarkan" muncul semasa mengedit wps, bagaimana untuk menyelesaikannya? Masalah ini mungkin berlaku kerana anda cuba membuka pautan atau fail, tetapi aplikasi sumbernya tidak lagi wujud atau telah dipadamkan. Berikut ialah beberapa pembetulan: 1. Pasang semula perisian WPS: Cuba pasang semula WPSOffice untuk menyelesaikan masalah dan pastikan anda menggunakan versi terkini. 2. Tukar program lalai secara manual: Cuba tukar program lalai kepada WPS Anda boleh klik kanan fail yang ingin anda buka, pilih "Buka dengan", dan kemudian

Apabila enjin kertas dinding dimulakan, terdapat 4 pilihan berbeza Ramai pengguna tidak tahu yang mana satu untuk dipilih semasa memulakan enjin kertas dinding Secara amnya, apabila enjin kertas dinding dimulakan, pilih yang pertama: mulakan 32-bit. Mana satu untuk dipilih semasa memulakan mesin kertas dinding? 1. Secara amnya, apabila enjin kertas dinding dimulakan, pilih yang pertama: mulakan 32-bit. 2. Apabila enjin kertas dinding dimulakan, terdapat 4 pilihan berbeza: mulakan 32-bit; 3. Mulakan 32-bit: Ini adalah pilihan umum yang disyorkan dan sesuai untuk kebanyakan pengguna. 4. Mulakan 64-bit: Jika sistem menyokong 64-bit, anda boleh memilih pilihan ini

Apabila teknologi terus berkembang, keperluan untuk menggunakan sistem pengendalian yang berbeza menjadi semakin biasa. Bagi pengguna Apple, kadangkala anda mungkin perlu memasang dan menggunakan dua sistem pengendalian berbeza pada satu peranti, seperti macOS dan Windows. Dalam kes ini, adalah amat penting untuk menetapkan urutan permulaan sistem dwi. Artikel ini akan memperkenalkan cara menyediakan peranti Apple untuk memulakan sistem dwi terlebih dahulu apabila menghidupkan peranti. Pertama, kita perlu memastikan bahawa kedua-dua sistem pengendalian telah berjaya dipasang pada peranti Apple. Anda boleh menggunakan BootCamp Apple ini

Apakah cara yang betul untuk memulakan semula perkhidmatan dalam Linux? Apabila menggunakan sistem Linux, kami sering menghadapi situasi di mana kami perlu memulakan semula perkhidmatan tertentu, tetapi kadangkala kami mungkin menghadapi beberapa masalah semasa memulakan semula perkhidmatan, seperti perkhidmatan tidak benar-benar berhenti atau bermula. Oleh itu, adalah sangat penting untuk menguasai cara yang betul untuk memulakan semula perkhidmatan. Di Linux, anda biasanya boleh menggunakan perintah systemctl untuk mengurus perkhidmatan sistem. Perintah systemctl adalah sebahagian daripada pengurus sistem systemd

Tajuk: Kaedah dan contoh kod khusus untuk menyelesaikan masalah yang perkhidmatan PHP tidak boleh dimulakan seperti biasa di bawah Ubuntu Apabila menggunakan Ubuntu untuk membina tapak web atau aplikasi, anda sering menghadapi masalah yang perkhidmatan PHP tidak dapat dimulakan secara normal, yang akan menyebabkan laman web tersebut. tidak dapat diakses secara normal atau aplikasi tidak dapat berfungsi dengan normal. Artikel ini akan memperkenalkan cara untuk menyelesaikan masalah yang perkhidmatan PHP tidak dapat dimulakan seperti biasa di bawah Ubuntu, dan memberikan contoh kod khusus untuk membantu pembaca menyelesaikan kegagalan tersebut dengan cepat. 1. Semak fail konfigurasi PHP Mula-mula, kita perlu menyemak fail konfigurasi PHP

Tajuk: Analisis sebab dan penyelesaian mengapa direktori sekunder DreamWeaver CMS tidak boleh dibuka Dreamweaver CMS (DedeCMS) ialah sistem pengurusan kandungan sumber terbuka yang berkuasa yang digunakan secara meluas dalam pembinaan pelbagai laman web. Walau bagaimanapun, kadangkala semasa proses membina tapak web, anda mungkin menghadapi situasi di mana direktori sekunder tidak boleh dibuka, yang membawa masalah kepada operasi biasa tapak web. Dalam artikel ini, kami akan menganalisis kemungkinan sebab mengapa direktori sekunder tidak boleh dibuka dan memberikan contoh kod khusus untuk menyelesaikan masalah ini. 1. Analisis sebab yang mungkin: Masalah konfigurasi peraturan pseudo-statik: Semasa penggunaan

Bagaimana untuk memaparkan kod sumber kod PHP dalam penyemak imbas tanpa ditafsir dan dilaksanakan? PHP ialah bahasa skrip sebelah pelayan yang biasa digunakan untuk membangunkan laman web dinamik. Apabila fail PHP diminta pada pelayan, pelayan mentafsir dan melaksanakan kod PHP di dalamnya dan menghantar kandungan HTML akhir kepada penyemak imbas untuk paparan. Walau bagaimanapun, kadangkala kami mahu memaparkan kod sumber fail PHP terus dalam penyemak imbas dan bukannya dilaksanakan. Artikel ini akan memperkenalkan cara untuk memaparkan kod sumber kod PHP dalam penyemak imbas tanpa ditafsir dan dilaksanakan. Dalam PHP, anda boleh menggunakan

Di Linux, untuk melaksanakan perintah restart perkhidmatan, anda biasanya perlu menggunakan pengurus perkhidmatan Systemd. Systemd ialah alat pengurusan perkhidmatan yang digunakan secara meluas di Linux, yang boleh mengurus dan mengawal perkhidmatan sistem dengan mudah. Berikut akan memperkenalkan cara untuk melaksanakan perintah restart perkhidmatan melalui Systemd dalam Linux dan memberikan contoh kod khusus. Langkah 1: Sahkan nama perkhidmatan Sebelum melaksanakan perintah mulakan semula perkhidmatan, anda perlu mengesahkan nama perkhidmatan untuk dimulakan semula. Anda boleh melihat senarai perkhidmatan yang berjalan pada sistem dengan arahan berikut:
