#Hadoop,Cloud-Computing

Hadoop Yarn 是什么

在古老的 Hadoop1.0 中,MapReduce 的 JobTracker 负责了太多的工作,包括资源调度,管理众多的 TaskTracker 等工作。这自然是不合理的,于是 Hadoop 在 1.0 到 2.0 的升级过程中,便将 JobTracker 的资源调度工作独立了出来,而这一改动,直接让 Hadoop 成为大数据中最稳固的那一块基石。,而这个独立出来的资源管理框架,就是 Yarn 。

在详细介绍 Yarn 之前,我们先简单聊聊 Yarn ,Yarn 的全称是 Yet Another Resource Negotiator ,意思是“另一种资源调度器”,这种命名和“有间客栈”这种可谓是异曲同工之妙。这里多说一句,以前 Java 有一个项目编译工具,叫做 Ant,他的命名也是类似的,叫做 “Another Neat Tool”的缩写,翻译过来是“另一种整理工具”。

既然都叫做资源调度器了,那么自然,它的功能也是负责资源管理和调度的,接下来,我们就深入到 Yarn 这个东西内部一探究竟吧。

Yarn 架构

hadoop-Yarn

① Client:客户端,负责向集群提交作业。

② ResourceManager:集群主进程,仲裁中心,负责集群资源管理和任务调度。

③ Scheduler:资源仲裁模块。

④ ApplicationManager:选定,启动和监管ApplicationMaster。

⑤ NodeManager:集群从进程,管理监视Containers,执行具体任务。

⑥ Container:本机资源集合体,如某Container为4个CPU,8GB内存。

⑦ ApplicationMaster:任务执行和监管中心。

三个主要组件

再看最上面的图,我们能直观发现的两个主要的组件是 ResourceManagerNodeManager ,但其实还有一个 ApplicationMaster 在图中没有直观显示。我们分别来看这三个组件。

ResourceManager

我们先来说说上图中最中央的那个 ResourceManager(RM)。从名字上我们就能知道这个组件是负责资源管理的,整个系统有且只有一个 RM ,来负责资源的调度。

它也包含了两个主要的组件:定时调用器(Scheduler)以及应用管理器(ApplicationManager)

定时调度器(Scheduler):从本质上来说,定时调度器就是一种策略,或者说一种算法。当 Client 提交一个任务的时候,它会根据所需要的资源以及当前集群的资源状况进行分配。注意,它只负责向应用程序分配资源,并不做监控以及应用程序的状态跟踪。

应用管理器(ApplicationManager):同样,听名字就能大概知道它是干嘛的。应用管理器就是负责管理 Client 用户提交的应用。上面不是说到定时调度器(Scheduler)不对用户提交的程序监控嘛,其实啊,监控应用的工作正是由应用管理器(ApplicationManager)完成的。

ApplicationMaster

每当 Client 提交一个 Application 时候,就会新建一个 ApplicationMaster 。由这个 ApplicationMaster 去与 ResourceManager 申请容器资源,获得资源后会将要运行的程序发送到容器上启动,然后进行分布式计算。

这里可能有些难以理解,为什么是把运行程序发送到容器上去运行?如果以传统的思路来看,是程序运行着不动,然后数据进进出出不停流转。但当数据量大的时候就没法这么玩了,因为海量数据移动成本太大,时间太长。但是中国有一句老话山不过来,我就过去。大数据分布式计算就是这种思想,既然大数据难以移动,那我就把容易移动的应用程序发布到各个节点进行计算呗,这就是大数据分布式计算的思路。

NodeManager

NodeManager 是 ResourceManager 在每台机器的上代理,负责容器的管理,并监控他们的资源使用情况(cpu,内存,磁盘及网络等),以及向 ResourceManager/Scheduler 提供这些资源使用报告。

Yarn的主要思想是将MRv1版JobTracker的两大功能——资源管理和任务调度,拆分成两个独立的进程:

hadoop-Yarn

  • Yarn依旧是master/slave结构

  • 主进程ResourceManager是整个集群资源仲裁中心

  • 从进程NodeManager管理本机资源

  • ResourceManager和从属节点的进程NodeManager组成了Hadoop 2.0的分布式数据计算框架

提交一个 Application 到 Yarn 的流程

hadoop-Yarn

