我将spark搭建在两台机器上,其中一台即是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我的主程序让它们统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里。目前代码是在master节点上提交的。但是我spark-submit之后,好像只统计master节点上的mongodb里的数据了,另一个worker节点没有统计上。请问这是什么原因?代码如下:
val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)
val config = new Configuration()
//以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test")
//统计结果输出到服务器上
config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
// Input contains tuples of (ObjectId, BSONObject)
val countsRDD = mongoRDD.flatMap(arg => {
var str = arg._2.get("type").toString
str = str.toLowerCase().replaceAll("[.,!?\n]", " ")
str.split(" ")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val saveRDD = countsRDD.map((tuple) => {
var bson = new BasicBSONObject()
bson.put("word", tuple._1)
bson.put("count", tuple._2.toString() )
(null, bson)
})
// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
Posez et répondez à vos propres questions. La raison peut être la suivante :
Cette ligne de code signifie que le pilote lit la base de données puis charge les données qualifiées dans le RDD Puisque le paramètre précédent devait utiliser 127.0.0.1 comme entrée, c'est-à-dire que les données sont lues à partir du mongodb du pilote. Puisque le driver est sur le maître, les données lues sont naturellement les données sur le maître.