Before it transmits each data packet to be written by a DataNode (2. Write packet), it pushes a copy of that packet onto a queue. The DFSOutputStream keeps that packet in the queue until it receives an acknowledgment (3. ACK packet) from each DataNode confirming that the write operation completed.
When an exception is thrown (for example, the one shown in the stack trace in Figure 12), the DFSOutputStream attempts to remedy the situation by reprocessing the packets to complete the HDFS write.
The DFSOutputStream can make additional remediation attempts up to
one less than the replication factor. In
the case of TeraSort, however, since the
replication factor is set to one, the lack
of a single HDFS packet acknowledgment will cause the entire DFSOutputStream write operation to fail.
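To make the mechanism concrete, the following is a minimal sketch of the ack-queue idea described above. It is not the actual Hadoop implementation; the class name and the sendToPipeline call are hypothetical, standing in for the client's real transport logic.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the DFSOutputStream ack-queue behavior (hypothetical names).
class AckQueueSketch {
    private final Deque<byte[]> ackQueue = new ArrayDeque<>();
    private final int replication; // e.g., 1 for this TeraSort setup

    AckQueueSketch(int replication) { this.replication = replication; }

    void writePacket(byte[] packet) throws IOException {
        ackQueue.addLast(packet);           // keep a copy until ACKed
        int attemptsLeft = replication - 1; // remediation budget
        while (!sendToPipeline(packet)) {   // hypothetical transport call
            if (attemptsLeft-- == 0) {
                // With replication = 1 the budget is zero: fail at once.
                throw new IOException("All datanodes are bad. Aborting...");
            }
        }
    }

    void onAck(byte[] packet) {
        ackQueue.remove(packet); // ACK received: safe to discard the copy
    }

    private boolean sendToPipeline(byte[] packet) { return true; } // stub
}

The essential point the sketch captures is that the remediation budget is derived from the replication factor, which is why a single-replica configuration has no margin for a slow or lost acknowledgment.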
The DFSOutputStream endeavors
to process its data in an unfettered
way, assuming the DataNodes will be
able to keep up and respond with acknowledgments. If, however, the underlying IO subsystem on a DataNode
cannot keep up with this demand, an
outstanding packet can go unacknowledged for too long. Since there is only a single replica in the case of TeraSort, no remediation is undertaken. Instead, the DFSOutputStream immediately regards the outstanding write packet as AWOL and throws
an exception that propagates back up
to the Reduce task in Figure 13.
Since the Reduce task does not
know how to handle this IO exception, it completes with a TASK_STATUS="FAILED". The MapReduce framework will eventually retry the entire Reduce task, possibly more than once (see Table 3), and that will be reflected in a stretched T1 value that is ultimately responsible for the observed superlinear speedup.
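The number of automatic retries is bounded by the standard Hadoop setting mapreduce.reduce.maxattempts (default 4; older releases used the key mapred.reduce.max.attempts). A brief sketch of inspecting that budget through the Java Configuration API:

import org.apache.hadoop.conf.Configuration;

// Sketch: how many times the framework will re-attempt a failed Reduce
// task before failing the whole job.
public class ReduceRetryBudget {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        int maxAttempts = conf.getInt("mapreduce.reduce.maxattempts", 4);
        System.out.println("Reduce attempts allowed: " + maxAttempts);
    }
}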
This operational insight into Reduce failures can be used to construct a list of simple tactics to avoid these runtime failures:
1. Resize the buffer cache.
2. Tune kernel parameters to increase IO throughput.
3. Reconfigure Hadoop default parameters (see the sketch after this list).
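As one illustration of tactic 3, the following sketch restores the HDFS default of triple replication on the client side, giving the DFSOutputStream replication - 1 = 2 remediation attempts instead of the zero available at replication = 1. The values are examples under this article's scenario, not general recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Illustrative sketch for tactic 3: raise the replication factor back
// to the HDFS default of 3 (the TeraSort runs here used 1).
public class ReplicationTactic {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("dfs.replication", 3);
        FileSystem fs = FileSystem.get(conf); // later writes use the new factor
        System.out.println("Replication: " + conf.getInt("dfs.replication", 3));
        fs.close();
    }
}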
If maintaining a BigMem-type cluster is dictated by nonengineering requirements (for example, budgetary constraints), these tactics offer a way to reduce the incidence of Reduce failures (see Table 3). The potential difference in runtime resulting from Reduce retries is obscured by the aforementioned variation in runtime measurements, which is also on the order of 10%.
Table 3 shows 12 rows corresponding to 12 parallel TeraSort jobs, each
running on its own BigMem single-node cluster. A set of metrics indicating how each of the runs executed
is stored in the Hadoop job-history
log and extracted using Hadoop log-parsing tools.
The 840 Map tasks are determined
by the TeraSort job partitioning 100 (
binary) GB of data into 128 (decimal) MB
HDFS blocks. No Map failures occurred.
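As a rough check on that count (the file layout assumed here is illustrative): 100 binary GB is 100 × 2^30 = 107,374,182,400 bytes, and dividing by 128 × 10^6 bytes per block gives about 838.9 blocks. Because each input file's final partial block becomes its own split, the total rounds up to roughly 839-840 Map tasks, consistent with the 840 observed.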
The number of Reduce tasks was set to
three per cluster node. The number of
failed Reduce tasks varied randomly
between none and four. In comparison,
there were no Reduce failures for the
corresponding BigDisk case.
The average runtime for Hadoop
jobs was 13,057,078.67 ms, shown as
T1 in Table 2. Additional statistical
analysis reveals a strong correlation
between the number of Reduce task
retries and longer runtimes. Recalling
the definition of speedup, S(p) = T1/Tp, if the mean single-node runtime T1 is longer than successive values of p × Tp, then S(p) exceeds p and the speedup will be superlinear.
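For illustration (the two-node runtime here is hypothetical, not a measurement): with T1 = 13,057,078.67 ms, any T2 below T1/2 ≈ 6,528,539 ms satisfies T1 > 2 × T2, giving S(2) = T1/T2 > 2, that is, superlinear speedup on two nodes.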
Whence Reduce fails? The number of failed Reduces in Table 3 indicates that a write failure in the Reduce task causes it to retry the write operation, possibly multiple times. In addition, failed Reduce tasks tend to incur longer runtimes as a consequence of those additional retries. The only outstanding question is, what causes the writes to fail in the first place? We already know that write operations are involved during a failure, and that suggests examining the HDFS interface.
Returning to the earlier failed Reduce stack trace, closer scrutiny reveals the following lines, with important key words shown in bold in Figure 12.
The “All datanodes are bad” Java IOException means the HDFS DataNode pipeline in Figure 13 has reached a state where the setupPipelineForAppendOrRecovery method of the DFSOutputStream Java class cannot recover the write operation, and the Reduce task fails to complete.
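In outline, the recovery decision reduces to the following check. This is a simplified paraphrase, not the actual Hadoop source; the method and parameter names are hypothetical.

import java.io.IOException;

// Simplified paraphrase of the recovery decision in the HDFS client's
// data streamer. When every DataNode in the write pipeline has been
// marked bad, recovery is impossible and the exception seen in the
// Reduce stack trace is thrown.
public class PipelineRecoverySketch {
    static void setupPipelineForRecovery(String[] nodes, boolean[] bad)
            throws IOException {
        int healthy = 0;
        for (int i = 0; i < nodes.length; i++) {
            if (!bad[i]) healthy++;
        }
        if (healthy == 0) {
            throw new IOException("All datanodes are bad. Aborting...");
        }
        // Otherwise the real client rebuilds the pipeline from the healthy
        // nodes; with replication = 1, one bad node means healthy == 0.
    }
}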
When the pipeline is unhindered,
a Reduce task makes a call into the
HDFSClient, which then initiates the
creation of an HDFS DataNode pipeline. The HDFSClient opens a DFSOutputStream and readies it for writing (1. Write in Figure 13) by allocating an HDFS data block on a DataNode. The DFSOutputStream then breaks the data stream into smaller packets of data.
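From the API side, a Reduce task's write path looks like the following sketch (the output path is hypothetical). FileSystem.create() sets up the DataNode pipeline (step 1. Write); the returned stream is backed by a DFSOutputStream, which packetizes the bytes written to it, so steps 2 and 3 happen underneath this code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the client-side write path into HDFS.
public class HdfsWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Hypothetical output path, in the style of a Reduce output file.
        try (FSDataOutputStream out = fs.create(new Path("/tmp/part-r-00000"))) {
            out.writeBytes("reduce output record\n");
        }
    }
}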
Figure 13. HDFS DataNode pipeline showing single replication (blue) and default triple replication (blue and gray). [Figure labels: triple replication pipeline; 1. Write; 2. Write packet; 3. ACK packet.]