这张图简单地标明了提交一个程序所经历的流程,接下来我们来具体说说每一步的过程。

  • Client 向 Yarn 提交 Application,这里我们假设是一个 MapReduce 作业。

  • ResourceManager 向 NodeManager 通信,为该 Application 分配第一个容器。并在这个容器中运行这个应用程序对应的 ApplicationMaster。

  • ApplicationMaster 启动以后,对 作业(也就是 Application) 进行拆分,拆分 task 出来,这些 task 可以运行在一个或多个容器中。然后向
    ResourceManager 申请要运行程序的容器,并定时向 ResourceManager 发送心跳。

  • 申请到容器后,ApplicationMaster 会去和容器对应的 NodeManager 通信,而后将作业分发到对应的 NodeManager 中的容器去运行,这里会将拆分后的 MapReduce 进行分发,对应容器中运行的可能是 Map 任务,也可能是 Reduce 任务。

  • 容器中运行的任务会向 ApplicationMaster 发送心跳,汇报自身情况。当程序运行完成后, ApplicationMaster 再向 ResourceManager 注销并释放容器资源。
    以上就是一个作业的大体运行流程。

hadoop-Yarn

Yarn 架构典型拓扑

除了ResourceManagerNodeManager两个实体外,Yarn还包括WebAppProxyServerJobHistoryServer两个实体。

hadoop-Yarn

JobHistoryServer:管理已完成的Yarn任务

  • 历史任务的日志和执行时的各种统计信息统一由JobTracker管理
  • Yarn将管理历史任务的功能抽象成一独立实体JobHistoryServer

WebAppProxyServer:任务执行时的Web页面代理

  • 通过使用代理,不仅进一步降低了ResourceManager的压力,还能降低Yarn受到的Web攻击
  • 负责监管具体MapReduce任务执行全过程,将从Container那里收集过的任务执行信息汇总并显示到一个Web界面上

Yarn 调度策略

容量调度算法
CapacityScheduler是一种多用户多任务调度策略,它以队列为单位划分任务,以Container为单位分配资源

hadoop-Yarn

公平调度策略
FairScheduler是一种允许多个Yarn任务公平使用集群资源的可插拔式调度策略

hadoop-Yarn

HDFS设计原则

设计目标

存储非常大的文件:这里非常大指的是几百M、G、或者TB级别。

  • 采用流式的数据访问方式: HDFS基于这样的一个假设:最有效的数据处理模式是一次写入、多次读取数据集经常从数据源生成或者拷贝一次,然后在其上做很多分析工作
    分析工作经常读取其中的大部分数据,即使不是全部。 因此读取整个数据集所需时间比读取第一条记录的延时更重要。

  • 运行于商业硬件上: Hadoop不需要特别贵的、reliable的(可靠的)机器,可运行于普通商用机器(可以从多家供应商采购) ,商用机器不代表低端机器。在集群中(尤其是大的集群),节点失败率是比较高的HDFS的目标是确保集群在节点失败的时候不会让用户感觉到明显的中断。

HDFS不适合的应用类型

有些场景不适合使用HDFS来存储数据。下面列举几个:

  1. 低延时的数据访问
    对延时要求在毫秒级别的应用,不适合采用HDFS。HDFS是为高吞吐数据传输设计的,因此可能牺牲延时HBase更适合低延时的数据访问。

  2. 大量小文件
    文件的元数据(如目录结构,文件block的节点列表,block-node mapping)保存在NameNode的内存中, 整个文件系统的文件数量会受限于NameNode的内存大小。
    经验而言,一个文件/目录/文件块一般占有150字节的元数据内存空间。如果有100万个文件,每个文件占用1个文件块,则需要大约300M的内存。因此十亿级别的文件数量在现有商用机器上难以支持。

  3. 多方读写,需要任意的文件修改
    HDFS采用追加(append-only)的方式写入数据。不支持文件任意offset的修改。不支持多个写入器(writer)

HDFS定位

为提高扩展性,HDFS采用了master/slave架构来构建分布式存储集群,这种架构很容易向集群中任意添加或删除slave。

HDFS是Hadoop生态系统中的一个重要组件,它是一个分布式文件系统,旨在存储大量数据,并提供高吞吐量的数据访问。HDFS的设计目标是将数据存储在廉价的硬件上,并提供高容错性。它通过将数据分散到集群中的多个节点上来实现这一目标。HDFS的定位是作为一个批处理系统,适用于大规模数据的离线处理。

