These sections describe how the basic elements of HDFS—client, NameNode, and DataNode—are decomposed and the basic concepts that underlie each element.
The client library is decomposed into two classes. The application code interacts with DistributedFileSystem.java, which in turn uses DFSClient.java. DFSClient.java uses the ClientProtocol to communicate with the NameNode and connects directly to DataNodes to read/write block data.
Communication between the Client and the NameNode uses RPC whereas communication between the Client and a DataNode uses streaming I/O through a socket.
The client asks NameNode to create a new file entry in the namespace. NameNode throws an exception if it is unable to create the file entry.
When a file is created, the client is granted a lease on the file. The lease prevents other threads from writing to the file but not from reading from it. A lease has an expiration time (the default value is one minute) and it is the responsibility of the client to renew the lease before it expires. The client sets an interrupt at half the lease life so that it can ask for a renewal if necessary.
The lease can be renewed explicitly through the ClientProtocol or implicitly through AddBlock.
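The renewal timing described above can be sketched as follows; this is an illustrative fragment with hypothetical names, not the actual DFSClient code:

```java
// Illustrative sketch of client-side lease renewal timing; the class and
// method names here are hypothetical, not the actual DFSClient internals.
public class LeaseRenewalSketch {
    // Default lease expiration, as described above (one minute).
    static final long LEASE_PERIOD_MS = 60_000;

    // The client sets its interrupt at half the lease life so that it can
    // still renew before expiration.
    static long renewalIntervalMs(long leasePeriodMs) {
        return leasePeriodMs / 2;
    }

    // Decide whether an explicit renewal is necessary: if nothing has
    // implicitly renewed the lease (e.g., an AddBlock call) since the last
    // renewal, the client should renew explicitly.
    static boolean needsExplicitRenewal(long lastRenewalMs, long nowMs,
                                        long leasePeriodMs) {
        return nowMs - lastRenewalMs >= renewalIntervalMs(leasePeriodMs);
    }
}
```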
Append differs from simply writing to the end of a file in two ways:
These two differences are related because if multiple simultaneous appends occur, the contents of one replica may differ in order from the contents of another replica. If the appends are successful, then all of the contents are contained in each replica, although potentially in a different order.
AddBlock requests the NameNode to allocate a new block and return the list of DataNodes the block data should be replicated to. The transferring of the data to the DataNodes is described in the Write section.
AddBlock also commits the previous block by reporting to the NameNode the actual generation (time) stamp and the length of the block that the client has transmitted to DataNodes. A write is not considered complete until it has been committed.
An application request to create a file does not reach the NameNode immediately. Initially, the HDFS client caches the file data in a temporary local file, and application writes are transparently redirected to this temporary local file. When the local file accumulates data worth at least one HDFS block size, the client contacts the NameNode to create the file, and the NameNode proceeds as described in the section on file creation below. The client flushes the block of data from the local temporary file to the specified DataNodes. When the file is closed, the remaining un-flushed data in the temporary local file is transferred to the DataNodes. The client then tells the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.
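The staging behavior can be sketched with a toy buffer; the block size is shrunk here for illustration, whereas a real client uses the configured HDFS block size (e.g., 64 MB):

```java
// Toy sketch of client-side write staging: data accumulates locally, and a
// flush happens only when a full block's worth is present, or at close.
// BLOCK_SIZE is deliberately tiny for illustration.
public class StagingSketch {
    static final int BLOCK_SIZE = 4;
    private final StringBuilder buffer = new StringBuilder();
    int flushes = 0; // times a block's worth of data was sent to DataNodes

    void write(String data) {
        buffer.append(data); // application writes land in the local buffer
        while (buffer.length() >= BLOCK_SIZE) {
            buffer.delete(0, BLOCK_SIZE); // flush one full block
            flushes++;
        }
    }

    // On close, any remaining un-flushed data is transferred as well.
    int close() {
        if (buffer.length() > 0) {
            buffer.setLength(0);
            flushes++;
        }
        return flushes;
    }
}
```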
The bulk of the work is done in DFSOutputStream.java. It uses the method addBlock() to allocate blocks for the new data. A set of DataNode locations are returned to the client. Each DataNode is intended to contain one replica of the block.
The client application writes data that is cached internally by this stream. The data is broken up into packets, each typically 64K in size. A packet is composed of chunks; each chunk is typically 512 bytes and has an associated checksum.
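Assuming, for illustration, a 4-byte checksum per 512-byte chunk, the packet layout implies arithmetic along these lines:

```java
// Sketch of packet/chunk arithmetic; the 4-byte checksum size is an
// assumption for illustration, not taken from the text above.
public class PacketLayoutSketch {
    static final int PACKET_SIZE = 64 * 1024; // typical packet size (64K)
    static final int CHUNK_SIZE = 512;        // typical chunk size
    static final int CHECKSUM_SIZE = 4;       // assumed per-chunk checksum

    // How many checksummed chunks fit in one packet.
    static int chunksPerPacket() {
        return PACKET_SIZE / (CHUNK_SIZE + CHECKSUM_SIZE);
    }
}
```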
When the client application fills up the currentPacket, it is enqueued into the dataQueue. The DataStreamer thread picks up packets from the dataQueue, sends each packet to the first DataNode in the pipeline, and moves it from the dataQueue to the ackQueue. The ResponseProcessor receives acks from the DataNodes. When a successful ack for a packet has been received from all DataNodes, the ResponseProcessor removes the corresponding packet from the ackQueue.
In case of error, all outstanding packets are moved from the ackQueue back to the dataQueue. A new pipeline is set up by eliminating the bad DataNode from the original pipeline, and the DataStreamer then resumes sending packets from the dataQueue.
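The dataQueue/ackQueue interplay, including the error-recovery step, can be sketched with two plain deques; this is an illustrative model, not the actual DFSOutputStream code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the two-queue pipeline: packets are represented as Integers.
public class PipelineQueuesSketch {
    final Deque<Integer> dataQueue = new ArrayDeque<>();
    final Deque<Integer> ackQueue = new ArrayDeque<>();

    // DataStreamer: take the next packet, send it down the pipeline, and
    // park it on the ackQueue until every DataNode acknowledges it.
    void send() {
        Integer pkt = dataQueue.pollFirst();
        if (pkt != null) ackQueue.addLast(pkt);
    }

    // ResponseProcessor: a successful ack from all DataNodes drops the
    // oldest outstanding packet.
    void ack() {
        ackQueue.pollFirst();
    }

    // On error: move all outstanding packets back to the front of the
    // dataQueue (preserving order) so the rebuilt pipeline resends them.
    void onError() {
        while (!ackQueue.isEmpty()) dataQueue.addFirst(ackQueue.pollLast());
    }
}
```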
When the client is done writing data to the given filename, the complete() function is called to close the file. The function complete() in the ClientProtocol returns a Boolean indicating whether the file has been closed successfully; if it returns false, the caller should try again. The function complete() also commits the last block of the file by reporting to NameNode.java the actual generation stamp and the length of the block that the client has transmitted to DataNodes.
A call to complete() will not return true until all the file's blocks have been replicated the minimum number of times. Thus, DataNode failures may cause a client to call complete() several times before succeeding.
Finally the lease on the file is terminated.
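The retry behavior of complete() can be sketched as follows; the NamenodeStub interface is hypothetical, standing in for the ClientProtocol:

```java
// Hypothetical stub of the ClientProtocol complete() call, for illustration.
interface NamenodeStub {
    boolean complete(String src);
}

public class CompleteRetrySketch {
    // Retry complete() until the NameNode reports the file closed; returns
    // the number of attempts. complete() returns false until all of the
    // file's blocks are minimally replicated; a real client would back off
    // between retries rather than spin.
    static int completeFile(NamenodeStub nn, String src, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (nn.complete(src)) {
                return attempt; // file closed successfully
            }
        }
        throw new IllegalStateException("could not complete " + src);
    }
}
```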
From the client’s perspective, a read occurs in two steps. First, the ClientProtocol is used to locate the block using LocatedBlock; the NameNode returns a list of locations of the replicas. The client then contacts the DataNodes that hold the replicas to retrieve the data. DataNodes maintain an open server socket so that clients can read or write efficiently. The actual reading of the data is managed by DFSInputStream.java.
If one of the DataNodes has failed—i.e. the read from the DataNode times out—the client is responsible for retrieving the desired block from the next DataNode in the replication sequence.
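The failover behavior can be sketched like this; the BlockFetcher interface is hypothetical, standing in for the DataNode socket read:

```java
import java.util.List;

public class ReadFailoverSketch {
    // Hypothetical fetch interface standing in for a DataNode block read.
    interface BlockFetcher {
        byte[] fetch(String datanode) throws Exception;
    }

    // Try each replica location in order; on a timeout or I/O error, move
    // on to the next DataNode in the replication sequence.
    static byte[] readBlock(List<String> replicas, BlockFetcher fetcher) {
        for (String dn : replicas) {
            try {
                return fetcher.fetch(dn);
            } catch (Exception timeoutOrIoError) {
                // failed replica: fall through to the next DataNode
            }
        }
        throw new RuntimeException("all replicas failed");
    }
}
```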
It is possible that some of the blocks retrieved from the DataNode are corrupted. In such a case, the client should inform the NameNode of the corrupted block(s) using reportBadBlocks in the ClientProtocol. A corrupted block is detected through the use of checksums within the block.
NameNode operates in one of three different modes.
In addition, there are three threads that manage various aspects of the NameNode’s responsibilities. These are described separately.
This section describes the portions of the NameNode that are involved in normal operation: in particular, file management and the create, write, read, and close operations. The main classes used by the NameNode are NameNode.java and FSNamesystem.java.
NameNode File Management
This section describes how HDFS files are managed and mapped to blocks of data on DataNodes. This information is taken partially from the paper [http://labs.google.com/papers/gfs.html].
Files in HDFS consist of blocks. The information about any particular file is contained in the HDFS namespace. Each block is replicated and stored on multiple DataNodes. The mapping from block replicas to DataNodes is stored in the INode data structure. INode.java defines this data structure and uses it to maintain an in-memory representation of the directory structure (the file/block hierarchy); INode is a base class containing the fields common to file and directory INodes.
The NameNode controls two critical tables:
1) filename->blocksequence (namespace)
2) block replica->machinelist ("inodes")
The namespace is persisted. INodes are not persisted and, in the event of a NameNode restart, are recovered from DataNodes reporting through blockReport().
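The two critical tables can be modeled with plain collections; this is an illustrative sketch, not the actual INode/BlockManager structures:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Minimal model of the NameNode's two critical tables.
public class NamenodeTablesSketch {
    // 1) namespace: filename -> ordered sequence of block IDs (persisted)
    final Map<String, List<Long>> namespace = new HashMap<>();
    // 2) block map: block ID -> DataNodes holding a replica; not persisted,
    //    rebuilt from DataNode block reports after a restart
    final Map<Long, Set<String>> blockMap = new HashMap<>();
}
```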
The client performs a read through the ClientProtocol method getBlockLocations(), with parameters: the fully qualified name of the file, the offset within the file of the desired data, and the length of the data to be read.
FSNameSystem.java uses INodeDirectory.java to locate the block numbers based on the file name, the offset, and the length. BlockManager.java locates the DataNodes on which the blocks are replicated. Returned to the client is a list of DataNodes on which the desired blocks are located (and replicated).
The create() method in the ClientProtocol creates an empty file. It is handed a full path name and, after checking for duplicates, permissions, whether the NameNode is in SafeMode, and a few other things, the INode data structure is created. Subsequently, the full path name is used by the client, via the various ClientProtocol methods, to access this data structure. Once created, the file is available for reading. Deletion, rename, or re-creation can only be done after the file has been completed (closed).
The client is granted a lease as long as there is not another lease holder for the file. A lease ensures that there is only one writer to a file at a time. If the client is not actively using the lease, it times out after one minute (the default value). Management of leases is synchronized to prevent race conditions over leases, as described in the lease management section below.
Writing to an open (created) file is done by the client interacting directly with DataNodes. The locations to be written are allocated by the NameNode in response to the ClientProtocol call addBlock(). A completed write consists of:
· The NameNode verifies that the client is the lease holder for this file.
· The NameNode allocates the replication-factor number of block replicas on DataNodes and stores that information using BlockManager.
· The client writes a block to the first allocated DataNode, which in turn sends the block to the second allocated DataNode, and so forth until all replicas have been written.
· When a block has been written on a DataNode, the DataNode reports to the NameNode using BlockReceived.java.
· When all of the replicas have been reported as written, the block is committed in the NameNode and marked as written, with its replicas recorded.
· The lease is renewed.
· The EditLog is written with the block allocation.
Not all of this is done synchronously with the write; in particular, the writing of the replicas is done asynchronously.
The block written by the client is given a sequence number. The DataNode reports to the NameNode when it writes a block, using blockReceived() in DataNodeProtocol.java; the DataNodeProtocol is managed in NameNode.java. The sequence number for the block provides one check that all of the replicas hold the same block. If some of the replicas do not report, or report the wrong sequence number, then the block is placed on the NameNode block replica list for subsequent processing. This processing involves the creation of additional replicas, the deletion of outdated replicas, and instructing a DataNode with a current copy of the block to copy it to the new replica(s).
When an HDFS file is closed, the NameNode performs the necessary bookkeeping. This includes releasing the lease and committing the writes to the EditLog.
In addition to the functions described above, it is also possible to rename, delete, concatenate, and do other file management activities. These are relatively straightforward and we will not describe them here.
This section describes how the NameNode is recovered in the event of a failure.
Conceptually, the restart of the NameNode is based on the existence of an EditLog and the fact that actual block allocations are maintained by the DataNodes. Every modification to the HDFS namespace is recorded in the EditLog, which is persisted on the host file system. In the event of a restart, the EditLog is read in and the DataNodes report their block allocations. However, relying solely on an EditLog becomes inefficient as the EditLog grows, so the BackupNode and the checkpoint image are used to optimize the restart process.
The information in the EditLog is also contained in the HDFS namespace maintained by NameNode. Periodically, the namespace is written to persistent storage as a checkpoint and the EditLog is truncated. Future modifications to the namespace continue to be added to the EditLog. Thus, the combination of the EditLog and the checkpoint is a copy of the namespace. One subtlety is that modifications to the namespace are not committed until after they have been copied to the EditLog.
A backup NameNode, implemented in BackupNode.java, is used during the checkpointing process; it manages the updating of the checkpoint and the truncation of the EditLog.
On startup, the NameNode enters a special state called SafeMode. Replication of data blocks does not occur when the NameNode is in the SafeMode state. This is because the DataNodes maintain the actual allocations. Until the DataNodes have reported their allocations, the NameNode has no knowledge of which replicas exist on which DataNodes.
The NameNode receives Heartbeat and blockReport() messages from all of the functioning DataNodes. A block report contains the list of data blocks that a DataNode is hosting. Each block has a specified minimum number of replicas. A block is considered safely replicated when the minimum number of replicas of that data block has checked in with the NameNode. After a configurable percentage of safely replicated data blocks checks in with the NameNode (plus an additional 30 seconds), the NameNode exits the SafeMode state. It then determines the list of data blocks (if any) that still have fewer than the specified number of replicas. The NameNode then replicates these blocks to other DataNodes.
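The exit condition can be sketched as a simple threshold check; the additional 30-second extension described above is omitted here for brevity:

```java
// Sketch of the SafeMode exit check: the NameNode may leave SafeMode once
// the fraction of safely replicated blocks reaches a configured threshold.
public class SafeModeSketch {
    static boolean canLeaveSafeMode(long safeBlocks, long totalBlocks,
                                    double threshold) {
        if (totalBlocks == 0) return true; // empty namespace: nothing to wait for
        return (double) safeBlocks / totalBlocks >= threshold;
    }
}
```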
The allocation of block replicas is described above. In this section, we describe how blocks and replicas are managed once they have been allocated.
A block is in one of the following states:
A replica is in one of the following states:
· Under construction. The replica has not yet been written and verified on a DataNode. From NameNode’s perspective, a replica has been written and verified when it receives a blockReport() from the DataNode with the correct timestamp.
The synchronous portion of lease management is performed during the create, write, and complete operations described above. During these operations, the lease is created, the lease time stamp is renewed, and the lease is terminated. There is also an asynchronous portion of lease management, which examines the full list of leases to see if any have expired.
Leases are managed by FSNamesystem.java and LeaseManager.java. The separate thread that periodically examines the leases and determines which have expired is created during NameNode initialization in the FSNamesystem.java method activate(). Leases are kept in a TreeMap data structure that enables the mapping of path names to leases and of clients to leases.
The methods in LeaseManager.java are synchronized because the TreeMap data structure is a critical section.
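The lease bookkeeping can be sketched as follows; this toy version keeps only a path-to-expiry map, whereas the real LeaseManager also maps clients to leases:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of lease bookkeeping. Methods are synchronized because the
// expiry-scanning thread and the RPC handler threads share the TreeMap.
public class LeaseManagerSketch {
    static final long LEASE_PERIOD_MS = 60_000; // default lease life

    private final TreeMap<String, Long> expiryByPath = new TreeMap<>();

    // Grant (or renew) a lease: record when it will expire.
    synchronized void grant(String path, long nowMs) {
        expiryByPath.put(path, nowMs + LEASE_PERIOD_MS);
    }

    // The asynchronous scan: collect all paths whose leases have expired.
    synchronized List<String> expired(long nowMs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : expiryByPath.entrySet()) {
            if (e.getValue() <= nowMs) out.add(e.getKey());
        }
        return out;
    }
}
```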
If a lease expires, either the client failed to renew it explicitly or the client itself has failed. In the latter case, LeaseManager.java must determine the last block successfully written by the client. The following lease recovery algorithm is used when a lease expires and a failed client is detected:
1. NameNode retrieves lease information
2. For each file f in the lease, consider the last block b of f
a) Retrieve the identities of the DataNodes which contain replicas of b
b) Assign one of the DataNodes as the primary DataNode p
c) NameNode creates a new generation stamp for the block and passes it to p
d) p gets the block information from each DataNode that contains b
e) p computes the minimum block length
f) p updates the DataNodes containing replicas of b
g) p informs the NameNode that the updates were successful
h) NameNode updates the BlockInfo for the block b
i) NameNode frees f from the lease and removes the lease
j) NameNode commits the changes to the EditLog
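Step e) of the algorithm, where the primary computes the length all replicas must agree on, can be sketched as:

```java
import java.util.List;

// Sketch of step e): the primary DataNode p takes the minimum of the
// lengths reported by the replicas, so every replica can be brought to a
// length that all of them actually contain.
public class LeaseRecoverySketch {
    static long recoveredLength(List<Long> replicaLengths) {
        long min = Long.MAX_VALUE;
        for (long len : replicaLengths) {
            min = Math.min(min, len);
        }
        return min;
    }
}
```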
This section describes the activities of the ReplicationMonitor thread. The allocation of replicas is described above. Replicas are managed in FSNamesystem.java through the ReplicationMonitor; computeReplicationWork() is periodically called to determine what work is to be done with respect to replicas.
There are two cases for a replica in this thread.
Heartbeats are the mechanism by which the NameNode determines which DataNodes are currently active. A HeartbeatMonitor thread in the NameNode manages this processing.
NameNode maintains information about the DataNodes in DataNodeDescriptor.java. DataNodeDescriptor tracks statistics on a given DataNode, such as available storage capacity, last update time, etc., and maintains a set of blocks stored on the DataNode. This data structure is internal to the NameNode: it is not sent over the wire to the client or the DataNodes, nor is it stored persistently in the fsImage (namespace image) file.
If a DataNode fails to send a heartbeat for a long time (e.g., 10 minutes), then the HeartbeatMonitor will decide the DataNode is dead, and consider its replicas to be no longer available. This typically causes a brief flurry of replication commands to other DataNodes, to bring the replication count of affected blocks back up to the required number of replicas. The DataNodeDescriptor for the expired DataNode is retained, however, to help recognize it if it recovers and reconnects to the NameNode.
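The liveness check can be sketched as a simple comparison against the expiry interval:

```java
// Sketch of the HeartbeatMonitor liveness test: a DataNode is declared
// dead when no heartbeat has arrived within the expiry interval
// (10 minutes here, matching the example above).
public class HeartbeatMonitorSketch {
    static final long EXPIRY_MS = 10 * 60 * 1000;

    static boolean isDead(long lastHeartbeatMs, long nowMs) {
        return nowMs - lastHeartbeatMs > EXPIRY_MS;
    }
}
```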
This section describes how NameNode handles the majority of communications with DataNodes and Clients. The Namenode maintains a pool of threads which accept messages from the RPC queue, and handle them. The form of these RPC messages is defined by DatanodeProtocol and ClientProtocol interfaces.
For each message, a handler thread interprets the message, updates NameNode information if necessary, constructs the response message, and returns the response via the RPC mechanism. The thread then returns to the pool. If the processing throws an exception, an appropriate error message is returned via RPC. In the sections above, we saw how some client communications can cause the NameNode to allocate new blocks, or to look up block/DataNode locations and return them to the client. The next section discusses how DataNode communications affect the NameNode.
A DataNode initiates communication with the NameNode in four ways.
The typical response from the NameNode to the DataNode involves block replication or deletion commands. These commands are from a queue built for each DataNode by the Block and Replica Management activities discussed previously.
· If a substantial amount of time has passed since the last DataNode block report, then the NameNode requests an immediate block report.
· If the DataNode is not recognized by the NameNode (e.g., due to either the NameNode having restarted, or the network connection between the NameNode and the DataNode being down long enough for the DataNode to be timed out), then the NameNode requests the DataNode to re-register.
This section describes how blocks are managed by DataNodes. Some of the information contained here can be found at:
A DataNode serves up blocks of data over the network using a block protocol specific to HDFS. DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.
Files in HDFS consist of blocks. The information about any particular file is contained in the namespace. Blocks are replicated and stored on DataNodes. The NameNode periodically receives a Heartbeat and a blockReport() from each of the DataNodes in a cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A block report contains a list of all blocks replicated on a DataNode. When a DataNode receives a new block replica--from a client (file create/append) or from another DataNode (block replication)--it immediately sends a blockReceived() notification to the NameNode. In rare cases (e.g., recovering from major errors) this may temporarily spare the NameNode from asking for a full block report, since the receipt of a blockReceived() message indicates that the DataNode is still alive.
The majority of the DataNode’s functionality is implemented in DataNode.java. DataNode is a class (and program) that stores a set of block replicas for an HDFS deployment. A single deployment can have one or many DataNodes, and each DataNode communicates regularly with a single NameNode. It also communicates with client code (for reads and writes) and with other DataNodes (for replication) from time to time. The NameNode can create new DataNodes (using the DataNode constructor in DataNode.java) whenever it likes, by passing the DataNode a configuration (including an authentication framework) and an array of dataDirs (directories where blocks are stored). Once created, the NameNode can start the DataNode (using startDataNode() in DataNode.java) on a specified server, as a daemon.
DataNodes store a series of named block replicas. The DataNode allows client code to read these block replicas, or to write new block replica data. The DataNode may also, in response to instructions from its NameNode, delete block replicas or copy block replicas to/from other DataNodes. Clients can also communicate directly with DataNodes to create, read, write, and copy block replicas, as described below.
The DataNode maintains just one critical table:
block replica -> stream of bytes (of BLOCK_SIZE or less)
This table is stored on a local disk. The DataNode reports the table's contents to the NameNode upon startup and every so often afterwards.
DataNodes spend their lives in an endless loop of waiting for something to do. They can receive instructions from the NameNode although a NameNode does not connect to a DataNode directly; a NameNode simply returns values from functions invoked by a DataNode. A DataNode can also get instructions for block replica transfer from a client.
The DataNode Daemon maintains an open server socket so that client code or other DataNodes can read/write data. The host/port for this server is reported to the NameNode, which then sends that information to clients or other DataNodes that might be interested.
The DataNode sends a heartbeat to the NameNode every 3 seconds (or as configured), which gives the NameNode the opportunity to respond with commands such as:
The DataNode initiates a blockReport() to the NameNode every hour (or as configured), which prevents any divergence between the NameNode's and the DataNode's understanding of which replicas are held by each DataNode.
Relationship to Files
The DataNode stores HDFS data in files in its local file system. The DataNode has no knowledge about HDFS files. It stores each block replica of HDFS data in a separate file in its local file system. When a DataNode starts up, it scans through its local file system, generates a list of all HDFS data block replicas that correspond to each of these local files and sends this report to the NameNode: this is the block report. A blockReport() is also issued periodically by the DataNode's HeartBeat thread.
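The startup scan can be sketched as follows; the "blk_<id>" file-naming convention is assumed here for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the startup scan: given the file names found in the local data
// directory, collect the block IDs to include in the block report. Replica
// files are assumed to be named "blk_<id>", with per-block metadata in
// companion ".meta" files; both conventions are assumptions for this sketch.
public class BlockReportSketch {
    static List<Long> scan(List<String> fileNames) {
        List<Long> blocks = new ArrayList<>();
        for (String name : fileNames) {
            if (name.startsWith("blk_") && !name.endsWith(".meta")) {
                blocks.add(Long.parseLong(name.substring(4)));
            }
        }
        return blocks;
    }
}
```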