Hadoop

What is Hadoop Yarn?

In the old Hadoop 1.0, the JobTracker of MapReduce was responsible for far too much, including resource scheduling and managing numerous TaskTrackers, which was clearly an overloaded design. Therefore, during the upgrade from 1.0 to 2.0, Hadoop split the resource-scheduling work out of the JobTracker into an independent resource management framework, a change that helped make Hadoop a stable cornerstone of big data. This independent resource management framework is Yarn.

Before we dive into the details, a word about the name. The full name of Yarn is "Yet Another Resource Negotiator", that is, "yet another resource scheduler", a deliberately tongue-in-cheek name. A bit of trivia: the Java build tool Ant was named in the same spirit; its name is short for "Another Neat Tool", that is, "yet another organizing tool".

Since it is called a resource scheduler, its job is naturally resource management and scheduling. Next, let's take a closer look at Yarn.

Yarn Architecture

hadoop-Yarn

① Client: The client is responsible for submitting jobs to the cluster.

② ResourceManager: The main process of the cluster, the arbitration center, is responsible for cluster resource management and task scheduling.

③ Scheduler: Resource arbitration module.

④ ApplicationManager: Selects, starts, and supervises the ApplicationMaster.

⑤ NodeManager: The cluster’s secondary process, which manages and monitors Containers and executes specific tasks.

⑥ Container: A collection of local resources, such as a Container with 4 CPUs and 8GB of memory.

⑦ ApplicationMaster: The task execution and supervision center.

Three Main Components

Looking at the top of the figure, we can intuitively see two main components, ResourceManager and NodeManager, but there is actually an ApplicationMaster that is not displayed in the figure. Let’s take a look at these three components separately.

ResourceManager

Let's start with the ResourceManager in the center of the figure. As the name suggests, this component is responsible for resource management, and there is a single ResourceManager in the entire system in charge of resource scheduling.

It also includes two main components: the Scheduler and the ApplicationManager.

The Scheduler: Essentially, the Scheduler is a strategy or algorithm. When a client submits a task, it allocates resources based on the required resources and the current state of the cluster. Note that it only allocates resources to the application and does not monitor the status of the application.

ApplicationManager: Again, you can roughly guess what it does from its name. The ApplicationManager is responsible for managing the applications submitted by clients. Remember that the Scheduler does not monitor the submitted program? That monitoring is done by the ApplicationManager.

ApplicationMaster

Every time a client submits an Application, a new ApplicationMaster is created. This ApplicationMaster applies to the ResourceManager for container resources, sends the program to be run to the container after obtaining the resources, and then performs distributed computing.

This may be a bit hard to grasp at first. Why send the program to the container? From a traditional perspective, the program stays put while data flows in and out of it. But when the data volume is huge, that no longer works: moving massive amounts of data is too expensive and takes too long. As the old saying goes, "if the mountain will not come to Muhammad, then Muhammad must go to the mountain." Since big data is hard to move, we instead ship the easy-to-move application program to every node that holds the data and compute there. This is the core idea of big data distributed computing.

NodeManager

The NodeManager is a proxy for the ResourceManager on each machine, responsible for container management, monitoring their resource usage (CPU, memory, disk, and network, etc.), and providing these resource usage reports to the ResourceManager/Scheduler.

The main idea of Yarn is to split the two functions of resource management and task scheduling of MRv1 JobTracker into two independent processes:

hadoop-Yarn

  • Yarn is still a master/slave structure.

  • The main process ResourceManager is the resource arbitration center of the entire cluster.

  • The secondary process NodeManager manages local resources.

  • ResourceManager and the subordinate node process NodeManager form the Hadoop 2.0 distributed data computing framework.

The Process of Submitting an Application to Yarn

hadoop-Yarn

This figure shows the process of submitting a program, and we will discuss the process of each step in detail below.

  • The client submits an application to Yarn, assuming it is a MapReduce job.

  • The ResourceManager communicates with the NodeManager to allocate the first container for the application and runs the ApplicationMaster corresponding to the application in this container.

  • After the ApplicationMaster is started, it splits the job (i.e., the application) into tasks that can run in one or more containers. Then it applies to the ResourceManager for containers to run the program and sends heartbeats to the ResourceManager regularly.

  • After obtaining containers, the ApplicationMaster communicates with the NodeManagers that host them and launches the work in those containers. The split MapReduce tasks are distributed here: a container may run a Map task or a Reduce task.

  • The task running in the container sends heartbeats to the ApplicationMaster to report its status. When the program is finished, the ApplicationMaster logs out and releases the container resources to the ResourceManager.

The above is the general process of running a job.

hadoop-Yarn
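To make the client's side of this flow concrete, here is a minimal, hypothetical sketch of submitting an application through the YarnClient API. The application name, queue, resources, and the launch command are placeholders, and the actual ApplicationMaster implementation and error handling are omitted:

    import java.util.Collections;
    import org.apache.hadoop.yarn.api.records.*;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.util.Records;

    public class SimpleYarnSubmitter {
      public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Step 1: ask the ResourceManager for a new application id
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("demo-app");

        // Step 2: describe how to launch the ApplicationMaster in its container
        // (the command below is a placeholder for a real AM launch command)
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("echo placeholder-for-AM-command"));
        appContext.setAMContainerSpec(amContainer);

        // Step 3: resources requested for the ApplicationMaster's container
        appContext.setResource(Resource.newInstance(1024, 1)); // 1 GB memory, 1 vcore
        appContext.setQueue("default");

        // Step 4: submit; the RM schedules the AM, which then requests task containers
        ApplicationId appId = yarnClient.submitApplication(appContext);
        System.out.println("Submitted application " + appId);
      }
    }

For a MapReduce job this bookkeeping is done for you: the Job.waitForCompletion() call used later in this article performs the submission under the hood.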

Typical Topology of Yarn Architecture

In addition to the ResourceManager and NodeManager, Yarn includes two more entities: the WebAppProxyServer and the JobHistoryServer.

hadoop-Yarn

JobHistoryServer: Manages completed Yarn tasks

  • In MRv1, the logs and statistics of historical jobs were managed by the JobTracker.
  • Yarn abstracts this history management into an independent entity, the JobHistoryServer.

WebAppProxyServer: Web page proxy during task execution

  • Using a proxy not only further reduces the load on the ResourceManager but also mitigates web-based attacks against Yarn.
  • It is responsible for supervising the execution of a MapReduce job, collecting execution information from the Containers, and displaying it in a web interface.

Yarn Scheduling Strategy

Capacity Scheduling Algorithm
CapacityScheduler is a multi-user and multi-task scheduling strategy that divides tasks into queues and allocates resources in Container units.

hadoop-Yarn

Fair Scheduling Strategy
FairScheduler is a pluggable scheduling strategy that allows multiple Yarn tasks to use cluster resources fairly.

hadoop-Yarn
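Which of the two schedulers the ResourceManager uses is normally selected in yarn-site.xml through the yarn.resourcemanager.scheduler.class property. The hedged sketch below only illustrates that property and the class names of the two schedulers via the Configuration API; in practice you would set this in the cluster configuration files rather than in code:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class SchedulerConfigExample {
      public static void main(String[] args) {
        YarnConfiguration conf = new YarnConfiguration();

        // Capacity scheduling (the default in Apache Hadoop distributions)
        conf.set("yarn.resourcemanager.scheduler.class",
            "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");

        // Or fair scheduling:
        // conf.set("yarn.resourcemanager.scheduler.class",
        //     "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");

        System.out.println(conf.get("yarn.resourcemanager.scheduler.class"));
      }
    }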

HDFS Design Principles

Design Goals

  • Store very large files: "very large" here means files of several hundred MB, several GB, or even several TB.

  • Adopt a stream-based data access method: HDFS is built on the assumption that the most effective data processing pattern is to generate or copy a data set once and then run many analyses over it. An analysis typically reads most, if not all, of the data in the set, so the time required to read the entire data set matters more than the latency of reading the first record.

  • Run on commodity hardware: Hadoop does not require especially expensive, highly reliable machines; it runs on ordinary commodity machines (available from multiple vendors). Commodity does not mean low-end. In a cluster, especially a large one, the node failure rate is relatively high, and HDFS's goal is to keep the cluster running without noticeable interruption to users when nodes fail.

Application Types Not Suitable for HDFS

Some scenarios are not suitable for storing data in HDFS. Here are a few examples:

  1. Low-latency data access
    Applications that require latency in the millisecond range are not suitable for HDFS. HDFS is designed for high-throughput data transmission, so latency may be sacrificed. HBase is more suitable for low-latency data access.

  2. A large number of small files
    The metadata of files (such as directory structure, node list of file blocks, and block-node mapping) is stored in the memory of the NameNode. The number of files in the entire file system is limited by the memory size of the NameNode. As a rule of thumb, a file/directory/file block generally occupies 150 bytes of metadata memory space. If there are one million files, each file occupies one file block, which requires about 300M of memory. Therefore, the number of files in the billions is difficult to support on existing commercial machines.

  3. Multiple writers or arbitrary file modifications
    HDFS writes data in an append-only manner. It does not support modifications at arbitrary offsets in a file, nor does it support multiple concurrent writers.

HDFS Positioning

To improve scalability, HDFS uses a master/slave architecture to build a distributed storage cluster, which makes it easy to add or remove slaves to the cluster.

HDFS is an important component of the Hadoop ecosystem. It is a distributed file system designed to store large amounts of data and provide high-throughput data access. HDFS is designed to store data on inexpensive hardware and provide high fault tolerance. It achieves this goal by distributing data to multiple nodes in the cluster. HDFS is positioned as a batch processing system suitable for offline processing of large-scale data.

The main features of HDFS include:

  • High fault tolerance: HDFS distributes data to multiple nodes, so even if a node fails, data can still be accessed through other nodes.
  • High throughput: HDFS is designed to support batch processing of large-scale data, so it provides high-throughput data access.
  • Suitable for large files: HDFS is suitable for storing large files because it divides files into multiple blocks for storage and distributes these blocks to multiple nodes.
  • Stream data access: HDFS supports stream data access, which means it can efficiently process large amounts of data streams.

hadoop-HDFS

HDFS Architecture

HDFS uses a master/slave architecture to build a distributed storage service, which improves the scalability of HDFS and simplifies the architecture design. HDFS stores files in blocks, optimizing storage granularity. The NameNode manages the storage space of all slave machines, while the DataNode is responsible for actual data storage and read/write operations.

Blocks

There is a concept of blocks in physical disks: the physical block is the smallest unit of disk reads and writes, usually 512 bytes. The file system adds another layer of abstraction on top of the physical disk block; a file system block is an integer multiple of the physical block, generally a few KB. Blocks in Hadoop are much larger than those of ordinary single-machine file systems, with a default size of 128 MB. A file in HDFS is split into block-sized chunks for storage, and these chunks are scattered across multiple nodes. If a file is smaller than the block size, it does not occupy an entire block, only its actual size. For example, a 1 MB file takes up 1 MB of space in HDFS, not 128 MB.

Why are HDFS blocks so large?
To keep the time spent locating a block small relative to the time spent transferring it. Suppose locating a block takes 10 ms and the disk transfers data at 100 MB/s. To keep the locating time at 1% of the transfer time, the block size needs to be about 100 MB. However, if blocks are set too large, a MapReduce job may end up with fewer Map or Reduce tasks than there are machines in the cluster, and job efficiency will suffer.

Benefits of block abstraction

  • The splitting of blocks allows a single file size to be larger than the capacity of the entire disk, and the blocks that make up the file can be distributed across the entire cluster. In theory, a single file can occupy the disk of all machines in the cluster.
  • Block abstraction also simplifies the storage subsystem: the block store does not need to track permissions, owners, and similar metadata (these are managed at the file level).
  • Blocks are the unit of replication in fault tolerance and high availability mechanisms.
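As a small illustration that a file is made up of blocks scattered across DataNodes, the following sketch uses the HDFS Java API to print each block's offset, length, and hosts. The file path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ShowBlockLocations {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Placeholder path; replace with a real HDFS file
        Path file = new Path("/data/example.txt");
        FileStatus status = fs.getFileStatus(file);

        // One BlockLocation per block, listing the DataNodes that hold its replicas
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
          System.out.printf("offset=%d length=%d hosts=%s%n",
              block.getOffset(), block.getLength(), String.join(",", block.getHosts()));
        }
        fs.close();
      }
    }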

Namenode & Datanode

The entire HDFS cluster consists of a master-slave model of Namenode and Datanode. The Namenode stores the file system tree and metadata of all files and directories. The metadata is persisted in two forms:

  • Namespace image
  • Edit log

However, the persisted data does not include the mapping from blocks to the DataNodes that hold them; this information is reconstructed when the system restarts (from the block reports sent by the DataNodes). In HDFS, the Namenode can therefore become a single point of failure: when the Namenode is unavailable, the entire file system is unavailable. HDFS provides two mitigations for this single point of failure:

  1. Backup persistent metadata
    Write the file system metadata to multiple file systems at the same time, such as writing metadata to both the local file system and NFS at the same time. These backup operations are synchronous and atomic.

  2. Secondary Namenode
    The Secondary Namenode periodically merges the primary Namenode's namespace image with its edit log by creating a checkpoint, which keeps the edit log from growing too large. It maintains a copy of the merged namespace image that can be used to recover data if the Namenode crashes completely. The following figure shows the management interface of the Secondary Namenode:

hadoop-HDFS

Classic HDFS Architecture

The NameNode is responsible for managing the metadata of the file system, while the DataNode is responsible for storing the actual data of the file blocks. This division of labor enables HDFS to efficiently store and manage large-scale data.

hadoop-HDFS

Specifically, when a client needs to read or write a file, it sends a request to the NameNode. The NameNode returns the metadata information of the file and the location information of the file blocks. The client communicates with the DataNode based on this information to read or write the actual data of the file blocks.

Therefore, the NameNode and DataNode play different roles in the HDFS architecture.

What is the difference in function?

HDFS is an abbreviation for Hadoop Distributed File System, an important component of the Hadoop ecosystem. The HDFS architecture includes one NameNode and multiple DataNodes. The NameNode is the master node of HDFS, responsible for managing the namespace of the file system, the metadata information of the file, and the location information of the file blocks. The DataNode is the slave node of HDFS, responsible for storing the actual data of the file blocks.

hadoop-HDFS

General Topology

There is only one NameNode node; a SecondaryNameNode or BackupNode is used to keep a copy of the NameNode's metadata (periodically for the SecondaryNameNode, in near real time for the BackupNode) and back it up.

hadoop-HDFS

Commercial Topology

There are two NameNode nodes, and ZooKeeper is used to implement hot standby between NameNode nodes.

hadoop-HDFS

Command Line Interface

HDFS provides various interaction methods, such as Java API, HTTP, and shell command line. Command line interaction is mainly operated through hadoop fs. For example:

hadoop fs -copyFromLocal <localsrc> <dst>   # Copy files from the local file system to HDFS
hadoop fs -mkdir <path>                     # Create a directory
hadoop fs -ls <path>                        # List the files in a directory
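The same operations are also available through the Java API mentioned above. Here is a hedged sketch; the paths are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsShellEquivalents {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // hadoop fs -copyFromLocal
        fs.copyFromLocalFile(new Path("/tmp/local.txt"), new Path("/user/demo/local.txt"));

        // hadoop fs -mkdir
        fs.mkdirs(new Path("/user/demo/newdir"));

        // hadoop fs -ls
        for (FileStatus status : fs.listStatus(new Path("/user/demo"))) {
          System.out.println(status.getPath() + "\t" + status.getLen());
        }
        fs.close();
      }
    }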

In Hadoop, the permissions of files and directories are similar to the POSIX model, including three permissions: read, write, and execute.

Read permission (r): required to read a file or list the contents of a directory.
Write permission (w): for a file, the permission to write to it; for a directory, the permission to create or delete files (or subdirectories) under it.
Execute permission (x): ignored for files, since HDFS files cannot be executed; for a directory, required to access its children.

Each file or directory has three attributes: owner, group, and mode:

Owner: the user who owns the file
Group: the group the file belongs to
Mode: made up of the owner's permissions, the permissions of members of the file's group, and the permissions of everyone else.

hadoop-HDFS
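These attributes can also be inspected and changed programmatically. Below is a hedged sketch using the FileSystem API; the path, user, and group are placeholders, and changing ownership normally requires superuser privileges:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class PermissionExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/user/demo/report.txt"); // placeholder path

        // Equivalent to: hadoop fs -chmod 644 /user/demo/report.txt
        fs.setPermission(file, new FsPermission((short) 0644));

        // Equivalent to: hadoop fs -chown alice:analysts /user/demo/report.txt
        fs.setOwner(file, "alice", "analysts");

        fs.close();
      }
    }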

Data Flow (Read and Write Process)

Read File

The rough process of reading a file is as follows:

hadoop-HDFS

  1. The client passes a file Path to the FileSystem’s open method.

  2. DistributedFileSystem uses RPC to ask the NameNode for the DataNode addresses of the first batch of blocks of the file. The NameNode decides which nodes to return based on the network topology (provided a node holds a replica of the block). If the client itself is a DataNode and a local replica exists, the block is read directly from the local node.

  3. The client uses the FSDataInputStream object returned by the open method to read data (call the read method).

  4. The DFSInputStream (wrapped by the returned FSDataInputStream) connects to the node holding the first block and repeatedly calls the read method to fetch data.

  5. After the first block is read, find the best datanode for the next block and read the data. If necessary, DFSInputStream will contact the NameNode to obtain the node information of the next batch of Blocks (stored in memory, not persistent), and these addressing processes are invisible to the client.

  6. After the data is read, the client calls the close method to close the stream object.

During the data reading process, if communication with the DataNode fails, the DFSInputStream object will try to read data from the next best node and remember the failed node, and subsequent block reads will not connect to the node.

After reading a Block, DFSInputStream verifies its checksum. If the Block is corrupted, it tries to read the data from another node and reports the corrupted block to the NameNode.

The NameNode only directs the client to the DataNode that holds each block; the data itself never flows through the NameNode. This allows HDFS to serve a large number of concurrent client requests, and the NameNode spreads read traffic across the cluster as evenly as possible.

The location information of the Block is stored in the memory of the NameNode, so the corresponding location request is very efficient and will not become a bottleneck.
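A minimal client-side sketch of this read path, using the standard FileSystem API (the file path is a placeholder):

    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class ReadHdfsFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);            // DistributedFileSystem when fs.defaultFS points to HDFS

        InputStream in = null;
        try {
          // open() returns an FSDataInputStream wrapping a DFSInputStream
          in = fs.open(new Path("/data/example.txt"));   // placeholder path
          IOUtils.copyBytes(in, System.out, 4096, false); // read the file and print its contents
        } finally {
          IOUtils.closeStream(in);                       // step 6: close the stream
          fs.close();
        }
      }
    }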

Write File

hadoop-HDFS

Step breakdown

  1. The client calls the create method of DistributedFileSystem.

  2. DistributedFileSystem makes an RPC call to the Namenode to create a new file in the file system's namespace; at this point the file is not yet associated with any blocks. During this step the Namenode performs a number of checks, such as whether a file with the same name already exists and whether the client has the required permissions. If the checks pass, an FSDataOutputStream object is returned; otherwise an exception is thrown to the client.

  3. As the client writes data, DFSOutputStream splits it into packets and appends them to a data queue, which is consumed by the DataStreamer.

  4. The DataStreamer asks the Namenode to allocate new blocks and the DataNodes that will store them. These nodes hold replicas of the same block and form a pipeline. The DataStreamer writes each packet to the first node of the pipeline; after storing it, that node forwards the packet to the next node, which passes it further down the pipeline.

  5. DFSOutputStream also maintains an ack queue of packets waiting for acknowledgement from the DataNodes. A packet is removed from the ack queue only after every DataNode in the pipeline has acknowledged it.

  6. After the data is written, the client closes the output stream. This flushes all remaining packets into the pipeline and waits for acknowledgements; once everything is acknowledged, the client informs the Namenode that the file is complete. The Namenode already knows all the blocks of the file (because the DataStreamer requested them), so it only needs to wait for the minimum replication requirement to be met before returning success.
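And a minimal client-side sketch of the write path (the destination path and the content are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteHdfsFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Steps 1-2: create() asks the Namenode to create the file and
        // returns an FSDataOutputStream wrapping a DFSOutputStream
        Path dst = new Path("/user/demo/output.txt");    // placeholder path
        FSDataOutputStream out = fs.create(dst);

        // Steps 3-5: writes are split into packets and pushed through the DataNode pipeline
        out.writeUTF("hello hdfs");

        // Step 6: close flushes remaining packets, waits for acks, and completes the file
        out.close();
        fs.close();
      }
    }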

How does the Namenode determine which DataNode the replica is on?

The storage strategy of HDFS replicas is a trade-off between reliability, write bandwidth, and read bandwidth. The default strategy is as follows:

The first replica is placed on the machine where the client is running. If the client is outside the cluster, a node is chosen at random (while trying to avoid nodes that are too full or too busy).

The second replica is randomly placed on a rack different from the first replica.

The third replica is placed on the same rack as the second replica, but on a different node, and a random selection is made from the nodes that meet the conditions.

Any additional replicas are placed on nodes chosen at random across the cluster, while avoiding placing too many replicas on the same rack.

After the location of the replica is determined, when establishing the write pipeline, the network topology structure is considered. The following is a possible storage strategy:

hadoop-HDFS

This placement strikes a good balance between reliability and read/write performance.

  • Reliability: Blocks are distributed on two racks.

  • Write bandwidth: The write pipeline process only needs to cross one switch.

  • Read bandwidth: You can choose one of the two racks to read from.

Internal Features of HDFS

Data Redundancy

  • HDFS stores each file as a series of data blocks; the default block size is 64 MB in older releases (128 MB since Hadoop 2.x) and is configurable.

  • For fault tolerance, all data blocks of a file have replicas (the replication factor is configurable).

  • HDFS files are written once and strictly limited to only one writing user at any time.

Replica Placement

  • HDFS clusters usually run on multiple racks, and communication between machines on different racks requires switches.

  • HDFS uses a rack-aware strategy to improve data reliability, availability, and network bandwidth utilization.

  • Rack failures are much less common than node failures, and this strategy can prevent data loss when an entire rack fails, improving data reliability and availability while ensuring performance.

Replica Selection

  • HDFS tries to use the replica closest to the program to satisfy user requests, reducing total bandwidth consumption and read latency.

  • HDFS architecture supports data balancing strategies.

Heartbeat Detection

  • The NameNode periodically receives heartbeats and block reports from each DataNode in the cluster. Receiving a heartbeat indicates that the DataNode is working properly.

  • The NameNode marks DataNodes that have not sent a heartbeat recently as dead and does not send them any new I/O requests.

  • The NameNode continuously checks for data blocks that need to be replicated and replicates them when necessary.

Data Integrity Check

  • Various reasons may cause the data block obtained from the DataNode to be corrupted.

  • HDFS client software implements checksum verification of HDFS file content.

  • If the checksum of a data block fetched from a DataNode does not match the checksum stored in the block's hidden checksum file, the client concludes that the block is corrupted and fetches a replica of the block from another DataNode.
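Checksum verification happens automatically inside the HDFS client when data is read. The hedged sketch below only shows two related pieces of the FileSystem API; the path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileChecksum;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChecksumExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/data/example.txt"); // placeholder path

        // Ask HDFS for the file-level checksum (derived from the per-block checksums)
        FileChecksum checksum = fs.getFileChecksum(file);
        System.out.println(checksum);

        // Checksum verification on read is on by default; it can be switched off,
        // for example when salvaging data from a partially corrupted file
        fs.setVerifyChecksum(false);

        fs.close();
      }
    }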

Simple Consistency Model, Stream Data Access

  • HDFS applications generally access files in a write-once, read-many mode.

  • Once a file is created, written, and closed, it does not need to be changed again.

  • This simplifies data consistency issues and makes high-throughput data access possible. Applications running on HDFS mainly focus on stream reading and batch processing, emphasizing high-throughput data access.

Client Cache

  • The request for the client to create a file does not immediately reach the NameNode. The HDFS client first caches the data to a local temporary file, and the write operation of the program is transparently redirected to this temporary file.

  • When the data accumulated in this temporary file exceeds one block size, the client contacts the NameNode.

  • If the NameNode crashes before the file is closed, the file will be lost.

  • If client caching is not used, network speed and congestion will have a significant impact on output.

Definition of MapReduce

MapReduce is a programming framework for distributed computing programs. It is the core framework for developing “Hadoop-based data analysis applications”. Its core function is to integrate the user’s written business logic code and default components into a complete distributed computing program, which runs concurrently on a Hadoop cluster.

Reason for the Emergence of MapReduce

Why do we need MapReduce?

  • Massive data cannot be processed on a single machine due to hardware resource limitations.
  • Once the single-machine version of the program is extended to run on a cluster, it will greatly increase the complexity and development difficulty of the program.
  • With the introduction of the MapReduce framework, developers can focus most of their work on the development of business logic, while leaving the complexity of distributed computing to the framework to handle.

Consider a word count requirement in a scenario with massive data:

  • Single-machine version: limited memory, limited disk, limited computing power
  • Distributed: files are stored across machines (HDFS); the computing logic has to be split into at least two stages (one that runs independently in parallel and one that aggregates the results); and a host of new questions appears: how to distribute the computing program, how to assign computing tasks (splits), how to start and coordinate the two stages, how to monitor the running job, and how to handle faults and retries?

It can be seen that when the program is extended from a single-machine version to a distributed version, a large amount of complex work will be introduced.

Relationship between MapReduce and Yarn

Yarn is a resource scheduling platform that is responsible for providing server computing resources for computing programs, which is equivalent to a distributed operating system platform. MapReduce and other computing programs are like application programs running on top of the operating system.

Important concepts of YARN:

  1. Yarn does not know the running mechanism of the program submitted by the user;
  2. Yarn only provides scheduling of computing resources (when the user program applies for resources from Yarn, Yarn is responsible for allocating resources);
  3. The supervisor role in Yarn is called ResourceManager;
  4. The role that specifically provides computing resources in Yarn is called NodeManager;
  5. In this way, Yarn is completely decoupled from the user programs it runs, which means that all kinds of distributed computing programs (MapReduce is just one of them), such as MapReduce, Storm, Spark, and Tez, can run on Yarn;
  6. Therefore, computing frameworks such as Spark and Storm can be integrated to run on Yarn, as long as their frameworks implement a resource-request mechanism that complies with the Yarn specification;
  7. Yarn becomes a universal resource scheduling platform. From then on, various computing clusters that previously existed in enterprises can be integrated on a physical cluster to improve resource utilization and facilitate data sharing.

MapReduce Working Principle

Strictly speaking, MapReduce is not an algorithm, but a computing idea. It consists of two stages: map and reduce.

MapReduce Process

To improve development efficiency, common functions in distributed programs can be encapsulated into frameworks, allowing developers to focus on business logic.

MapReduce is such a general framework for distributed programs, and its overall structure is as follows (there are three types of instance processes during distributed operation):

  • MRAppMaster: responsible for the process scheduling and status coordination of the entire program
  • MapTask: responsible for the entire data processing process of the map phase
  • ReduceTask: responsible for the entire data processing process of the reduce phase

MapReduce Mechanism

hadoop

The process is described as follows:

  1. When an MR program starts, the MRAppMaster is started first. After the MRAppMaster starts, according to the description information of this job, it calculates the number of MapTask instances required and applies to the cluster to start the corresponding number of MapTask processes.

  2. After a MapTask process starts, it processes data according to the data split assigned to it. The main steps are:

  • Use the InputFormat specified by the user to obtain a RecordReader, read the data, and form input KV pairs;
  • Pass the input KV pairs to the user-defined map() method, and collect the KV pairs output by map() into an in-memory cache;
  • Partition the cached KV pairs by key, sort them, and continuously spill them to disk files (see the Partitioner sketch after this list).

  3. After the MRAppMaster detects that all MapTask processes have finished, it starts the number of ReduceTask processes specified by the user and tells each ReduceTask which data range (data partition) it should process.

  4. After a ReduceTask process starts, it fetches the relevant MapTask output files from the machines where the MapTasks ran (as directed by the MRAppMaster), re-merges and sorts them locally, groups KV pairs with the same key, calls the user-defined reduce() method on each group, collects the resulting KV pairs, and finally uses the user-specified OutputFormat to write the results to external storage.
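To make the "partition by key" step above concrete, here is a hedged sketch of a Partitioner that reproduces the default hash-based partitioning (MapReduce uses a HashPartitioner with essentially this formula unless told otherwise; the key and value types match the word count example that follows):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Each map output KV pair is assigned to one ReduceTask by its key,
    // so all occurrences of the same word end up at the same reducer.
    public class WordPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Non-negative hash modulo the number of reducers
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

A job opts into a custom partitioner with job.setPartitionerClass(WordPartitioner.class); with the default behavior this is not necessary.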

Let’s take an example.

hadoop
The above figure shows a word frequency counting task.

  1. Hadoop divides the input data into several splits and assigns each split to a map task for processing.

  2. After mapping, each word and its frequency in this task are obtained.

  3. The shuffle phase brings identical words together, sorts them, and partitions them into several groups.

  4. According to these slices, reduce is performed.

  5. The result of the reduce task is counted and output to a file.

In MapReduce, two roles are required to complete these processes: JobTracker and TaskTracker.

hadoop

JobTracker is used to schedule and manage other TaskTrackers. JobTracker can run on any computer in the cluster. TaskTracker is responsible for executing tasks and must run on DataNode.

Here is a simple MapReduce implementation example:

It is used to count the number of occurrences of each word in the input file.

  1. Import necessary packages:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  2. Define the Mapper class:

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split each line of text into words and send them to the Reducer
        String[] words = line.split("\\s+");
        for (String word : words) {
          context.write(new Text(word), new IntWritable(1));
        }
      }
    }

    The Mapper class is responsible for splitting the input text data into words and outputting a key-value pair (word, 1) for each word.

  3. Define the Reducer class:

    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
      protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Accumulate the number of occurrences of the same word
        for (IntWritable value : values) {
          sum += value.get();
        }
        // Output the word and its total number of occurrences
        context.write(key, new IntWritable(sum));
      }
    }

    The Reducer class receives key-value pairs from the Mapper, accumulates the values of the same key, and then outputs the word and its total number of occurrences.

  4. Main function (main method):

    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "word count");
      job.setJarByClass(word.class);
    
      job.setMapperClass(MyMapper.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
    
      job.setReducerClass(MyReduce.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
    
      // Set the input and output paths
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
      // Submit the job and wait for it to complete
      job.waitForCompletion(true);
    }
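To run the example, the three snippets above would typically live in one driver class (here assumed to be named word, to match the setJarByClass call), be packaged into a jar, and be submitted with the hadoop jar command; the HDFS input and output paths are passed as args[0] and args[1].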

hadoop

In the overall Hadoop architecture, the computing framework plays a crucial role: on the one hand, it operates on the data stored in HDFS; on the other hand, it can be wrapped to serve calls from higher-level components such as Hive and Pig.

Let’s briefly introduce some of the more important components.

HBase: originated from Google’s BigTable; it is a highly reliable, high-performance, column-oriented, and scalable distributed database.

Hive: a data warehouse tool that maps structured data files to database tables and lets users run simple MapReduce statistics through SQL-like statements, without developing dedicated MapReduce applications; it is well suited to statistical analysis of a data warehouse.

Pig: a large-scale data analysis tool based on Hadoop. It provides a SQL-like language called Pig Latin, whose compiler converts SQL-like data analysis requests into a series of optimized MapReduce operations.

ZooKeeper: originated from Google's Chubby; it is mainly used to solve the data management problems frequently encountered by distributed applications, simplifying the coordination and management of distributed applications.

Ambari: Hadoop management tool, which can monitor, deploy, and manage clusters quickly.

Sqoop: used to transfer data between Hadoop and traditional databases.

Mahout: an extensible machine learning and data mining library.

Advantages and Applications of Hadoop

Overall, Hadoop has the following advantages:

High reliability: this is in Hadoop's genes, and its genes come from Google. Google excelled at squeezing value out of cheap hardware: in its early days it could not afford high-end servers, so it preferred to deploy large systems on ordinary machines. Although the individual hardware is unreliable, the system as a whole is very reliable.

High scalability: Hadoop distributes data and computing tasks across clusters of available machines, and these clusters can be expanded easily. In other words, it is easy to scale out.

High efficiency: Hadoop can dynamically move data between nodes and ensure dynamic balance of each node, so the processing speed is very fast.

High fault tolerance: Hadoop can automatically save multiple copies of data and automatically redistribute failed tasks. This is also considered high reliability.

Low cost: Hadoop is open source and relies on community services, so the cost of use is relatively low.

Based on these advantages, Hadoop is suitable for applications in large data storage and large data analysis, suitable for running on clusters of several thousand to tens of thousands of servers, and supports PB-level storage capacity.

Hadoop is used in a wide range of applications, including search, log processing, recommendation systems, data analysis, video and image analysis, and data storage.

What is Hadoop?

Hadoop is a distributed system infrastructure developed by the Apache Foundation. It is a software framework that combines a storage system and a computing framework. It mainly solves the problem of storing and computing massive data and is the cornerstone of big data technology. Hadoop processes data in a reliable, efficient, and scalable way. Users can develop distributed programs on Hadoop without understanding the underlying details of the distributed system. Users can easily develop and run applications that process massive data on Hadoop.

What problems can Hadoop solve?

  • Massive data storage

    HDFS has high fault tolerance and is designed to be deployed on low-cost hardware. It provides high-throughput access to data and is suitable for applications with large data sets. It consists of many machines running DataNodes and one machine running the NameNode (plus another as a standby). Each DataNode manages a portion of the data, and the NameNode manages the metadata of the entire HDFS cluster.

  • Resource management, scheduling, and allocation

    Apache Hadoop YARN (Yet Another Resource Negotiator) is a new Hadoop resource manager. It is a general resource management system and scheduling platform that provides unified resource management and scheduling for upper-layer applications. Its introduction has brought huge benefits to the cluster in terms of utilization, unified resource management, and data sharing.

The origin of Hadoop

hadoop

The core architecture of Hadoop

The core of Hadoop is HDFS and MapReduce. HDFS provides storage for massive data, and MapReduce provides a computing framework for massive data.

HDFS

hadoop

The entire HDFS has three important roles: NameNode, DataNode, and Client.

Typical master-slave architecture, using TCP/IP communication.

  • NameNode: The master node of the distributed file system, responsible for managing the namespace of the file system, cluster configuration information, and storage block replication. The NameNode stores the metadata of the file system in memory, including file information, block information for each file, and information about each block in the DataNode.

  • DataNode: The slave node of the distributed file system, which is the basic unit of file storage. It stores blocks in the local file system and saves the metadata of the blocks. It also periodically sends information about all existing blocks to the NameNode.

  • Client: Splits files, accesses HDFS, interacts with the NameNode to obtain file location information, and interacts with the DataNode to read and write data.

There is also the concept of a block: the block is the basic read/write unit in HDFS. Files in HDFS are stored as blocks, which are replicated to multiple DataNodes. The block size (64 MB in older releases, 128 MB by default since Hadoop 2.x) and the number of replicas are determined by the client when the file is created.

MapReduce

MapReduce is a distributed computing model that divides large data sets (greater than 1TB) into many small data blocks, and then performs parallel processing on various nodes in the cluster, and finally aggregates the results. The MapReduce calculation process can be divided into two stages: the Map stage and the Reduce stage.

  • Map stage: The input data is divided into several small data blocks, and then multiple Map tasks process them in parallel. Each Map task outputs the processing result as several key-value pairs.

  • Reduce stage: The output results of the Map stage are grouped according to the keys in the key-value pairs, and then multiple Reduce tasks process them in parallel. Each Reduce task outputs the processing result as several key-value pairs.

Summary

Hadoop is a distributed system infrastructure that mainly solves the problem of storing and computing massive data. Its core is HDFS and MapReduce: HDFS provides storage for massive data, and MapReduce provides a computing framework for it. In addition, Hadoop has another important component, YARN, a general resource management system and scheduling platform that provides unified resource management and scheduling for upper-layer applications.