HDFS的主要特点包括:

  • 高容错性:HDFS将数据分散到多个节点上,因此即使某个节点出现故障,数据仍然可以通过其他节点进行访问。
  • 高吞吐量:HDFS的设计目标是支持大规模数据的批处理,因此它提供了高吞吐量的数据访问。
  • 适用于大文件:HDFS适用于存储大文件,因为它将文件分成多个块进行存储,并将这些块分散到多个节点上。
  • 流式数据访问:HDFS支持流式数据访问,这意味着它可以高效地处理大量的数据流。

hadoop-HDFS

HDFS体系架构

HDFS采用master/slave体系来构建分布式存储服务,提高了HDFS的可扩展性又简化了架构设计。
HDFS里将文件分块存储,优化存储颗粒度。namenode统一管理所有slave机器datanode存储空间,datanode以块为单位存储实际的数据。真正的文件I/O操作时客户端直接和datanode交互。

HDFS核心概念

Blocks

物理磁盘中有块的概念,磁盘的物理Block是磁盘操作最小的单元,读写操作均以Block为最小单元,一般为512 Byte。文件系统在物理Block之上抽象了另一层概念,文件系统Block物理磁盘Block的整数倍。通常为几KB。Hadoop提供的df、fsck这类运维工具都是在文件系统的Block级别上进行操作。

HDFS的Block块比一般单机文件系统大得多,默认为128M。HDFS的文件被拆分成block-sized的chunk,chunk作为独立单元存储。比Block小的文件不会占用整个Block,只会占据实际大小。例如, 如果一个文件大小为1M,则在HDFS中只会占用1M的空间,而不是128M。

HDFS的Block为什么这么大?
是为了最小化查找(seek)时间,控制定位文件与传输文件所用的时间比例。假设定位到Block所需的时间为10ms,磁盘传输速度为100M/s。如果要将定位到Block所用时间占传输时间的比例控制1%,则Block大小需要约100M。
但是如果Block设置过大,在MapReduce任务中,Map或者Reduce任务的个数 如果小于集群机器数量,会使得作业运行效率很低。

Block抽象的好处

  • Block的拆分使得单个文件大小可以大于整个磁盘的容量,构成文件的Block可以分布在整个集群, 理论上,单个文件可以占据集群中所有机器的磁盘。
  • Block的抽象也简化了存储系统,对于Block,无需关注其权限,所有者等内容(这些内容都在文件级别上进行控制)。
  • Block作为容错和高可用机制中的副本单元,即以Block为单位进行复制。

Namenode & Datanode

整个HDFS集群由Namenode和Datanode构成master-worker(主从)模式。Namenode负责构建命名空间,管理文件的元数据等,而Datanode负责实际存储数据,负责读写工作。

Namenode

Namenode存放文件系统树及所有文件、目录的元数据。元数据持久化为2种形式:

  • namespace image
  • edit log

但是持久化数据中不包括Block所在的节点列表,及文件的Block分布在集群中的哪些节点上,这些信息是在系统重启的时候重新构建(通过Datanode汇报的Block信息)。
在HDFS中,Namenode可能成为集群的单点故障,Namenode不可用时,整个文件系统是不可用的。HDFS针对单点故障提供了2种解决机制:

  1. 备份持久化元数据
    将文件系统的元数据同时写到多个文件系统, 例如同时将元数据写到本地文件系统及NFS。这些备份操作都是同步的、原子的。

  2. Secondary Namenode
    Secondary节点定期合并主Namenode的namespace image和edit log, 避免edit log过大,通过创建检查点checkpoint来合并。它会维护一个合并后的namespace image副本, 可用于在Namenode完全崩溃时恢复数据。下图为Secondary Namenode的管理界面:

hadoop-HDFS

Secondary Namenode通常运行在另一台机器,因为合并操作需要耗费大量的CPU和内存。其数据落后于Namenode,因此当Namenode完全崩溃时,会出现数据丢失。 通常做法是拷贝NFS中的备份元数据到Second,将其作为新的主Namenode。
在HA(High Availability高可用性)中可以运行一个Hot Standby,作为热备份,在Active Namenode故障之后,替代原有Namenode成为Active Namenode。

Datanode

数据节点负责存储和提取Block,读写请求可能来自namenode,也可能直接来自客户端。数据节点周期性向Namenode汇报自己节点上所存储的Block相关信息。

