MongDB的MapReduce相当于MySQL中的group by,所以在MongoDB上使用Map/Reduce进行并行统计很容易。 使用MapReduce要实现两个函数Map函数和Reduce函数,Map函数调用emit(key,value),遍历collection中的所有记录,将key和value传递给Reduce函数进行处理。Map函
MongDB的MapReduce相当于MySQL中的“group by”,所以在MongoDB上使用Map/Reduce进行并行“统计”很容易。
使用MapReduce要实现两个函数Map函数和Reduce函数,Map函数调用emit(key,value),遍历collection中的所有记录,将key和value传递给Reduce函数进行处理。Map函数和Reduce函数可以使用JS来实现,可以通过db.runCommand或mapReduce命令来执行一个MapReduce操作。
示例shell
db.runCommand( { mapreduce : <collection>, map : <mapfunction>, reduce : <reducefunction> [, query : <query filter object>] [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>] [, limit : <number of objects to return from collection>] [, out : <see output options below>] [, keeptemp: <true|false>] [, finalize : <finalizefunction>] [, scope : <object where fields go into javascript global scope >] [, verbose : true] } );
下面我们准备数据以备后面示例所需
> db.students.insert({classid:1, age:14, name:'Tom'}) > db.students.insert({classid:1, age:12, name:'Jacky'}) > db.students.insert({classid:2, age:16, name:'Lily'}) > db.students.insert({classid:2, age:9, name:'Tony'}) > db.students.insert({classid:2, age:19, name:'Harry'}) > db.students.insert({classid:2, age:13, name:'Vincent'}) > db.students.insert({classid:1, age:14, name:'Bill'}) > db.students.insert({classid:2, age:17, name:'Bruce'}) >
Map 函数必须调用 emit(key, value) 返回键值对,使用 this 访问当前待处理的 Document。
这里this一定不能忘了!!!
> m = function() { emit(this.classid, 1) } function () { emit(this.classid, 1); } >
> r = function(key, values) { ... var x = 0; ... values.forEach(function(v) { x += v }); ... return x; ... } function (key, values) { var x = 0; values.forEach(function (v) {x += v;}); return x; } >
结果如下:
> res = db.runCommand({ ... mapreduce:"students", ... map:m, ... reduce:r, ... out:"students_res" ... }); { "result" : "students_res", "timeMillis" : 1587, "counts" : { "input" : 8, "emit" : 8, "output" : 2 }, "ok" : 1 } > db.students_res.find() { "_id" : 1, "value" : 3 } { "_id" : 2, "value" : 5 } >
利用 finalize() 我们可以对 reduce() 的结果做进一步处理。
> f = function(key, value) { return {classid:key, count:value}; } function (key, value) { return {classid:key, count:value}; } >
> res = db.runCommand({ ... mapreduce:"students", ... map:m, ... reduce:r, ... out:"students_res", ... finalize:f ... }); { "result" : "students_res", "timeMillis" : 804, "counts" : { "input" : 8, "emit" : 8, "output" : 2 }, "ok" : 1 } > db.students_res.find() { "_id" : 1, "value" : { "classid" : 1, "count" : 3 } } { "_id" : 2, "value" : { "classid" : 2, "count" : 5 } } >
我们还可以添加更多的控制细节。
> res = db.runCommand({ ... mapreduce:"students", ... map:m, ... reduce:r, ... out:"students_res", ... finalize:f, ... query:{age:{$lt:10}} ... }); { "result" : "students_res", "timeMillis" : 358, "counts" : { "input" : 1, "emit" : 1, "output" : 1 }, "ok" : 1 } > db.students_res.find(); { "_id" : 2, "value" : { "classid" : 2, "count" : 1 } } >