最近開了一個讀者回饋表單郵箱,無論是對文章的感想或是對部落格的感想,有什麼想回饋的都可以發郵箱跟我說:i_kkkp@163.com

hadoop之MapReduce工作原理

MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

MapReduce 产生缘由

为什么需要MapReduce?

  • 海量数据在单机上处理因为硬件资源限制,无法胜任。
  • 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
  • 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

设想一个海量数据场景下的wordcount需求:

  • 单机版:内存受限,磁盘受限,运算能力受限
  • 分布式:文件分布式存储(HDFS)、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)、运算程序如何分发、程序如何分配运算任务(切片)、两阶段的程序如何启动?如何协调?、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。

MapReduce与Yarn的关系

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台。而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

YARN的重要概念:

  1. yarn并不清楚用户提交的程序的运行机制;

  2. yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源);

  3. yarn中的主管角色叫ResourceManager;

  4. yarn中具体提供运算资源的角色叫NodeManager;

  5. 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(MapReduce只是其中的一种),比如MapReduce、storm程序,spark程序,tez……;

  6. 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可;

  7. Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。

MapReduce 工作原理

严格说起来MapReduce并不是一种算法, 而是一个计算思想。它由map和reduce两个阶段组成。

MapReduce 进程

为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而MapReduce就是这样一个分布式程序的通用框架,整体结构如下(在分布式运行时有三类实例进程):

  • MRAppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责map阶段的整个数据处理流程
  • ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce 运行机制

hadoop

流程描述如下:

  1. 一个MR程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器启动相应数量的MapTask进程;

  2. MapTask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

    • 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对;
    • 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存;
    • 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件。
  3. MRAppMaster监控到所有MapTask进程任务完成之后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区);

  4. ReduceTask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。

我们来举个例子。

hadoop
上图是一个统计词频的任务。

  1. Hadoop将输入数据切成若干个分片,并将每个split(分割)交给一个map task(Map任务)处理。

  2. Mapping之后,相当于得出这个task里面,每个词以及它出现的次数。

  3. shuffle(拖移)将相同的词放在一起,并对它们进行排序,分成若干个分片。

  4. 根据这些分片,进行reduce(归约)。

  5. 统计出reduce task的结果,输出到文件。

在MapReduce里,为了完成上面这些过程,需要两个角色:JobTracker和TaskTracker。

hadoop

JobTracker用于调度和管理其它的TaskTracker。JobTracker可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上。

现在这边给出一个简单的mapreduce实现示例:

用于统计输入文件中每个单词的出现次数。

  1. 导入必要的包:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  2. 定义Mapper类:

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // 将每行文本拆分为单词,然后发送到Reducer
            String[] words = line.split("\\s+");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    Mapper类的作用是将输入的文本数据拆分成单词,然后为每个单词输出一个键-值对(单词, 1)。

  3. 定义Reducer类:

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // 对相同单词的出现次数进行累加
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 输出单词和其出现的总次数
            context.write(key, new IntWritable(sum));
        }
    }

    Reducer类的作用是接收来自Mapper的键-值对,对相同键的值进行累加,然后输出单词和其总出现次数。

  4. 主函数(main方法):

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(word.class);
    
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    
        // 设置输入路径和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        // 提交作业并等待完成
        job.waitForCompletion(true);
    }

hadoop

在整个Hadoop架构中,计算框架起到承上启下的作用,一方面可以操作HDFS中的数据,另一方面可以被封装,提供Hive、Pig这样的上层组件的调用。

我们简单介绍一下其中几个比较重要的组件。

HBase:来源于Google的BigTable;是一个高可靠性、高性能、面向列、可伸缩的分布式数据库。

Hive:是一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

Pig:是一个基于Hadoop的大规模数据分析工具,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。

ZooKeeper:来源于Google的Chubby;它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度。

Ambari:Hadoop管理工具,可以快捷地监控、部署、管理集群。

Sqoop:用于在Hadoop与传统的数据库间进行数据的传递。

Mahout:一个可扩展的机器学习和数据挖掘库。

Hadoop的优点和应用

总的来看,Hadoop有以下优点:

高可靠性:这个是由它的基因决定的。它的基因来自Google。Google最擅长的事情,就是“垃圾利用”。Google起家的时候就是穷,买不起高端服务器,所以,特别喜欢在普通电脑上部署这种大型系统。虽然硬件不可靠,但是系统非常可靠。

高扩展性:Hadoop是在可用的计算机集群间分配数据并完成计算任务的,这些集群可以方便地进行扩展。说白了,想变大很容易。

高效性:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。

高容错性:Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。这个其实也算是高可靠性。

低成本:Hadoop是开源的,依赖于社区服务,使用成本比较低。

基于这些优点,Hadoop适合应用于大数据存储和大数据分析的应用,适合于服务器几千台到几万台的集群运行,支持PB级的存储容量。

Hadoop的应用非常广泛,包括:搜索、日志处理、推荐系统、数据分析、视频图像分析、数据保存等,都可以使用它进行部署。

Hadoop 2.0体系架构之分布式文件系统HDFS 一文读懂什么是Hadoop

評論