hadoop-HDFS

经典HDFS体系架构

NameNode负责管理文件系统的元数据信息,而DataNode则负责存储文件块的实际数据。 这种分工使得HDFS能够高效地存储和管理大规模数据。

hadoop-HDFS

具体来说,当一个客户端需要读取或写入一个文件时,它会向NameNode发送请求。NameNode会返回文件的元数据信息和文件块的位置信息。客户端根据这些信息与DataNode进行通信,从而读取或写入文件块的实际数据。

因此,NameNode和DataNode在HDFS体系架构中扮演着不同的角色。

作用上的区别是什么?

HDFS是Hadoop分布式文件系统的缩写,是Hadoop生态系统中的一个重要组件。HDFS的体系架构包括一个NameNode和多个DataNode。NameNode是HDFS的主节点,负责管理文件系统的命名空间、文件的元数据信息以及文件块的位置信息。而DataNode则是HDFS的从节点,负责存储文件块的实际数据。

具体来说,当一个客户端需要读取或写入一个文件时,它会向NameNode发送请求。NameNode会返回文件的元数据信息和文件块的位置信息。客户端根据这些信息与DataNode进行通信,从而读取或写入文件块的实际数据。

hadoop-HDFS

一般拓扑

只有单个NameNode节点,使用SecondaryNameNode或BackupNode节点实时获取NameNode元数据信息,备份元数据。

hadoop-HDFS

商用拓扑

有两个NameNode节点,并使用ZooKeeper实现NameNode节点间的热切换。

hadoop-HDFS

命令行接口

HDFS提供了各种交互方式,例如通过Java API、HTTP、shell命令行的。命令行的交互主要通过hadoop fs来操作。例如:

hadoop fs -copyFromLocal // 从本地复制文件到HDFS
hadoop fs mkdir // 创建目录
hadoop fs -ls // 列出文件列表

Hadoop中,文件和目录的权限类似于POSIX模型,包括读、写、执行3种权限:

读权限(r):用于读取文件或者列出目录中的内容
写权限(w):对于文件,就是文件的写权限。目录的写权限指在该目录下创建或者删除文件(目录)的权限。
执行权限(x):文件没有所谓的执行权限,被忽略。对于目录,执行权限用于访问器目录下的内容。

每个文件或目录都有owner,group,mode三个属性:

owner:指文件的所有者
group:为权限组
mode:由所有者权限、文件所属的组中组员的权限、非所有者非组员的权限组成。

hadoop-HDFS

数据流(读写流程)

读文件

大致读文件的流程如下:

hadoop-HDFS

  1. 客户端传递一个文件Path给FileSystem的open方法

  2. DFS采用RPC远程获取文件最开始的几个block的datanode地址。Namenode会根据网络拓扑结构决定返回哪些节点(前提是节点有block副本),如果客户端本身是Datanode并且节点上刚好有block副本,直接从本地读取。

  3. 客户端使用open方法返回的FSDataInputStream对象读取数据(调用read方法)

  4. DFSInputStream(FSDataInputStream实现了改类)连接持有第一个block的、最近的节点,反复调用read方法读取数据

  5. 第一个block读取完毕之后,寻找下一个block的最佳datanode,读取数据。如果有必要,DFSInputStream会联系Namenode获取下一批Block 的节点信息(存放于内存,不持久化),这些寻址过程对客户端都是不可见的。

  6. 数据读取完毕,客户端调用close方法关闭流对象

在读数据过程中,如果与Datanode的通信发生错误,DFSInputStream对象会尝试从下一个最佳节点读取数据,并且记住该失败节点, 后续Block的读取不会再连接该节点

读取一个Block之后,DFSInputStram会进行检验和验证,如果Block损坏,尝试从其他节点读取数据,并且将损坏的block汇报给Namenode。

客户端连接哪个datanode获取数据,是由namenode来指导的,这样可以支持大量并发的客户端请求,namenode尽可能将流量均匀分布到整个集群。

Block的位置信息是存储在namenode的内存中,因此相应位置请求非常高效,不会成为瓶颈。

写文件

hadoop-HDFS

