We rely on atomic commits of map and reduce task outputs to
achieve this property. Each in-progress task writes its output to private
temporary files. A reduce task produces one such file, and a map task
produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master and includes the names
of the R temporary files in the message. If the master receives a completion message for an already completed map task, it ignores the message.
Otherwise, it records the names of the R files in a master data structure.
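The master-side handling of map completion messages can be illustrated with a minimal sketch. It is not the actual implementation: the `Master` class, the `on_map_completed` method, and the file names are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class Master:
    """Hypothetical master state: map task id -> names of its R temporary files."""
    num_reduce_tasks: int
    completed_maps: dict = field(default_factory=dict)

    def on_map_completed(self, task_id, temp_files):
        """Record a map completion; return False if the message is a duplicate."""
        if task_id in self.completed_maps:
            # The task was already completed by another execution: ignore.
            return False
        if len(temp_files) != self.num_reduce_tasks:
            raise ValueError("a map task must report exactly R files")
        self.completed_maps[task_id] = list(temp_files)
        return True


master = Master(num_reduce_tasks=2)
# Two executions of the same map task (id 7) report completion.
first = master.on_map_completed(7, ["w1/m7-r0.tmp", "w1/m7-r1.tmp"])
second = master.on_map_completed(7, ["w2/m7-r0.tmp", "w2/m7-r1.tmp"])
```

Only the files named in the first message are recorded, so every reduce task is later directed to the output of a single execution of that map task.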
When a reduce task completes, the reduce worker atomically renames
its temporary output file to the final output file. If the same reduce task
is executed on multiple machines, multiple rename calls will be executed
for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system
state contains only the data produced by one execution of the reduce task.
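The reduce-side commit can be sketched in the same spirit. The sketch assumes a local POSIX file system standing in for the underlying distributed file system; the function name `commit_reduce_output` and the output file name are hypothetical.

```python
import os
import tempfile


def commit_reduce_output(final_path, records, attempt_id):
    """Write records to a private temporary file, then rename it into place."""
    directory = os.path.dirname(final_path) or "."
    # The temporary file lives in the same directory as the final file,
    # so the rename stays within one file system.
    fd, tmp_path = tempfile.mkstemp(prefix=f"attempt-{attempt_id}-", dir=directory)
    with os.fdopen(fd, "w") as f:
        for key, value in records:
            f.write(f"{key}\t{value}\n")
    # os.replace is atomic on POSIX: readers see the old file or the new one,
    # never a mixture of the two.
    os.replace(tmp_path, final_path)


out_dir = tempfile.mkdtemp()
final = os.path.join(out_dir, "part-00000")
commit_reduce_output(final, [("a", 1), ("b", 2)], attempt_id=1)
commit_reduce_output(final, [("a", 1), ("b", 2)], attempt_id=2)  # duplicate execution
with open(final) as f:
    committed = f.read()
leftover = [name for name in os.listdir(out_dir) if name.startswith("attempt-")]
```

After both executions finish, the final file holds the complete output of exactly one of them and no temporary files remain.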
The vast majority of our map and reduce operators are deterministic,
and the fact that our semantics are equivalent to a sequential execution
in this case makes it very easy for programmers to reason about their
program’s behavior. When the map and/or reduce operators are nondeterministic, we provide weaker but still reasonable semantics. In the
presence of nondeterministic operators, the output of a particular
reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the nondeterministic program. However, the output for
a different reduce task R2 may correspond to the output for R2 produced
by a different sequential execution of the nondeterministic program.
Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the
execution of Ri that committed (there is exactly one such execution).
The weaker semantics arise because e(R1) may have read the output
produced by one execution of M, and e(R2) may have read the output
produced by a different execution of M.
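A toy model makes the weaker guarantee concrete. It is only an illustration under stated assumptions: the nondeterminism is modeled by a hypothetical attempt number that the map task embeds in its output, and `run_map` is not part of any real API.

```python
def run_map(attempt):
    """One execution of a nondeterministic map task M.

    The output depends on the attempt, standing in for any nondeterministic
    input such as a timestamp or a random draw.
    """
    tag = f"value-from-attempt-{attempt}"
    return {"R1": tag, "R2": tag}


exec_a = run_map(attempt=1)  # first execution of M
exec_b = run_map(attempt=2)  # re-execution of M, e.g. after a worker failure

# e(R1) read the output of the first execution, e(R2) that of the second.
out_r1 = exec_a["R1"]
out_r2 = exec_b["R2"]

sequential_runs = [exec_a, exec_b]
r1_matches_some_run = any(run["R1"] == out_r1 for run in sequential_runs)
r2_matches_some_run = any(run["R2"] == out_r2 for run in sequential_runs)
both_from_same_run = any(
    run["R1"] == out_r1 and run["R2"] == out_r2 for run in sequential_runs
)
```

Each reduce output matches some sequential execution, but no single sequential execution produces both outputs together.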
3.4 Locality
Network bandwidth is a relatively scarce resource in our computing environment. We conserve network bandwidth by taking advantage of the
fact that the input data (managed by GFS [10]) is stored on the local
disks of the machines that make up our cluster. GFS divides each file
into 64MB blocks and stores several copies of each block (typically 3
copies) on different machines. The MapReduce master takes the location information of the input files into account and attempts to schedule
a map task on a machine that contains a replica of the corresponding
input data. Failing that, it attempts to schedule a map task near a replica
of that task’s input data (e.g., on a worker machine that is on the same
network switch as the machine containing the data). When running large
MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
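The preference order described above (a machine holding a replica, then a machine on the same switch as a replica, then any machine) can be sketched as follows. The function `pick_worker`, the host names, and the `switch_of` mapping are hypothetical; the actual scheduler is not specified at this level of detail.

```python
def pick_worker(replica_hosts, idle_workers, switch_of):
    """Pick an idle worker for a map task, preferring data locality.

    replica_hosts: hosts holding a replica of the task's input block
    idle_workers:  hosts currently available, in scheduling order
    switch_of:     hypothetical mapping from host to its network switch
    """
    replica_hosts = set(replica_hosts)
    # 1. A worker that stores a replica reads its input from local disk.
    for worker in idle_workers:
        if worker in replica_hosts:
            return worker, "local"
    # 2. Otherwise prefer a worker on the same switch as some replica.
    replica_switches = {switch_of[h] for h in replica_hosts if h in switch_of}
    for worker in idle_workers:
        if worker in switch_of and switch_of[worker] in replica_switches:
            return worker, "same-switch"
    # 3. Fall back to any idle worker.
    if idle_workers:
        return idle_workers[0], "remote"
    return None, "none"


switch_of = {"h1": "s1", "h2": "s1", "h3": "s2", "h4": "s3", "h5": "s3", "h6": "s4"}
replicas = ["h1", "h3", "h5"]  # three copies of the block
choice_local = pick_worker(replicas, ["h2", "h3"], switch_of)
choice_near = pick_worker(replicas, ["h6", "h2"], switch_of)
choice_remote = pick_worker(replicas, ["h6"], switch_of)
```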
3.5 Task Granularity
We subdivide the map phase into M pieces and the reduce phase into
R pieces as described previously. Ideally, M and R should be much
larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing and also
speeds up recovery when a worker fails: the many map tasks it has
completed can be spread out across all the other worker machines.
There are practical bounds on how large M and R can be in our implementation since the master must make O(M+R) scheduling decisions
and keep O(M * R) state in memory as described. (The constant factors
for memory usage are small, however. The O(M * R) piece of the state
consists of approximately one byte of data per map task/reduce task pair.)
Furthermore, R is often constrained by users because the output of
each reduce task ends up in a separate output file. In practice, we tend
to choose M so that each individual task is roughly 16MB to 64MB of
input data (so that the locality optimization described previously is most
effective), and we make R a small multiple of the number of worker
machines we expect to use. We often perform MapReduce computations with M=200,000 and R=5,000, using 2,000 worker machines.
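The figures quoted above can be checked with a short calculation. The helper names `master_costs` and `choose_m` are hypothetical, 1MB is assumed to mean 2^20 bytes, and the 1 TiB input size in the last line is an invented example.

```python
MB = 1024 * 1024  # assumption: 1MB = 2**20 bytes


def master_costs(m, r, workers, bytes_per_pair=1):
    """Rough master-side costs for M map tasks and R reduce tasks."""
    return {
        "scheduling_decisions": m + r,           # O(M + R)
        "state_bytes": m * r * bytes_per_pair,   # O(M * R), about one byte per pair
        "map_tasks_per_worker": m / workers,
        "reduce_tasks_per_worker": r / workers,
    }


def choose_m(input_bytes, split_bytes=64 * MB):
    """Number of map tasks when each task gets at most split_bytes of input."""
    return -(-input_bytes // split_bytes)  # ceiling division


costs = master_costs(m=200_000, r=5_000, workers=2_000)
m_for_one_tib = choose_m(1024 ** 4)
```

With these values the master makes 205,000 scheduling decisions and keeps about 10^9 bytes of per-pair state, while each worker handles on average 100 map tasks and 2.5 reduce tasks.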
3.6 Backup Tasks
One common cause of a longer total running time for a
MapReduce operation is a straggler, that is, a machine that takes an
unusually long time to complete one of the last few map or reduce tasks
in the computation. Stragglers can arise for a whole host of reasons. For
example, a machine with a bad disk may experience frequent correctable errors that slow its read performance from 30MB/s to 1MB/s.
The cluster scheduling system may have scheduled other tasks on the
machine, causing it to execute the MapReduce code more slowly due
to competition for CPU, memory, local disk, or network bandwidth. A
recent problem we experienced was a bug in machine initialization
code that caused processor caches to be disabled: computations on
affected machines slowed down by over a factor of one hundred.
We have a general mechanism to alleviate the problem of stragglers.
When a MapReduce operation is close to completion, the master
schedules backup executions of the remaining in-progress tasks. The
task is marked as completed whenever either the primary or the
backup execution completes. We have tuned this mechanism so that it
typically increases the computational resources used by the operation
by no more than a few percent. We have found that this significantly
reduces the time to complete large MapReduce operations. As an
example, the sort program described in Section 5.3 takes 44% longer
to complete when the backup task mechanism is disabled.
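The backup mechanism can be sketched as master-side bookkeeping. This is a minimal sketch under stated assumptions: the class `BackupScheduler` is hypothetical, and "close to completion" is modeled by an assumed threshold on the fraction of completed tasks, which the text does not specify.

```python
class BackupScheduler:
    """Hypothetical bookkeeping for backup executions of in-progress tasks."""

    def __init__(self, task_ids, backup_threshold=0.9):
        self.state = {t: "in_progress" for t in task_ids}
        self.winner = {}          # task id -> execution that completed first
        self.backed_up = set()    # tasks that already have a backup execution
        self.backup_threshold = backup_threshold  # assumed tuning parameter

    def fraction_done(self):
        if not self.state:
            return 1.0
        done = sum(1 for s in self.state.values() if s == "completed")
        return done / len(self.state)

    def tasks_to_back_up(self):
        """Return in-progress tasks that should now get a backup execution."""
        if self.fraction_done() < self.backup_threshold:
            return []
        pending = [t for t, s in self.state.items()
                   if s == "in_progress" and t not in self.backed_up]
        self.backed_up.update(pending)
        return pending

    def on_completed(self, task_id, execution):
        """Mark the task completed by whichever execution finishes first."""
        if self.state[task_id] == "completed":
            return False  # the other execution already finished
        self.state[task_id] = "completed"
        self.winner[task_id] = execution
        return True


sched = BackupScheduler(range(20))
early = sched.tasks_to_back_up()       # operation has just started
for t in range(19):
    sched.on_completed(t, "primary")
late = sched.tasks_to_back_up()        # one straggling task remains
backup_won = sched.on_completed(19, "backup")
primary_late = sched.on_completed(19, "primary")
```

Because backups are issued only for the few tasks still running near the end, the extra work stays small relative to the whole operation.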
Although the basic functionality provided by simply writing map and
reduce functions is sufficient for most needs, we have found a few
extensions useful. These include:
• user-specified partitioning functions for determining the mapping
of intermediate key values to the R reduce shards;
• ordering guarantees: Our implementation guarantees that within
each of the R reduce partitions, the intermediate key/value pairs are
processed in increasing key order;
• user-specified combiner functions for doing partial combination of
generated intermediate values with the same key within the same
map task (to reduce the amount of intermediate data that must be
transferred across the network);
• custom input and output types, for reading new input formats and
producing new output formats;
• a mode for execution on a single machine for simplifying debugging
and small-scale testing.
The original article has more detailed discussions of each of these
items [8].
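Three of these extensions (partitioning functions, ordering guarantees, and combiner functions) can be illustrated together for a word-count job. The sketch makes several assumptions that are not taken from the text above: the default partitioner is modeled as a stable hash of the key modulo R, and the names `default_partition`, `combine`, and `partition_sorted` are hypothetical.

```python
import zlib
from collections import defaultdict


def default_partition(key, r):
    """Assumed default partitioner: a stable hash of the key modulo R."""
    return zlib.crc32(key.encode("utf-8")) % r


def combine(pairs):
    """Combiner for word count: sum the values per key within one map task."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())


def partition_sorted(pairs, r, partition=default_partition):
    """Split pairs into R partitions, each in increasing key order."""
    buckets = [[] for _ in range(r)]
    for key, value in pairs:
        buckets[partition(key, r)].append((key, value))
    return [sorted(bucket) for bucket in buckets]


map_output = [("the", 1), ("cat", 1), ("the", 1), ("sat", 1), ("the", 1)]
combined = combine(map_output)          # fewer pairs cross the network
buckets = partition_sorted(combined, r=3)
```

A user-specified partitioning function would be passed in place of `default_partition`, for example to send all keys from the same host to the same reduce partition.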