A few preliminary answers:
[...] map and reduce. The user normally only implements these placeholders, and the algorithm around them is implemented by a framework such as Plasma MapReduce.
[...] n, the duration of the whole Map/Reduce computation increases by c * n * log(n) (where c is a constant). This holds not only in theory but also in practice. Map/Reduce can process giant files in reasonable time, provided a sufficient number of computers is available.
The effect of the scheme can be explained in a few paragraphs. This describes what the scheme transforms the data into, but not why this can be implemented well (which we leave open here; read the published articles for this). In the following we use OCaml pseudo syntax to get a relatively exact description:
The input data must be given as a sequence of records:
let input =
  [ r0; r1; r2; ...; r(n-1) ]
In a first step, the records are mapped. Each record is transformed to a sequence of map outputs by applying the map function. The output type of map must be pairs of keys and values:
let map_output =
  List.flatten
    (List.map
       (fun r ->
          (* map r is the list of (key, value) pairs produced for record r *)
          map r
       )
       input)
The mapping step is useful because the data is often not stored in the best format, and needs to be preprocessed before the next step, sorting, can be started.
The sequence map_output is the one that is sorted. So we get
let sort_output =
  List.sort
    (fun (key1, value1) (key2, value2) -> compare key1 key2)
    map_output
Next, we partition the sequence. The partitions have numbers from 0 to p-1. Which keys go into which partition is configurable by the user, but the framework will do some automatic partitioning if the user does not specify anything.
let filter_partition q l =
  List.filter (fun (key,value) -> partition_of_key key = q) l
let partitions =
  List.map
    (fun q -> filter_partition q sort_output)
    [0; 1; ...; p-1]
Partitioning the data means splitting the whole dataset up into distinct parts. Why is this useful? First, it gives an opportunity to classify the data (i.e. all the records with a certain property are moved to a certain partition). Second, the implementation profits from this, because partitioning gives additional chances for parallelizing the algorithm.
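As an illustration of the automatic partitioning, here is a minimal sketch of a hash-based partitioner. It is an assumption made for this example, not the method Plasma MapReduce actually uses; the labelled argument ~p (the number of partitions) and the function partitions are hypothetical names introduced here.

```ocaml
(* Hypothetical default partitioner, for illustration only.
   Hashtbl.hash returns a non-negative integer, so the result
   always lies in the range 0 .. p-1. *)
let partition_of_key ~p key =
  Hashtbl.hash key mod p

(* Keep only the pairs whose key belongs to partition q. *)
let filter_partition ~p q l =
  List.filter (fun (key, _value) -> partition_of_key ~p key = q) l

(* Build all p partitions; every pair lands in exactly one of them. *)
let partitions ~p l =
  List.init p (fun q -> filter_partition ~p q l)
```

Because the partition depends only on the key, all pairs with the same key end up in the same partition, which is what the reduction step relies on.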
Finally, there is the reduction step. Each partition is passed through reduce, the other placeholder function. reduce works like a folding operator: it has an accumulator (i.e. local state), and gets all the keys and values of a single partition as input:
let reduce_partition partition =
  List.fold_left
    reduce
    empty_state
    partition
let output =
  List.map
    reduce_partition
    partitions
The output consists of as many sequences of keys and values as there are partitions.
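The whole scheme can be tried out in memory with a small word count. This is only a sketch under assumptions: the concrete map and reduce functions, the value p = 2, and the hash-based partition_of_key are choices made for this example and are not part of Plasma MapReduce.

```ocaml
(* map: one input record (a line of text) -> list of (key, value) pairs *)
let map (line : string) : (string * int) list =
  String.split_on_char ' ' line
  |> List.filter (fun w -> w <> "")
  |> List.map (fun w -> (w, 1))

(* reduce: folds one (key, value) pair into the accumulator. The partition
   is sorted by key, so equal keys arrive next to each other. The
   accumulator is a list of (key, count) pairs in reverse order. *)
let reduce acc (key, value) =
  match acc with
  | (k, n) :: rest when k = key -> (k, n + value) :: rest
  | _ -> (key, value) :: acc

let empty_state = []

(* Assumed for this example: two partitions, chosen by hashing the key. *)
let p = 2
let partition_of_key key = Hashtbl.hash key mod p

let map_reduce input =
  (* map step *)
  let map_output = List.concat (List.map map input) in
  (* sort step *)
  let sort_output =
    List.sort (fun (k1, _) (k2, _) -> compare k1 k2) map_output in
  (* partition step; List.filter keeps the sorted order *)
  let partitions =
    List.init p (fun q ->
      List.filter (fun (k, _) -> partition_of_key k = q) sort_output) in
  (* reduce step, one fold per partition *)
  List.map
    (fun part -> List.rev (List.fold_left reduce empty_state part))
    partitions
```

Calling map_reduce ["a b a"; "b c a"] yields p lists that together contain one (word, count) pair per distinct word; which word lands in which list depends on the hash function.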
All the sequences in the algorithm scheme are actually data files in the implementation. In Plasma MapReduce, all files are stored in PlasmaFS, even intermediate files. The user needs to create three directories:
[...] Plasmamr_file_formats.)
There is the restriction that lines must not become longer than a certain maximum length. This length is also known as the bigblock size. Bigblocks are the units in which map/reduce processes data. Bigblocks can be larger than the blocks of the filesystem.
The framework uses three types of elementary steps, called tasks, to execute the algorithm scheme. Each task can run on any machine of the cluster. However, some machines are better suited than others, because the data (or some part of it) is stored locally on them.
The task types are:
[...] map function for each record, and writes map output files. The fragments in the input may come not only from one input file, but from several. The framework tries to choose the fragments so that the data blocks are all stored on the computer where the map task is executed. The output files are written so that they do not exceed a certain size (the sort limit). When the limit is hit, the map task starts writing to a second output file.
[...] reduce function is also called before the data is written into an output file.
[...] reduce over it.
In some sense Map/Reduce is nothing but an enhanced disk-based merge sort, where inputs can be preprocessed (mapped) and outputs can be folded (reduced).
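The merge-sort view can be sketched in memory as follows. This is an illustration under assumptions, not the framework's implementation: lists stand in for files, the limit is counted in records (assumed to be at least 1) rather than as a file size, and all function names are hypothetical.

```ocaml
(* Split the map output into runs of at most [limit] records, mimicking
   a map task that starts a new output file when the limit is hit. *)
let split_runs limit records =
  let rec go current count acc = function
    | [] -> List.rev (if current = [] then acc else List.rev current :: acc)
    | r :: rest ->
      if count = limit
      then go [r] 1 (List.rev current :: acc) rest
      else go (r :: current) (count + 1) acc rest
  in
  go [] 0 [] records

(* Compare (key, value) pairs by key only. *)
let by_key (k1, _) (k2, _) = compare k1 k2

(* Sort every run on its own. *)
let sorted_runs limit map_output =
  List.map (List.sort by_key) (split_runs limit map_output)

(* Merge the sorted runs into one sorted sequence. *)
let merge_runs runs =
  List.fold_left (List.merge by_key) [] runs

(* Fold the merged sequence: here, sum the values of equal keys. *)
let sum_per_key merged =
  List.rev
    (List.fold_left
       (fun acc (k, v) ->
          match acc with
          | (k', n) :: rest when k' = k -> (k, n + v) :: rest
          | _ -> (k, v) :: acc)
       [] merged)
```

The runs can be sorted independently of each other, which is where the parallelism comes from; only the merge needs to see several runs at once.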
The tasks are dynamically assigned to computers just before they are executed. It is important to know how resource limits affect this.
Most importantly, there is the illusion of infinite disk space. This means disk space is not limited by the capacity of the local machine, but only by the sum of all disks of the cluster. This is typically so large that the developer of a Map/Reduce application never has to think about it.
Note that Plasma MapReduce keeps this illusion even for intermediate files. This is different from other implementations (e.g. Hadoop) where intermediate files can only be stored on local disks, and nodes become unusable when the disk fills up.
The CPU is also a non-scarce resource, especially as CPUs have multiple cores nowadays. Plasma MapReduce can take advantage of multi-core CPUs because it is implemented via multi-processing. All tasks are run in their own process.
Now to the resources that may become problematic. First, the speed of sequential disk I/O is probably the worst bottleneck. Many Plasma MapReduce tasks access data on the local disk, and these tasks usually saturate the I/O capacity. It is advisable to stay away from very small blocksizes, because for small blocks there is an increased risk that random disk seeks slow down I/O as well. PlasmaFS includes a lot of optimizations that try to avoid scattering the blocks of a file over the whole disk, but there are limits. For blocksizes of 1M and up this is probably no big problem anymore.
The network is the next bottleneck. As a rule of thumb, all input data have to travel a few times over the network. The faster the network the better.
The RAM of the machines may also become a real problem. Unlike for other resources, there is a hard limit. As Plasma MapReduce uses a lot of shared memory, it is likely that the limit for shared memory is hit first. When determining how many tasks should run at once on a machine, RAM consumption plays an important role. Plasma MapReduce already implements some logic to reduce RAM consumption when RAM is tight (effectively, the size of adjustable buffers is reduced) to make the system more flexible in this respect.
As I/O and data copying dominate Map/Reduce, the available memory bandwidth may also be important. However, this has not yet been explored.
There are, of course, also "self-made" limits. Not much is known yet about where these lie for Plasma MapReduce, but one should keep an eye on: