Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。WordCount.java
package org.myorg;//包含在包myorg中
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {//定义映射map类
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//定义对象word用来包含需要统计的字符串
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//定义具体的映射方法,传入参数key,value(要统计的字符串),输出参数output(每个单词出现的个数统计映射方式),reporter
String line = value.toString();//一行一行的读入传入的字符串
StringTokenizer tokenizer = new StringTokenizer(line);//以空格分隔符将一行分为若干tokens
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);//统计相同单词出现的次数
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
//定义reduce类,将每个key(本例中就是单词)出现的次数求和
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
//定义具体的reduce方法,将每次映射中对应每个单词出现的次数进行统计求和,输入参数有key,values,输出参数有output ,reporter
int sum = 0;//初始化总数
while (values.hasNext()) {//将两次映射中对应的次数进行求和
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));//输出output得到最终结果
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);//定义JobCon的配置变量conf,代表一个Map/Reduce作业的配置。
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);//框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的 Comparator。
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);//Mapper已经排好序的输出
conf.setCombinerClass(Reduce.class);//用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量。
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));//定义文件的输入格式,指定一组输入文件
FileOutputFormat.setOutputPath(conf, new Path(args[1]));//定义文件的输出格式,输出文件应该写在哪儿
JobClient.runJob(conf);//提交作业并且监控它的执行
}
}
package org.myorg;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount extends Configured implements Tool {//定义单词统计WordCount类
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,
Text,IntWritable>{//定义映射map类
static enum Counters{ INPUT_WORDS }
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//定义对象word用来包含需要统计的字符串
//初始化每个统计变量
private boolean caseSensitive = true;
private Set<string> patternsToSkip = new HashSet<String>();
private long numRecords = 0;
private String inputFile;
public void configure(JobConf job) {//修改配置参数
caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
inputFile = job.get("map.input.file");
if (job.getBoolean("wordcount.skip.patterns", false)){
path[] patternsFiles = new Path[0];
try{
patternsFiles = DistributedCache.getLocalCacheFiles(job);//使用DistributedCache 来分发只读数据。 这里允许用户指定单词的模式,在计数时忽略那些符合模式的单词
} catch (IOException ioe){
System.err.println("Caught exception while getting cached files: " +
StringUtils.stringifyException(ioe));
}
for (Path patternsFile : patternsFiles){
parseSkipFile(patternsFile);
}
}
}
private void parseSkipFile(Path patternsFile) {//从文件中读入字符串
try {
BufferedReader fis = new BufferedRdader(new FileReader(patternsFile.toString()));//读入数据
String pattern = null;
while ((pattern = fis.readLine()) != null){
patternsToSkip.add(pattern);//将读入数据保存到patternsToSkip
}
}catch (IOException ioe){
System.err.println("Caught exception while parsing the cached file '" +
patternsFile + "' :" + StringUtils.stringifyException(ioe));
}
}
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable>
output, Reporter reporter) throws IOException {
String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();//一行一行的读入传入的字符串
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer tokenizer = new StringTokenizer(line);//以空格分隔符将一行分为若干tokens
while (tokenizer.hasMoreTokens()) {//统计相同单词出现的次数
word.set(tokenizer.nextToken());
output.collect(word, one);//统计相同单词出现的次数
reporter.incrCounter(Counters.INPUT_WORDS, 1);//遇到相同的进行+1操作
}
if((++numRecords % 100) == 0) {//打印最后的统计结果
reporter.setStatus("Finished processing "+ numRecords + " records" + "from the input file: "+ inputFile);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable,
Text, IntWritable> {//定义映射之间总数的统计
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {////定义具体的reduce方法,将每次映射中对应每个单词出现的次数进行统计求和,输入参数有key,values,输出参数有output
int sum = 0;//初始化总数
while (values.hasNext()) {//将两次映射中对应的次数进行求和
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));//输出output得到最终结果
}
}
public int run(String[] args) throws Exception {//执行统计方法run
JobConf conf = new JobConf(getConf(), WordCount.class);//定义JobCon的配置变量conf,代表一个Map/Reduce作业的配置。
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);//框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的 Comparator。
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);;//Mapper已经排好序的输出
conf.setCombinerClass(Reduce.class);//用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量。
conf.setReducerClass(Reduce.class);
conf.setIntputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutpotFormat.class);
List<String> other_args = new ArrayList<String>();
for (int i=0; i < args.length; ++i) {
if ("-skip".equals(args[i])) {
DistributedCache.addCacheFile(new path(args[++i]).toUri(), conf);//使用DistributedCache 来分发只读数据。 这里允许用户指定单词的模式,在计数时忽略那些符合模式的单词
conf.setBoolean("wordcount.skip.patterns", true);
} else{
other_args.add(args[i]);
}
}
FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));//定义文件的输入格式,指定一组输入文件
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));//定义文件的输出格式,输出文件应该写在哪儿
JobClient.runJob(conf);//提交作业并且监控它的执行
return 0;
}
public static void main(String[] args) throws Exception{
int res = ToolRunner.run(new Configuration(), new WordCount(), args);//执行wordcount统计
System.exit(res);
}
}
分享到:
相关推荐
Hadoop Map Reduce教程,介绍hadoop map/reduce框架的各个方面
通常,hadoop Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被...
这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。
Hadoop MapReduce 教程 这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面
这是一篇关于分布式计算的毕业设计论文,内容摘要:分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种...
hadoop分布式框架使用手册,分布讲解分布式存储和计算框架,map/reduce的计算框架在对于一次存入多次读取的场景很是适用
其中Hadoop 是应用较多的分布式存储和计算框架之一。 本文在该平台下,通过对国内某搜索引擎两个月内的上千万条用户搜索日志进行数据统计分 析,给出相应Map/Reduce 程序的设计思路和实例,并提出Map/Reduce 分布式...
Spark 提供了与Hadoop Map/Reduce 相似的分布式计 算框架,但却有基于内存和迭代优化的设计,因此在交互式数据分析和数据挖掘工作负载中表现更优秀。 随着对大数据技术研究的深入,Spark开源生态系统得到了快速发展...
Hadoop, Apache开源的分布式框架。源自Google GFS,BigTable,MapReduce 论文。 == HDFS == HDFS (Hadoop Distributed File System),Hadoop 分布式文件系统。 NameNode,HDFS命名服务器,负责与DataNode文件元信息保存。...
mapreduce 编程 此示例程序将让您提取有用的统计数据,例如排名前 10 的平均评分电影、使用 Hadoop map-reduce 框架以及链接多个映射器和化简器对 200 万条记录进行基于流派的过滤
Hadoop映射减少字计数器 61C(计算机体系结构)的类项目,该项目使用Hadoop Map Reduce框架对大型文本文档中的单词频率进行计数。
HadoopMap/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。一个Map/Reduce作业(job)通常会把输入的数据集...
采用Apache公司的Hadoop Map-Reduce框架来实现数据收集功能,并通过实验,将数据收集工作在传统的单线程模式(传统实现模式)、Hadoop伪分布模式和全分布模式下所需时间进行比较,并对执行结果进行了分析。...
Hadoop本身,实现的是分布式的文件系统HDFS,和分布式的计算(Map/Reduce)框架,此外,它还不是一个人在战斗,Hadoop包含一系列扩展项目,包括了分布式文件数据库HBase(对应Google的BigTable),分布式协同服务...
改进了多特征点图像特征提取和匹配算法,并基于Map/Reduce框架实现了基于多特征的服饰图像数据分布式检索。实验结果表明,该方法能够均衡系统负载,提高资源利用率,扩展性强,有效地降低了海量服饰图像检索时间,是...
4.JobTracker,hadoop的Map/Reduce调度器,负责与TackTracker通信分配计算任务并跟踪任务进度。5.DataNode,hadoop数据节点,负责存储数据。6.TaskTracker,hadoop调度程序,负责Map,Reduce 任务的具体启动和执行。7....
* 程序利用Eclipse EE在Hadoop平台下,使用Map/Reduce编程框架,将传统的C4.5决策树算法并行化; * 该部分属于本科毕业设计中,并行随机森林算法的核心部分; * Hadoop的搭建主要参考给力星的博客( ...
该项目使用 Apache Map Reduce 框架分析电影。