随着捕获的数据的数量每年增加,我们的存储也需要增加。很多公司正在认识到“数据为王”这一道理,但是我们如何分析这些数据呢?答案就是“通过Hadoop”。在本系列的第二篇文章中,java编程专家Steven Haines将会解释什么是MapReduce应用,以及如何构建一个简单的MapReduce应用。
本系列中的第一篇文章描述了Hadoop被设计用来解决的业务问题领域,以及给予它解决这些问题能力的内部架构。运行在Hadoop内的应用被称作为MapReduce应用,因此这部文章将演示如何构建一个简单的MapReduce应用。
采用MapReduce与Hadoop进行大数据分析 http://www.linuxidc.com/Linux/2013-07/87312.htm
搭建开发环境
在你能够使用Hadoop之前,你需要安装Java 6(或者更新版本),你可以从Oracle的网站上下载对应你的平台的版本。另外,如果你是运行在Windows上,由于Hadoop运行的正式开发和部署平台是Linux,所以你需要使用Cygwin来运行Hadoop。Mac OXS用户可以无问题地原生态运行Hadoop。
Hadoop可以从它的Releases页面下载,但是它的版本号结构解释起来具有一点儿挑战性。简而言之,1.x的代码分支包含当前的稳定发行版,2.x.x分支包含用于版本2的Hadoop的alpha代码,0.22.x的代码分支与2.x.x的相同,除了没有security,0.23的代码分支去除了高可用性(high availability)。0.20.x的代码分支是历史遗留问题,你可以忽略。对于本文中的例子,我将使用0.23.x代码分支,写这篇文章是该分支的最新版本是0.23.5,但是对于生产环境,你可能会想下载1.x版本或2.x.x版本。
下载并解压文件到你的本地机器上。如果你计划要做较长时间的Hadoop开发,你最好将解压的bin文件夹添加到你的PATH环境变量中。你可以通过执行bin目录中的hadoop命令来测试你的安装:
bin/hadoop
执行该命令而没有任何参数的话会显示以下输出:
Usage: hadoop [--config confdir] COMMAND
where COMMAND is one of:
fs run a generic filesystem user client
version print the version
jar <jar> run a jar file
distcp <srcurl> <desturl> copy file or directories recursively
archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
classpath prints the class path needed to get the
Hadoop jar and the required libraries
daemonlog get/set the log level for each daemon
or
CLASSNAME run the class named CLASSNAME
Most commands print help when invoked w/o parameters.
虽然有很多命令可以传递给Hadoop,但是由于本文中我们将关注在开发环境中执行Hadoop应用程序,因此我们唯一感兴趣的是下面的命令:
hadoop jar <jar-file-name>
Hello,MapReduce
在任何编程语言中,你编写的第一段程序通常都是一个“Hello,World”程序。对Hadoop和MapReduce而言,每个人编写的标准程序是Word Count应用程序。Word Count应用程序统计在大量的文本中每个单词出现的次数。它是一个学习MapReduce的完美例子,因为它的mapping和reducing步骤很琐细,但却引导你采用MapReduce的方式思考。下面是对Word Count应用程序中各个组件及其功能的总结:
- FileInputFormat:我们定义一个FileInputFormat去读取指定目录下(作为第一个参数传递给MapReduce应用程序)的所有文件,并传递这些文件给一个TextInputFormat(见Listing 1)以便分发给我们的mappers。
- TextInputFormat:Hadoop默认的InputFormat是TextInputFormat,它每次读取一行,并把字节偏移作为key(LongWritable),将这行文本作为value(Text),并返回key。
- Word Count Mapper:这是一个我们写的类用来把InputFormat传给它的单行文本标记化成单词,然后把单词本身和一个用于表示我们见过这个词的数字“1”绑在一起
- Combiner:在开发环境中我们不需要combiner,但是combiner(或combiner的功能)是由reducer(在本文后面会有描述)实现的,在传递(键/值)对(key/value pair)到reducer之前运行在本地节点上。应用combiner能够急剧地提示性能,但是你需要确保combining你的结果不会破坏你的reducer:为了能让reducer承担combiner的功能,它的操作必须是可结合的(即reducer应与combiner一样能与map结合),否则,发送到reducer的map将不会产生正确的结果。
- Word Count Reducer: word count reducer接受一个映射(map),它映射每个单词到记录该单词所有被mapper观察到的次数的列表。没有combiner,reducer将会接受一个单词和一个全为”1”的集合,但是由于我们让reducer承担combiner的功能,我们接受到得将是一个各个待被相加到一起的数字的集合。
- TextOutputFormat:本例中,我们使用TextOutputFormat类,并告诉它key为Text类型,value为IntWritable类型。
- FileOutputFormat:TextOutputFormat发送它的格式化输出到FileOutputFormat,后者将结果写入到自己创建的”output”目录中。
你或许会疑惑为什么我们把String叫做“Text”,把number叫做“IntWritable”和“LongWritable”。原因是为了能够让value采用分布式的方式在Hadoop文件系统(HDFS)传递,存在一些定义序列化的规则。幸运的是,Hadoop为普通类型提供了包装(wrapper),但是如果你需要自己开发,那么它提供了Writable接口,你可以通过实现该接口来实现自己的需要。
Listing 1 显示了我们的第一个MapReduce应用程序的源代码。
package com.geekcap.hadoopexamples; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.KeyValueTextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; /** * Created by IntelliJ IDEA. * User: shaines * Date: 12/9/12 * Time: 9:25 PM * To change this template use File | Settings | File Templates. */ public class WordCount extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); private final static IntWritable one = new IntWritable( 1 ); public void map( LongWritable key, // Offset into the file Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { // Get the value as a String String text = value.toString().toLowerCase(); // Replace all non-characters text = text.replaceAll( "'", "" ); text = text.replaceAll( "[^a-zA-Z]", " " ); // Iterate over all of the words in the string StringTokenizer st = new StringTokenizer( text ); while( st.hasMoreTokens() ) { // Get the next token and set it as the text for our "word" variable word.set( st.nextToken() ); // Output this word as the key and 1 as the value output.collect( word, one ); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { // Iterate over all of the values (counts of occurrences of this word) int count = 0; while( values.hasNext() ) { // Add the value to our count count += values.next().get(); } // Output the word with its count (wrapped in an IntWritable) output.collect( key, new IntWritable( count ) ); } } public int run(String[] args) throws Exception { // Create a configuration Configuration conf = getConf(); // Create a job from the default configuration that will use the WordCount class JobConf job = new JobConf( conf, WordCount.class ); // Define our input path as the first command line argument and our output path as the second Path in = new Path( args[0] ); Path out = new Path( args[1] ); // Create File Input/Output formats for these paths (in the job) FileInputFormat.setInputPaths( job, in ); FileOutputFormat.setOutputPath( job, out ); // Configure the job: name, mapper, reducer, and combiner job.setJobName( "WordCount" ); job.setMapperClass( MapClass.class ); job.setReducerClass( Reduce.class ); job.setCombinerClass( Reduce.class ); // Configure the output job.setOutputFormat( TextOutputFormat.class ); job.setOutputKeyClass( Text.class ); job.setOutputValueClass( IntWritable.class ); // Run the job JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { // Start the WordCount MapReduce application int res = ToolRunner.run( new Configuration(), new WordCount(), args ); System.exit( res ); } }
注解
你会注意到我们把Mapper类和Reducer类包含在与WordCount类本身相同的文件内。尽管不存在固定规则要求必须定义你的mapper和reducer在相同的文件内,但是惯例如此,除非你的mapper或reducer非常复杂。