In this context files are sequences of records. Plasma knows three different representations:
Note that Plasma simply considers a record as a string (on this level of the abstraction), without assuming any particular format of this string. The user has to configure how to extract the key from the string. The key is then used for sorting, and for determining the partition.
All formats take the size of the bigblocks into account. The bigblocks are the smallest units into which the files are split for distributing the work among the available task nodes. Bigblocks need to be multiples of filesystem blocks. For example, a user may specify bigblocks of 64 MB for a filesystem that uses 1 MB blocks. The connection with file formats is that the formats need to support this splitting of the files into smaller pieces, where each piece consists of a whole number of bigblocks.
There is a command-line utility for converting between formats, see
The file formats use conventionally different file name suffixes
for easier recognition: ".var" means the variable-size format,
".fixed<n>" the fixed-size format with n bytes, and everything else
is considered as text. There is also
which determines the format of input files by the file name suffix.
For toolkit users: The following discussions and examples are older
than the toolkit, and they assume the classic job definition
Mapred_def.mapred_job). Of course, the various file formats are
also available in the toolkit. There is a
parameter for places, and e.g. passing
`Var_size_format here would
select this format. The following is nevertheless interesting to know
The text format is very simple: A record is a sequence of bytes
followed by an LF byte. The length of the sequence is basically
unlimited, but, however, for practical reasons the maximum is
bigblock_size-1. The sequence may contain every byte except 10
(LF). Note that we do not assume any character encoding, and that even
the null byte is allowed.
The big advantage of the text format is that it is compatible with lots of existing utilities like text editors, viewers, and command-line tools. The downside is that one needs to treat data specially that may include LF bytes.
The text format is implemented by
Example: We want to analyze the frequency of names in various countries. Every record has three fields: Name, country code, and frequency. The name is only composed of Unicode letters (lowercase if possible). The country code is the ISO code like "US", or "DE". The frequency is an integer. Because the LF byte cannot occur in the payload data, we need not to escape LF. Also, we can use TAB bytes to separate the three fields. A line could look like (the bytes are separated by spaces for clarity):
g e r d TAB D E TAB 6 5 2 4 3
When summing up the names by countries, the sort key would be the name plus the country code, so we would extract the first and the second field (by searching the second TAB byte). Note that it is a requirement in Plasma that the key is a contiguous part of the record (i.e. it would not be allowed to consider the first and third field as a key - the workaround is to introduce extra key fields).
In the code, the format is described as part of the
object. Here, we could define the relevant methods as:
let job = object method input_record_io me jc = Mapred_io.line_structured_format() method output_record_io me jc = Mapred_io.line_structured_format() method internal_record_io me jc = Mapred_io.line_structured_format() method extract_key me jc record = Mapred_fields.text_field_range ~sep:"\t" ~field:1 ~num:2 record method sorter = Mapred_sorters.generic_sorter ~hash:Mapred_sorters.String_asc.hash ~cmp:Mapred_sorters.String_asc.cmp ... end
Note that you need to implement some more methods for a full job
definition (e.g. the
You may wonder why we have to refer to
three times. The answer is that we can specify the file format for three
different occasions separately:
input_record_iothe format is described that is expected as input of the map/red job.
ouput_record_iothe format is described that is written as output of the map/red job.
internal_record_iothe format is described that is used by the internal steps. This includes the format written after
map, and the format used within
reduce(except of the final reduce output).
Mapred_fields.text_field_rangefunction returns the position of the key in the record as a pair
pis the position where the key starts, and
lis the length in bytes.
The sorter is here specified so that it sorts the key in ascending order.
We use here
Mapred_sorters.generic_sorter and configure this
implementation with two argument functions
cmp. There are
lots of definitions for
Mapred_sorters, and we
Mapred_sorters.String_asc - which sorts strings in an
cmp works in the same way as in the Ocaml standard
library (e.g. in
Array.sort), but the two strings to compare are
given as triple
len. This means the key to
compare is to be taken from the larger string
buffer in the byte
pos+len-1. The function
hash returns for every
key (given in the same style) a 30 bit integer. Basically,
generic_sorter sorts first by this integer, and only for keys with
the same hash the
cmp function is used. The definitions in
Mapred_sorters are quite clever, though, and the hash values are
mostly used as accelerators:
hash is then defined such that the
same sorting order is achieved one would get if only
A simple escaping mechanism is provided by the Ocaml runtime. It is
basically the same Ocaml uses for its own strings - LF can be written
as \n, and TAB as \t. The mechanism is a bit hidden, and only available
In order to escape a string, use the
%S format specifier in
let escaped = Printf.sprintf "%S" raw
Note that this also puts double quotes around the string.
The reverse is done with
let raw = Scanf.sscanf escaped "%S" (fun s -> s)
This mechanism has the drawback that also all non-ASCII characters are escaped with backslashes, which may increase the size of the records significantly if they really contain binary data.
sscanfis slow. There are the faster alternatives
Mapred_fields.unescape. These functions also do not enclose the escaped string in double quotes.
netstringpackage). Unfortunately, BASE-64 does not keep the sorting order (but see below).
Mapred_fields.decode_alt64use a variant of BASE-64 called ALT-64 that keeps the sorting order, i.e. the encoded strings are sorted in the same order as the decoded strings. Like the standard BASE-64, ALT-64 increases the data volume by 1/3.
cmpthat are passed to
let unescape escaped = Scanf.sscanf escaped "%S" (fun s -> s) let hash s pos len = let s' = unescape(String.sub s pos len) in Mapred_sorters.String_asc.hash s' 0 (String.length s') let cmp s1 pos1 len1 s2 pos2 len2 = let s1' = unescape(String.sub s1 pos1 len1) in let s2' = unescape(String.sub s2 pos2 len2) in Mapred_sorters.String_asc.cmp s1' 0 (String.length s1') s2' 0 (String.length s2')
Now define the sorter as
Mapred_sorters.generic_sorter ~hash ~cmp.
The fixed-size format uses a fixed number of bytes per record. Every byte can occur, and thus escaping mechanisms are not needed. This format is normally used with binary numeric data because the fields usually also have a fixed "width".
Example: We have many data points that are simply enumerated with an integer (point #0, #1, #2, etc.). For each point there is an associated float. We need to sort the data by value.
The representation is now that we use 16 bytes per record, and the first
eight bytes represent the ordinal number of the data point, and the second
eight bytes are the float (double-precision IEEE float). We use big-endian
byte order (which is recommended here because this is "compatible" with
the lexicographic sorting of strings - if
n1 < n2 we also have that the
big-endian representations BE fulfil
BE(n1) < BE(n2) if we just sort
Netnumber module of Ocamlnet (in the
netstring package) can be
used to convert the numbers to the external representation and back:
let encode (ord,value) = let s1 = Netnumber.BE.int8_as_string (Netnumber.int8_of_int ord) in let s2 = Netnumber.BE.fp8_as_string (Netnumber.fp8_of_float value) in s1 ^ s2 let decode s = assert(String.length s = 16); let ord = Netnumber.int_of_int8 (Netnumber.read_int8 s 0) in let value = Netnumber.float_of_fp8 (Netnumber.read_fp8 s 8) in (ord,value)
The job definition is now:
let job = object method input_record_io me jc = Mapred_io.fixed_size_format 16 method output_record_io me jc = Mapred_io.fixed_size_format 16 method internal_record_io me jc = Mapred_io.fixed_size_format 16 method extract_key me jc record = (8, 8) method sorter = Mapred_sorters.generic_sorter ~hash:Mapred_sorters.Float_xdr_asc.hash ~cmp:Mapred_sorters.Float_xdr_asc.cmp ... end
The extraction of the key is now trivial: The float value can always be found at the eighth byte and is eight bytes long.
For sorting we use here the special comparator module
Mapred_sorters.Float_xdr_asc which sorts a float given in binary
Basically, the variable-size format precedes every record with a length field. However, the way a map/reduce job processes data requires another principle for structuring the file. Because a task must be able to decode the records even when it is only processing a fragment of a file, we need to include a bit of helper information that guides the task to find the beginning of a record when starting in the middle of the file.
This is done by organizing the file as a sequence of chunks:
+----------------------------------------------------+ | Chunk 0 header | +----------------------------------------------------+ | Chunk 0 payload | | | ... ... | | | | +----------------------------------------------------+ | Chunk 1 header | +----------------------------------------------------+ | Chunk 1 payload | | | ... ... | | | | +----------------------------------------------------+ ...
Normally, the chunks have a fixed size of 64K (header plus payload), where the header occupies the first 32 bytes. The header contains the information:
The real payload is now the sequence of the payload regions in the chunks. It is possible that a record starts in one chunk and is continued in the next chunk (and even continued beyond).
As mentioned, the records have now a length header. If the length is up to 254 bytes, this header consists just of a single byte. Otherwise, the header is made up from 9 bytes.
The details of the variable-size format are documented for the function
It is recommended to just use
let write_something() = let fmt = Mapred_io.var_size_format() in let wr = fmt # write_file cluster rc "/filename" in wr # output_record "something"; wr # output_record "next"; ... wr # close_out() let read_back() = let fmt = Mapred_io.var_size_format() in let n = Mapred_io.file_blocks cluster "/filename" in let rd = fmt # read_file  cluster rc "/filename" 0L n in let r1 = rd # input_record() in let r2 = rd # input_record() in ... (* until End_of_file *) rd # close_in()
There is now maximum freedom for the layout of the records. It is now
possible to handle records in the same way as for the text format,
or in the same way as for the fixed-size format. Sophisticated layouts
are now also possible: The module
Mapred_fields includes support
for a special representation of fields within the record. As the
records as a whole the fields have now length headers. For example,
Mapred_fields.var_concat ["one"; "two"; "three"]
creates a record with three fields. Note that here each field can have any value - LF and nulls included, of any length.
Example: We want to analyze a social network, and have four columns with: Name of a person, person ID, number of friends, photo as JPEG. We want to sort the data by the number of friends.
We now use
Mapred_fields to represent the records such that
let record = Mapred_fields.var_concat [ name; string_of_int id; string_of_int friends; photo ]
Because there are no restrictions about the contents of the fields, there also no escaping problems.
This leads to this job definition:
let job = object method input_record_io me jc = Mapred_io.var_size_format () method output_record_io me jc = Mapred_io.var_size_format () method internal_record_io me jc = Mapred_io.var_size_format () method extract_key me jc record = Mapred_fields.var_field_range ~field:3 record method sorter = Mapred_sorters.generic_sorter ~hash:Mapred_sorters.Integer_asc.hash ~cmp:Mapred_sorters.Integer_asc.cmp ... end
partition_of_key, and the
in a certain way. Bascially,
extract_key is a preprocessor of the latter
two functions, and factors common work out. What we finally need are:
extract_key normally identifies the interesting part of the
record that needs to be analyzed by
partition_of_key and the
sorter, and returns this as a pair
(pos,len). Sometimes, it is not
possible to pinpoint any such part, and we can only return
len=String.length record. In this case,
partition_of_key and the
sorter get the complete record as input, and can look into different
parts of the records to distill the required information.
partition_of_key and the sorter functions
get their input as a triple
s is some string
(often not only the record but a buffer containing the record), and
len are the output of
extract_key relative to
means we can reconstruct the record by running
let record = String.sub s pos len
Example: We have three fields. We want to sort by the first and third field, and we take the hash of the second key to get the partition. We assume here text records with TAB-separated fields.
The relevant part of the job definition is then:
let job = object method extract_key me jc record = (0, String.length record) method sorter = let hash s pos len = (* Only use the first field for hashing *) let (p1,l1) = Mapred_fields.text_field_range ~pos ~len ~field:1 s in Mapred_sorters.String_asc.hash s p1 l1 (* (p1,l1) is the range of the first field in s *) and cmp s1 pos1 len1 s2 pos2 len2 = let (p1,l1) = Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:1 s1 in (* (p1,l1) is the range of the first field in s1 *) let (p2,l2) = Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:1 s2 in (* (p2,l2) is the range of the first field in s2 *) let d = Mapred_sorters.String_asc.cmp s1 p1 l1 s2 p2 l2 in if d <> 0 then d else (* Compare only by the third field if the first fields are equal *) let (q1,m1) = Mapred_fields.text_field_range ~pos:pos1 ~len:len1 ~field:3 s1 in (* (q1,m1) is the range of the third field in s1 *) let (q2,m2) = Mapred_fields.text_field_range ~pos:pos2 ~len:len2 ~field:3 s2 in (* (q2,m2) is the range of the third field in s2 *) Mapred_sorters.String_asc.cmp s1 q1 m1 s2 q2 m2 in Mapred_sorters.generic_sorter ~hash ~cmp method partition_of_key me jc s pos len = let field2 = Mapred_fields.text_field ~pos ~len ~field:2 s in (Hashtbl.hash field2) mod jc#partitions ... end
This example works even better if one of the binary formats is chosen:
text_extract, because the positions in the record are fixed (e.g. the first field is found in bytes 0-7, the second in bytes 8-15, and the third field in bytes 16-23).
Mapred_fields.var_field_rangewhich does the corresponding to
text_field_range, but is faster especially for the case that the records are long.