你好,游客 登录 注册 搜索
背景:
阅读新闻

使用Hadoop构建MapReduce应用

[日期:2013-07-16] 来源:Linux社区  作者:lidinghao [字体: ]

随着捕获的数据的数量每年增加,我们的存储也需要增加。很多公司正在认识到“数据为王”这一道理,但是我们如何分析这些数据呢?答案就是“通过Hadoop”。在本系列的第二篇文章中,java编程专家Steven Haines将会解释什么是MapReduce应用,以及如何构建一个简单的MapReduce应用。

本系列中的第一篇文章描述了Hadoop被设计用来解决的业务问题领域,以及给予它解决这些问题能力的内部架构。运行在Hadoop内的应用被称作为MapReduce应用,因此这部文章将演示如何构建一个简单的MapReduce应用。

采用MapReduce与Hadoop进行大数据分析  http://www.linuxidc.com/Linux/2013-07/87312.htm

搭建开发环境

在你能够使用Hadoop之前,你需要安装Java 6(或者更新版本),你可以从Oracle的网站上下载对应你的平台的版本。另外,如果你是运行在Windows上,由于Hadoop运行的正式开发和部署平台是Linux,所以你需要使用Cygwin来运行Hadoop。Mac OXS用户可以无问题地原生态运行Hadoop。

Hadoop可以从它的Releases页面下载,但是它的版本号结构解释起来具有一点儿挑战性。简而言之,1.x的代码分支包含当前的稳定发行版,2.x.x分支包含用于版本2的Hadoop的alpha代码,0.22.x的代码分支与2.x.x的相同,除了没有security,0.23的代码分支去除了高可用性(high availability)。0.20.x的代码分支是历史遗留问题,你可以忽略。对于本文中的例子,我将使用0.23.x代码分支,写这篇文章是该分支的最新版本是0.23.5,但是对于生产环境,你可能会想下载1.x版本或2.x.x版本。

下载并解压文件到你的本地机器上。如果你计划要做较长时间的Hadoop开发,你最好将解压的bin文件夹添加到你的PATH环境变量中。你可以通过执行bin目录中的hadoop命令来测试你的安装:

   bin/hadoop

执行该命令而没有任何参数的话会显示以下输出:

Usage: hadoop [--config confdir] COMMAND

      where COMMAND is one of:

  fs                   run a generic filesystem user client

  version              print the version

  jar <jar>            run a jar file

  distcp <srcurl> <desturl> copy file or directories recursively

  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive

  classpath            prints the class path needed to get the

                      Hadoop jar and the required libraries

  daemonlog            get/set the log level for each daemon

 or

  CLASSNAME            run the class named CLASSNAME

 

Most commands print help when invoked w/o parameters.

 

虽然有很多命令可以传递给Hadoop,但是由于本文中我们将关注在开发环境中执行Hadoop应用程序,因此我们唯一感兴趣的是下面的命令:

hadoop jar <jar-file-name>

 

Hello,MapReduce

在任何编程语言中,你编写的第一段程序通常都是一个“Hello,World”程序。对Hadoop和MapReduce而言,每个人编写的标准程序是Word Count应用程序。Word Count应用程序统计在大量的文本中每个单词出现的次数。它是一个学习MapReduce的完美例子,因为它的mapping和reducing步骤很琐细,但却引导你采用MapReduce的方式思考。下面是对Word Count应用程序中各个组件及其功能的总结:

  • FileInputFormat:我们定义一个FileInputFormat去读取指定目录下(作为第一个参数传递给MapReduce应用程序)的所有文件,并传递这些文件给一个TextInputFormat(见Listing 1)以便分发给我们的mappers。
  • TextInputFormat:Hadoop默认的InputFormat是TextInputFormat,它每次读取一行,并把字节偏移作为key(LongWritable),将这行文本作为value(Text),并返回key。
  • Word Count Mapper:这是一个我们写的类用来把InputFormat传给它的单行文本标记化成单词,然后把单词本身和一个用于表示我们见过这个词的数字“1”绑在一起
  • Combiner:在开发环境中我们不需要combiner,但是combiner(或combiner的功能)是由reducer(在本文后面会有描述)实现的,在传递(键/值)对(key/value pair)到reducer之前运行在本地节点上。应用combiner能够急剧地提示性能,但是你需要确保combining你的结果不会破坏你的reducer:为了能让reducer承担combiner的功能,它的操作必须是可结合的(即reducer应与combiner一样能与map结合),否则,发送到reducer的map将不会产生正确的结果。
  • Word Count Reducer: word count  reducer接受一个映射(map),它映射每个单词到记录该单词所有被mapper观察到的次数的列表。没有combiner,reducer将会接受一个单词和一个全为”1”的集合,但是由于我们让reducer承担combiner的功能,我们接受到得将是一个各个待被相加到一起的数字的集合。
  • TextOutputFormat:本例中,我们使用TextOutputFormat类,并告诉它key为Text类型,value为IntWritable类型。
  • FileOutputFormat:TextOutputFormat发送它的格式化输出到FileOutputFormat,后者将结果写入到自己创建的”output”目录中。