步骤分解

  1. 客户端调用DistributedFileSystem的create方法

  2. DistributedFileSystem远程RPC调用Namenode在文件系统的命名空间中创建一个新文件,此时该文件没有关联到任何block。 这个过程中,Namenode会做很多校验工作,例如是否已经存在同名文件,是否有权限,如果验证通过,返回一个FSDataOutputStream对象。 如果验证不通过,抛出异常到客户端。

  3. 客户端写入数据的时候,DFSOutputStream分解为packets(数据包),并写入到一个数据队列中,该队列由DataStreamer消费。

  4. DateStreamer负责请求Namenode分配新的block存放的数据节点。这些节点存放同一个Block的副本,构成一个管道。 DataStreamer将packet写入到管道的第一个节点,第一个节点存放好packet之后,转发给下一个节点,下一个节点存放 之后继续往下传递。

  5. DFSOutputStream同时维护一个ack queue队列,等待来自datanode确认消息。当管道上的所有datanode都确认之后,packet从ack队列中移除。

  6. 数据写入完毕,客户端close输出流。将所有的packet刷新到管道中,然后安心等待来自datanode的确认消息。全部得到确认之后告知Namenode文件是完整的。 Namenode此时已经知道文件的所有Block信息(因为DataStreamer是请求Namenode分配block的),只需等待达到最小副本数要求,然后返回成功信息给客户端。

Namenode如何决定副本存在哪个Datanode?

HDFS的副本的存放策略是可靠性、写带宽、读带宽之间的权衡。默认策略如下:

第一个副本放在客户端相同的机器上,如果机器在集群之外,随机选择一个(但是会尽可能选择容量不是太慢或者当前操作太繁忙的)

第二个副本随机放在不同于第一个副本的机架上。

第三个副本放在跟第二个副本同一机架上,但是不同的节点上,满足条件的节点中随机选择。

更多的副本在整个集群上随机选择,虽然会尽量避免太多副本在同一机架上。

副本的位置确定之后,在建立写入管道的时候,会考虑网络拓扑结构。下面是可能的一个存放策略:

hadoop-HDFS

这样选择很好的平衡了可靠性、读写性能

  • 可靠性:Block分布在两个机架上

  • 写带宽:写入管道的过程只需要跨越一个交换机

  • 读带宽:可以从两个机架中任选一个读取

HDFS内部特性

数据冗余

  • HDFS将每个文件存储成一系列数据块(Block),默认块大小为64MB(可配置)。

  • 为了容错,文件的所有数据块都会有副本(副本数量即复制因子,可配置)。

  • HDFS的文件都是一次性写入的,并且严格限制为任何时候都只有一个写用户。

副本存放

  • HDFS集群一般运行在多个机架上,不同机架上机器的通信需要通过交换机。

  • HDFS采用机架感知(Rack-aware)的策略来改进数据的可靠性、可用性和网络带宽的利用率。

  • 机架的错误远比节点的错误少,这个策略可以防止整个机架失效时数据丢失,提高数据的可靠性和可用性,又能保证性能。

副本选择

  • HDFS会尽量使用离程序最近的副本来满足用户请求,这样可以减少总带宽消耗和读延时。

  • HDFS的架构支持数据均衡策略。

心跳检测

  • NameNode周期性地从集群中的每个DataNode接受心跳包和块报告,收到心跳包说明该DataNode工作正常。

  • NameNode会标记最近没有心跳的DataNode为宕机,不会发给它们任何新的I/O请求。

  • NameNode会不断检测这些需要复制的数据块,并在需要的时候重新复制。

数据完整性检测

  • 多种原因可能造成从DataNode获取的数据块有损坏。

  • HDFS客户端软件实现了对HDFS文件内容的校验和检查(Checksum)。

  • DataNode获得的数据块对应的校验和隐藏文件中的不同,客户端就会判定数据块有损坏,将从其他DataNode获取该数据块的副本。

简单一致性模型、流式数据访问

  • HDFS的应用程序一般对文件实行一次写、多次读的访问模式。

  • 文件一旦创建、写入和关闭之后就不需要再更改了。

  • 这样就简化了数据一致性问题,高吞吐量的数据访问才成为可能;运行在HDFS上的应用主要以流式读为主,做批量处理;更注重数据访问的高吞吐量。

