package com.fatkun;
?
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.StringTokenizer;
?
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
?
public
class
WordCount
extends
Configured
implements
Tool {
/**
 * Custom job counters reported by this WordCount job.
 *
 * <p>A nested enum is implicitly static, so no explicit {@code static}
 * modifier is needed.
 */
enum Counters {
    /** Incremented by one for every token the mapper emits. */
    INPUT_WORDS
}
?
/** Shared commons-logging logger for this class; final so it cannot be reassigned. */
static final Log logger = LogFactory.getLog(WordCount.class);
?
public
static
class
CountMapper
extends
Mapper {
private
final
IntWritable one =
new
IntWritable(1);
private
Text word =
new
Text();
private
boolean caseSensitive = true;
?
@Override
protected
void setup(Context context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
caseSensitive = conf.getBoolean(
"wordcount.case.sensitive"
, true);
super.setup(context);
}
?
@Override
protected
void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr =
new
StringTokenizer(value.toString());
while
(itr.hasMoreTokens()) {
if
(caseSensitive) {
word.set(itr.nextToken());
}
else
{
word.set(itr.nextToken().toLowerCase());
}
context.write(word, one);
context.getCounter(Counters.INPUT_WORDS).increment(1);
}
}
}
?
public
static
class
CountReducer
extends
Reducer {
?
@Override
protected
void reduce(Text text, Iterable values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for
(IntWritable value : values) {
sum += value.get();
}
context.write(text,
new
IntWritable(sum));
}
?
}
?
@Override
public
int run(String[] args) throws Exception {
Configuration conf =
new
Configuration(getConf());
Job job = Job.getInstance(conf,
"Example Hadoop WordCount"
);
job.setJarByClass(WordCount.
class
);
job.setMapperClass(CountMapper.
class
);
job.setCombinerClass(CountReducer.
class
);
job.setReducerClass(CountReducer.
class
);
?
job.setOutputKeyClass(Text.
class
);
job.setOutputValueClass(IntWritable.
class
);
?
List other_args =
new
ArrayList();
for
(int i = 0; i
<h2 id="运行">运行</h2>
<p>在 Eclipse 中导出 jar 包,然后执行以下命令:</p>
<pre class="brush:php;toolbar:false">hadoop jar wordcount.jar com.fatkun.WordCount -Dwordcount.case.sensitive=false /user/fatkun/input /user/fatkun/output