你或许会疑惑为什么我们把String叫做“Text”,把number叫做“IntWritable”和“LongWritable”。原因是为了能够让value采用分布式的方式在Hadoop文件系统(HDFS)传递,存在一些定义序列化的规则。幸运的是,Hadoop为普通类型提供了包装(wrapper),但是如果你需要自己开发,那么它提供了Writable接口,你可以通过实现该接口来实现自己的需要。

Listing 1 显示了我们的第一个MapReduce应用程序的源代码。

 

package com.geekcap.hadoopexamples;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * Created by IntelliJ IDEA.
 * User: shaines
 * Date: 12/9/12
 * Time: 9:25 PM
 * To change this template use File | Settings | File Templates.
 */
public class WordCount extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable>
    {
        private Text word = new Text();
        private final static IntWritable one = new IntWritable( 1 );

        public void map( LongWritable key, // Offset into the file
                         Text value,
                         OutputCollector<Text, IntWritable> output,
                         Reporter reporter) throws IOException
        {
            // Get the value as a String
            String text = value.toString().toLowerCase();

            // Replace all non-characters
            text = text.replaceAll( "'", "" );
            text = text.replaceAll( "[^a-zA-Z]", " " );

            // Iterate over all of the words in the string
            StringTokenizer st = new StringTokenizer( text );
            while( st.hasMoreTokens() )
            {
                // Get the next token and set it as the text for our "word" variable
                word.set( st.nextToken() );

                // Output this word as the key and 1 as the value
                output.collect( word, one );
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce( Text key, Iterator<IntWritable> values,
                            OutputCollector<Text, IntWritable> output,
                            Reporter reporter) throws IOException
        {
            // Iterate over all of the values (counts of occurrences of this word)
            int count = 0;
            while( values.hasNext() )
            {
                // Add the value to our count
                count += values.next().get();
            }

            // Output the word with its count (wrapped in an IntWritable)
            output.collect( key, new IntWritable( count ) );
        }
    }


    public int run(String[] args) throws Exception
    {
        // Create a configuration
        Configuration conf = getConf();

        // Create a job from the default configuration that will use the WordCount class
        JobConf job = new JobConf( conf, WordCount.class );

        // Define our input path as the first command line argument and our output path as the second
        Path in = new Path( args[0] );
        Path out = new Path( args[1] );

        // Create File Input/Output formats for these paths (in the job)
        FileInputFormat.setInputPaths( job, in );
        FileOutputFormat.setOutputPath( job, out );

        // Configure the job: name, mapper, reducer, and combiner
        job.setJobName( "WordCount" );
        job.setMapperClass( MapClass.class );
        job.setReducerClass( Reduce.class );
        job.setCombinerClass( Reduce.class );

        // Configure the output
        job.setOutputFormat( TextOutputFormat.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );

        // Run the job
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception
    {
        // Start the WordCount MapReduce application
        int res = ToolRunner.run( new Configuration(),
                new WordCount(),
                args );
        System.exit( res );
    }
}

 

 

 

注解

你会注意到我们把Mapper类和Reducer类包含在与WordCount类本身相同的文件内。尽管不存在固定规则要求必须定义你的mapper和reducer在相同的文件内,但是惯例如此,除非你的mapper或reducer非常复杂。

linux
本文评论   查看全部评论 (0)
表情: 表情 姓名: 字数

       

评论声明
  • 尊重网上道德,遵守中华人民共和国的各项有关法律法规
  • 承担一切因您的行为而直接或间接导致的民事或刑事法律责任
  • 本站管理人员有权保留或删除其管辖留言中的任意内容
  • 本站有权在网站内转载或引用您的评论
  • 参与本评论即表明您已经阅读并接受上述条款