客户端缓存

  • 客户端创建文件的请求不是立即到达NameNode,HDFS客户端先把数据缓存到本地的一个临时文件,程序的写操作透明地重定向到这个临时文件。

  • 当这个临时文件累积的数据超过一个块的大小(64MB)时,客户端才会联系NameNode。

  • 如果NameNode在文件关闭之前死机,那么文件将会丢失。

  • 如果不采用客户端缓存,网络速度和拥塞都会对输出产生很大的影响。

MapReduce 定义

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

MapReduce 产生缘由

为什么需要MapReduce?

  • 海量数据在单机上处理因为硬件资源限制,无法胜任。
  • 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
  • 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

设想一个海量数据场景下的wordcount需求:

  • 单机版:内存受限,磁盘受限,运算能力受限
  • 分布式:文件分布式存储(HDFS)、运算逻辑需要至少分成2个阶段(一个阶段独立并发,一个阶段汇聚)、运算程序如何分发、程序如何分配运算任务(切片)、两阶段的程序如何启动?如何协调?、整个程序运行过程中的监控?容错?重试?

可见在程序由单机版扩成分布式时,会引入大量的复杂工作。

MapReduce与Yarn的关系

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台。而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

YARN的重要概念:

  1. yarn并不清楚用户提交的程序的运行机制;

  2. yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源);

  3. yarn中的主管角色叫ResourceManager;

  4. yarn中具体提供运算资源的角色叫NodeManager;

  5. 这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(MapReduce只是其中的一种),比如MapReduce、storm程序,spark程序,tez……;

  6. 所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可;

  7. Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。

MapReduce 工作原理

严格说起来MapReduce并不是一种算法, 而是一个计算思想。它由map和reduce两个阶段组成。

MapReduce 进程

为了提高开发效率,可以将分布式程序中的公共功能封装成框架,让开发人员可以将精力集中于业务逻辑。

而MapReduce就是这样一个分布式程序的通用框架,整体结构如下(在分布式运行时有三类实例进程):

  • MRAppMaster:负责整个程序的过程调度及状态协调
  • MapTask:负责map阶段的整个数据处理流程
  • ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce 运行机制

hadoop

流程描述如下:

  1. 一个MR程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的MapTask实例数量,然后向集群申请机器启动相应数量的MapTask进程;

  2. MapTask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

    • 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对;
    • 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存;
    • 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件。
  3. MRAppMaster监控到所有MapTask进程任务完成之后,会根据客户指定的参数启动相应数量的ReduceTask进程,并告知ReduceTask进程要处理的数据范围(数据分区);

  4. ReduceTask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台MapTask运行所在机器上获取到若干个MapTask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储。

我们来举个例子。

hadoop
上图是一个统计词频的任务。

  1. Hadoop将输入数据切成若干个分片,并将每个split(分割)交给一个map task(Map任务)处理。

  2. Mapping之后,相当于得出这个task里面,每个词以及它出现的次数。

  3. shuffle(拖移)将相同的词放在一起,并对它们进行排序,分成若干个分片。

  4. 根据这些分片,进行reduce(归约)。

  5. 统计出reduce task的结果,输出到文件。

在MapReduce里,为了完成上面这些过程,需要两个角色:JobTracker和TaskTracker。

hadoop

JobTracker用于调度和管理其它的TaskTracker。JobTracker可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上。

现在这边给出一个简单的mapreduce实现示例:

用于统计输入文件中每个单词的出现次数。

  1. 导入必要的包:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  2. 定义Mapper类:

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // 将每行文本拆分为单词,然后发送到Reducer
            String[] words = line.split("\\s+");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    Mapper类的作用是将输入的文本数据拆分成单词,然后为每个单词输出一个键-值对(单词, 1)。

  3. 定义Reducer类:

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            // 对相同单词的出现次数进行累加
            for (IntWritable value : values) {
                sum += value.get();
            }
            // 输出单词和其出现的总次数
            context.write(key, new IntWritable(sum));
        }
    }

    Reducer类的作用是接收来自Mapper的键-值对,对相同键的值进行累加,然后输出单词和其总出现次数。

  4. 主函数(main方法):

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(word.class);
    
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    
        // 设置输入路径和输出路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        // 提交作业并等待完成
        job.waitForCompletion(true);
    }

hadoop

在整个Hadoop架构中,计算框架起到承上启下的作用,一方面可以操作HDFS中的数据,另一方面可以被封装,提供Hive、Pig这样的上层组件的调用。

我们简单介绍一下其中几个比较重要的组件。

