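The PHP script's job is to split each input line into words and print one word per line. Because <code>pipe</code> turns every line the script writes to standard output into one element, each word becomes an element of the resulting RDD.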
<code><?php
// Spark's pipe() writes each partition's lines to this script's standard input.
$in = fopen('php://stdin', 'r');
while (($line = fgets($in)) !== false) {
    // Split on any run of whitespace and drop empty tokens, so the trailing
    // newline of each line does not produce blank output lines.
    $words = preg_split('/\s+/', $line, -1, PREG_SPLIT_NO_EMPTY);
    foreach ($words as $word) {
        // One word per output line; each output line becomes one RDD element.
        echo $word, "\n";
    }
}
fclose($in);
?></code>
<code>package test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object PipeTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("pipe Test")
    val sc = new SparkContext(sparkConf)
    // Read the input file, asking for at least 3 partitions.
    val a = sc.textFile("/home/gt/wordcount.txt", 3)
    // pipe() starts the PHP script once per partition, writes the partition's
    // lines to its stdin, and returns each line of its stdout as an element.
    // The script must exist at this path on every worker node.
    val result = a.pipe("php /home/gt/spark/bin/test.php")
      .map(x => (x, 1))
      .reduceByKey(_ + _)
    // foreach runs on the executors, so on a cluster this output goes to the
    // executor logs rather than the driver console.
    result.foreach { x => println("!!!!! " + x) }
    sc.stop()
  }
}</code>
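To make the data flow easier to check without a cluster, here is a minimal sketch of the same computation on plain Scala collections. It is not part of the original job: `PipeWordCountSketch`, `splitLikePipe`, and `wordCount` are hypothetical names, `splitLikePipe` only stands in for the external PHP script, and `groupBy` plus a sum stands in for `reduceByKey`.

```scala
object PipeWordCountSketch {
  // Stands in for the external PHP script (assumption): one input line in,
  // zero or more words out, with empty tokens dropped.
  def splitLikePipe(line: String): Seq[String] =
    line.trim.split("\\s+").toSeq.filter(_.nonEmpty)

  // Local stand-in for pipe(...).map(x => (x, 1)).reduceByKey(_ + _).
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(splitLikePipe)          // each word becomes its own element
      .map(word => (word, 1))          // pair every word with a count of 1
      .groupBy(_._1)                   // gather the pairs that share a word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello spark", "hello php pipe")
    // Sort by word only to make the printed order stable.
    wordCount(lines).toSeq.sortBy(_._1).foreach { case (word, count) =>
      println(s"$word\t$count")
    }
  }
}
```

Comparing this local result with the output of the Spark job on a small input file is one way to confirm that the PHP script splits lines as intended, for example that it emits no blank lines.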