HBase:来源于Google的BigTable;是一个高可靠性、高性能、面向列、可伸缩的分布式数据库。

Hive:是一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

Pig:是一个基于Hadoop的大规模数据分析工具,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。

ZooKeeper:来源于Google的Chubby;它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度。

Ambari:Hadoop管理工具,可以快捷地监控、部署、管理集群。

Sqoop:用于在Hadoop与传统的数据库间进行数据的传递。

Mahout:一个可扩展的机器学习和数据挖掘库。

Hadoop的优点和应用

总的来看,Hadoop有以下优点:

高可靠性:这个是由它的基因决定的。它的基因来自Google。Google最擅长的事情,就是“垃圾利用”。Google起家的时候就是穷,买不起高端服务器,所以,特别喜欢在普通电脑上部署这种大型系统。虽然硬件不可靠,但是系统非常可靠。

高扩展性:Hadoop是在可用的计算机集群间分配数据并完成计算任务的,这些集群可以方便地进行扩展。说白了,想变大很容易。

高效性:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。

高容错性:Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。这个其实也算是高可靠性。

低成本:Hadoop是开源的,依赖于社区服务,使用成本比较低。

基于这些优点,Hadoop适合应用于大数据存储和大数据分析的应用,适合于服务器几千台到几万台的集群运行,支持PB级的存储容量。

Hadoop的应用非常广泛,包括:搜索、日志处理、推荐系统、数据分析、视频图像分析、数据保存等,都可以使用它进行部署。

Hadoop是什么?

Hadoop是一个由Apache基金会所开发的分布式系统基础架构,是一个存储系统+计算框架的软件框架。主要解决海量数据存储与计算的问题,是大数据技术中的基石。Hadoop以一种可靠、高效、可伸缩的方式进行数据处理,用户可以在不了解分布式底层细节的情况下,开发分布式程序,用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。

Hadoop能解决什么问题

  • 海量数据存储

    HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(High throughput)来访问数据,适合那些有着超大数据集(large data set)的应用程序,它由n台运行着DataNode的机器组成和1台(另外一个standby)运行NameNode进程一起构成。每个DataNode 管理一部分数据,然后NameNode负责管理整个HDFS 集群的信息(存储元数据)。

  • 资源管理,调度和分配

    Apache Hadoop YARN(Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

Hadoop的由来

hadoop

Hadoop的核心架构

Hadoop的核心,说白了,就是HDFSMapReduce。HDFS为海量数据提供了存储,而MapReduce为海量数据提供了计算框架。

HDFS

hadoop

整个HDFS有三个重要角色:NameNode(名称节点)DataNode(数据节点)Client(客户机)

典型的主从架构,用TCP/IP通信

  • NameNode:Master节点(主节点),可以看作是分布式文件系统中的管理者,主要负责管理文件系统的命名空间、集群配置信息和存储块的复制等。NameNode会将文件系统的Meta-data存储在内存中,这些信息主要包括了文件信息、每一个文件对应的文件块的信息和每一个文件块在DataNode的信息等。

  • DataNode:Slave节点(从节点),是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。

  • Client:切分文件;访问HDFS;与NameNode交互,获得文件位置信息;与DataNode交互,读取和写入数据。

还有一个Block(块)的概念:Block是HDFS中的基本读写单元;HDFS中的文件都是被切割为block(块)进行存储的;这些块被复制到多个DataNode中;块的大小(通常为64MB)和复制的块数量在创建文件时由Client决定。

MapReduce

MapReduce是一种分布式计算模型,它将大规模数据集(大于1TB)分成许多小数据块,然后在集群中的各个节点上进行并行处理,最后将结果汇总。MapReduce的计算过程可以分为两个阶段:Map阶段和Reduce阶段。

  • Map阶段:将输入数据切分成若干个小数据块,然后由多个Map任务并行处理,每个Map任务将处理结果输出为若干个键值对。

  • Reduce阶段:将Map阶段的输出结果按照键值对中的键进行分组,然后由多个Reduce任务并行处理,每个Reduce任务将处理结果输出为若干个键值对。

总结

Hadoop是一个分布式系统基础架构,主要解决海量数据存储与计算的问题。它的核心是HDFS和MapReduce,其中HDFS为海量数据提供了存储,而MapReduce为海量数据提供了计算框架。除此之外,Hadoop还有一个重要的组件——YARN,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度。