Plasma GitLab Archive

Plasma Project:

Module Mapred_io

module Mapred_io: sig .. end
Utility library for record-based I/O

Here, a file consists of records. A record usually needs some framing bytes so that it can be recognized in the file. For example, the text-file representation uses the LF byte to mark the end of each record. Other representations are also supported; see below.

The record reader can be used to iterate over a whole file or over only a part of it. In the latter case, the file is assumed to be processed bigblock by bigblock. Records then need not end at bigblock boundaries, so for a record that crosses the boundary between bigblocks N and N+1 it must be decided whether the block reader for bigblock N or the block reader for bigblock N+1 processes it. The following rules decide this:

  • The first bigblock does not have this problem: its first record is always processed by the block reader for block 0.
  • For every other bigblock, the first record to process is the first record that can be recognized as starting in that bigblock. (See below for the details of each representation.)
  • The last record to process in a bigblock is the record that precedes the first record of the next bigblock.
For best efficiency, the block reader should not be used for reading individual bigblocks, but for contiguous ranges of bigblocks.


class type record_config = object .. end
class type record_reader = object .. end
class type record_writer = object .. end
type read_flag = [ `Bof_block of int64 | `Eof_pos of int64 ] 
Flags for the reader:
  • `Bof_block n: Assume that the first record to read is at position n of the file. The position should not be after the first block to read.
  • `Eof_pos n: Assume that the EOF position is at n (in bytes)
`Bof_block and `Eof_pos can be used to read only part of the file.
class type record_reader_factory = object .. end
class type record_writer_factory = object .. end
class type record_rw_factory = object .. end

File formats

val line_structured_format : unit -> record_rw_factory
The line-structured file format represents every record as a line in a text file, terminated by an LF character. Consequently, record data cannot contain LF characters.

Apart from that, a line may contain any byte, including NUL. No character set is assumed: a line is simply a sequence of bytes.

For line-structured files, the first record (line) to process in a bigblock is the line following the first LF byte in that bigblock. The only exception is the first bigblock, where the first line starts at the beginning of the file.
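To illustrate the rule, here is a minimal sketch that applies it to an in-memory string standing in for the file. first_start and lines_of_range are hypothetical helpers, not part of Mapred_io, and bbsize is an arbitrary bigblock size chosen for the example; a real reader works on the file through the filesystem.

```ocaml
(* Hypothetical helper: start of the first line processed by the reader of
   bigblock [k] in the string [s].  Block 0 starts at byte 0; any other
   block starts after the first LF at or after the block start (or at EOF
   if there is no such LF). *)
let first_start s bbsize k =
  let len = String.length s in
  let pos = k * bbsize in
  if k = 0 then 0
  else if pos >= len then len
  else
    match String.index_from_opt s pos '\n' with
    | Some i -> i + 1
    | None -> len

(* Hypothetical helper: lines processed by a reader for the contiguous
   bigblock range [k1, k2), i.e. from the first line of block k1 up to,
   but excluding, the first line of block k2. *)
let lines_of_range s bbsize k1 k2 =
  let start = first_start s bbsize k1 in
  let stop = first_start s bbsize k2 in
  if start >= stop then []
  else begin
    let chunk = String.sub s start (stop - start) in
    let n = String.length chunk in
    (* Remove the terminating LF of the last line, if present. *)
    let chunk =
      if chunk.[n - 1] = '\n' then String.sub chunk 0 (n - 1) else chunk in
    String.split_on_char '\n' chunk
  end
```

Since the range of bigblock k ends exactly where the range of bigblock k+1 begins, every line is processed by exactly one reader, and a reader for a contiguous range only needs to locate the two boundary positions. For example, with bbsize = 8 and the file "aaa\nbbbbbbb\ncc\ndddd\n", the line "bbbbbbb" crosses into block 1 but is processed by the reader for block 0.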

val fixed_size_format : int -> record_rw_factory
fixed_size_format n: Every record has exactly n bytes. The records may have any contents; binary representations are fully supported.

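For files with fixed-size records, the first record of block k of the file starts at the byte offset

 (recordsize - (k * blocksize) mod recordsize) mod recordsize 

relative to the beginning of the block, i.e. at the first multiple of recordsize that is not smaller than k * blocksize.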
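Assuming the records are packed back-to-back starting at byte 0, the first record that starts in bigblock k is the first multiple of recordsize that is not smaller than k * blocksize. A minimal sketch of that computation (first_record_pos and first_record_offset are hypothetical names, not part of Mapred_io):

```ocaml
(* Hypothetical helper: absolute file position of the first fixed-size
   record starting in bigblock [k], assuming records are packed
   back-to-back from byte 0. *)
let first_record_pos ~blocksize ~recordsize k =
  let block_start = k * blocksize in
  (* Round block_start up to the next multiple of recordsize. *)
  (block_start + recordsize - 1) / recordsize * recordsize

(* Offset of that record relative to the start of bigblock [k]. *)
let first_record_offset ~blocksize ~recordsize k =
  first_record_pos ~blocksize ~recordsize k - k * blocksize
```

For instance, with blocksize 10 and recordsize 3, records start at 0, 3, 6, 9, 12, ..., so the first record in block 1 starts at byte 12, i.e. at offset 2 within the block.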

This format cannot be used with streaming mode!

val var_size_format : unit -> record_rw_factory
A record consists of a variable number of bytes (at most a bigblock). Binary representations are fully supported.

The file consists of chunks (a whole number of chunks must fit into a bigblock). Every chunk of N bytes has a 32-byte header, followed by a data region that extends to the end of the chunk:

 chunk<k> = header<k> ^ data<k> 

Here, k is the index identifying the chunk, k=0..K-1.

The length of a chunk is currently always 64K; users cannot change it. Note that the last chunk of the file may be stored incompletely: the data part may be cut off where the data stream actually ends.

If no compression is done, the concatenated data regions contain the sequence of records. A record may start in one region and be continued in the next:

 data<0> ^ ... ^ data<K-1> = record<0> ^ ... ^ record<R-1> 

If a compression algorithm is applied, every chunk is compressed individually. So:

 U(data<0>) ^ ... ^ U(data<K-1>) = record<0> ^ ... ^ record<R-1> 

where U is the decompression function.

The header consists of:

 header = chunksize ^ datasize ^ recstart ^ flags ^ checksum 


  • chunksize is the number N of bytes every chunk consists of, represented as a 64-bit number in big-endian order.
  • datasize is the number of bytes in the data area. Normally this is chunksize-32, but for compressed chunks some bytes may have to remain free. Again a 64-bit number in big-endian order.
  • recstart is the offset of the start of the first record in the data area (if compression is done, this refers to the data in uncompressed form). The offset is relative to the start of the data area; e.g. 0 means that the data area starts immediately with a record. The value -1 means that no record begins in this chunk. Again a 64-bit number in big-endian order.
  • flags is a 32-bit field (big endian). Bit 0 says whether GZIP compression is enabled; the other bits are not yet assigned.
  • checksum is the first four bytes of MD5(chunksize ^ datasize ^ recstart ^ flags ^ dec(k)), where dec(k) is the decimal representation of the chunk index k.
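As an illustration, the 32-byte header layout above can be encoded with the OCaml standard library alone. This is a sketch, not Plasma's implementation: encode_header is a hypothetical name, it assumes OCaml >= 4.08 (for Bytes.set_int64_be and Bytes.set_int32_be), and it assumes that recstart = -1 is stored in two's complement. Digest.string computes MD5.

```ocaml
(* Hypothetical helper, not part of Mapred_io: build the 32-byte header of
   chunk [k].  Requires OCaml >= 4.08 for the big-endian setters. *)
let encode_header ~chunksize ~datasize ~recstart ~flags ~k =
  let b = Bytes.create 28 in
  Bytes.set_int64_be b 0 chunksize;  (* bytes 0-7:   chunksize *)
  Bytes.set_int64_be b 8 datasize;   (* bytes 8-15:  datasize *)
  Bytes.set_int64_be b 16 recstart;  (* bytes 16-23: recstart; -1L assumed two's complement *)
  Bytes.set_int32_be b 24 flags;     (* bytes 24-27: flags, bit 0 = GZIP *)
  let fields = Bytes.to_string b in
  (* checksum: first 4 bytes of MD5(fields ^ decimal representation of k) *)
  let md5 = Digest.string (fields ^ string_of_int k) in
  fields ^ String.sub md5 0 4

(* Example: header of an uncompressed 64K chunk whose data starts with a record. *)
let example_header =
  encode_header ~chunksize:65536L ~datasize:65504L ~recstart:0L ~flags:0l ~k:0
```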
Records consist of a length header followed by the data:

 record = length ^ record_data 

If record_data is at most 254 bytes long, length is a single byte containing that length. If record_data is longer, length is the single byte 0xFF followed by the length as a 64-bit number in big-endian order.
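The length prefix can be sketched as a pair of encode/decode functions operating on the concatenated (and, if applicable, decompressed) data regions. encode_record, decode_record and get_int64_be are hypothetical helpers, not Mapred_io functions; the sketch assumes OCaml >= 4.08 for Bytes.set_int64_be.

```ocaml
(* Hypothetical helper: frame one record with its length prefix. *)
let encode_record (data : string) : string =
  let n = String.length data in
  if n <= 254 then
    (* Short record: the length fits into a single byte. *)
    String.make 1 (Char.chr n) ^ data
  else begin
    (* Long record: marker byte 0xFF, then a 64-bit big-endian length. *)
    let hdr = Bytes.create 9 in
    Bytes.set hdr 0 '\xff';
    Bytes.set_int64_be hdr 1 (Int64.of_int n);
    Bytes.to_string hdr ^ data
  end

(* Read a 64-bit big-endian number from [s] at [pos]. *)
let get_int64_be s pos =
  let r = ref 0L in
  for i = 0 to 7 do
    r := Int64.logor (Int64.shift_left !r 8)
                     (Int64.of_int (Char.code s.[pos + i]))
  done;
  !r

(* Decode the record starting at [pos]; return it with the next position. *)
let decode_record (s : string) (pos : int) : string * int =
  let c = Char.code s.[pos] in
  if c < 0xFF then (String.sub s (pos + 1) c, pos + 1 + c)
  else begin
    let n = Int64.to_int (get_int64_be s (pos + 1)) in
    (String.sub s (pos + 9) n, pos + 9 + n)
  end
```

A 254-byte record therefore costs 1 byte of framing, while a 255-byte record costs 9.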

This format cannot be used with streaming mode!

val auto_input_format : unit -> record_rw_factory
This "format" recognizes the format of input files by the file suffix (".var", "fixed<n>", or nothing).

Writing files is not supported!

Generic functions

val bigblock_size : Mapred_fs.filesystem -> string -> int -> int
bigblock_size fs path suggested: Returns the real bigblock size for the suggested value, i.e. the suggested value rounded up to the next multiple of the block size.

The block size is retrieved from fs (synchronously).
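The rounding rule itself is plain integer arithmetic. In this sketch, round_to_bigblock is a hypothetical name and the block size is passed in explicitly instead of being read from fs; it assumes a positive suggested value, since the behavior for other values is not described here.

```ocaml
(* Hypothetical helper: the rounding rule of bigblock_size, with the
   filesystem block size passed in explicitly. *)
let round_to_bigblock ~blocksize suggested =
  if blocksize <= 0 then invalid_arg "round_to_bigblock";
  (* Round [suggested] up to the next multiple of [blocksize]. *)
  ((suggested + blocksize - 1) / blocksize) * blocksize
```

A value that is already a multiple of the block size is returned unchanged.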

type sync_readers = (unit -> record_reader) list 
type async_readers = (unit -> record_reader Uq_engines.engine) list 
val read_multiple : Mapred_fs.filesystem ->
record_config ->
readers:[ `Async of async_readers | `Sync of sync_readers ] ->
unit -> record_reader
Constructs a record reader that reads from the input readers one after the other. The readers can be given in synchronous or asynchronous form; the latter is preferable when the reader runs in asynchronous mode (i.e. when to_fd_e is running).
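The "one after the other" semantics can be modeled with a much simpler reader than the record_reader class type. The sketch below is only a model of the synchronous case: the reader record type, read_multiple_model and of_list are hypothetical, and each input reader is created on demand, mirroring the (unit -> record_reader) list shape of sync_readers.

```ocaml
(* Hypothetical, simplified reader: [next ()] returns the next record, or
   None at end of input.  This is NOT the record_reader class type. *)
type reader = { next : unit -> string option }

(* Concatenate readers, creating each one only when the previous one is
   exhausted. *)
let read_multiple_model (makers : (unit -> reader) list) : reader =
  let pending = ref makers in
  let current = ref None in
  let rec next () =
    match !current with
    | Some r ->
        (match r.next () with
         | Some _ as x -> x
         | None -> current := None; next ())
    | None ->
        (match !pending with
         | [] -> None
         | mk :: rest -> pending := rest; current := Some (mk ()); next ())
  in
  { next }

(* Hypothetical helper: a reader over an in-memory list of records. *)
let of_list l =
  let rest = ref l in
  { next = (fun () ->
      match !rest with [] -> None | x :: t -> rest := t; Some x) }
```

Creating the readers lazily means that, for example, a file is only opened once the reader for the preceding file has reached its end.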
val write_multiple : Mapred_fs.filesystem ->
Plasma_shm.shm_manager ->
record_config ->
string ->
int64 ->
int ->
create_sync:(string -> int -> string) ->
create_async:(string -> int -> string Uq_engines.engine) option ->
record_writer_factory -> record_writer
write_multiple fs shm rc prefix size_limit lines_limit create_sync create_async rwf: Writes into a sequence of files whose names are composed of prefix followed by an integer k. The files are created by calling create_sync prefix k. A new file is started when the current file reaches the size size_limit (in bytes), or when it has lines_limit lines.

Note that the size limit is checked without taking the LF line separator into account.

create_async: If passed, this function is used instead of create_sync in asynchronous mode (i.e. when from_fd_e is running).

Restriction: from_dev_e is not implemented.
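The rollover rule can be modeled without any I/O. In this sketch, assign_files is a hypothetical function that returns, for each record, the index k of the file it would be written to; it assumes that the limits are checked before each record is written, and, as noted above, it counts sizes without the LF separator.

```ocaml
(* Hypothetical model of the rollover rule of write_multiple.
   Assumption: the limits are checked before each record is written. *)
let assign_files ~(size_limit : int64) ~(lines_limit : int) records =
  let k = ref 0 and bytes = ref 0L and lines = ref 0 in
  let acc = ref [] in
  List.iter
    (fun r ->
      (* Start a new file once the current one has reached a limit. *)
      if !bytes >= size_limit || !lines >= lines_limit then begin
        incr k;
        bytes := 0L;
        lines := 0
      end;
      (* The size is counted without the LF separator. *)
      bytes := Int64.add !bytes (Int64.of_int (String.length r));
      incr lines;
      acc := !k :: !acc)
    records;
  List.rev !acc
```

With size_limit = 5 and the records "abc", "de", "f", "gh", the first file reaches 5 bytes after "de", so "f" and "gh" go into file 1.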

val divide : record_config -> int -> record_config
Divides buffer_size and buffer_size_tight by this number

File system access

More functions for file system access can be found in Plasma_netfs.

On the lower level you can of course also use Plasma_client directly, but the interface is slightly more complicated.

val file_exists : Mapred_fs.filesystem -> string -> bool
file_exists fs name: whether this file, directory or symbolic link exists
val is_directory : Mapred_fs.filesystem -> string -> bool
is_directory fs name: whether this directory exists
val create_file : ?repl:int ->
?pref_nodes:string list -> Mapred_fs.filesystem -> string -> unit
create_file fs name: Creates this file exclusively. repl is the replication factor, 0 by default (i.e. use server default). pref_nodes can be set to the list of preferred datanode identities (actually, this only configured the cluster).
val create_dir : Mapred_fs.filesystem -> string -> unit
create_dir fs name: Creates a directory exclusively
val delete_file : Mapred_fs.filesystem -> string -> unit
Deletes this file or empty directory.
val delete_rec : Mapred_fs.filesystem -> string -> unit
Deletes this file or directory recursively
val file_blocks : Mapred_fs.filesystem -> string -> int64
Returns the length of the file in blocks.
type fileinfo = [ `Directory | `Other | `Regular of int64 ] 
A file is either `Regular n, where n is the length, `Directory, or `Other. Symlinks are followed in this module; `Other is also returned for dead symlinks.
val scan_file : Mapred_fs.filesystem -> string -> fileinfo
Returns information about this file
val scan_dir : Mapred_fs.filesystem -> string -> bool -> (string * fileinfo) list
scan_dir fs dir deeply: Scans the directory dir for files, and returns the files (without directory prefix) as pairs (name,info). If deeply, subdirectories are recursively scanned, otherwise they are ignored.

Files and directories whose names start with "." or "_" are ignored.

Symbolic links are followed (and the max scanning depth is restricted). The returned data refers to the files pointed to by the symlinks. Dead symlinks are skipped.
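The filtering and recursion rules can be sketched on an in-memory tree instead of a real filesystem. Everything here is hypothetical: the node type, scan_tree, the use of "/" to join names of files found in subdirectories, and the omission of symlink handling are assumptions of this sketch, not properties of scan_dir.

```ocaml
(* Hypothetical in-memory tree standing in for a directory hierarchy:
   a regular file with its length, or a directory with named entries. *)
type node = File of int64 | Dir of (string * node) list

(* Sketch of the scan_dir rules: skip names starting with '.' or '_',
   descend into subdirectories only if [deeply], and return names relative
   to the scanned directory. *)
let scan_tree entries deeply =
  let ignored n = n <> "" && (n.[0] = '.' || n.[0] = '_') in
  let rec go prefix entries =
    List.concat_map
      (fun (name, node) ->
        if ignored name then []
        else
          let path = if prefix = "" then name else prefix ^ "/" ^ name in
          match node with
          | File size -> [ (path, size) ]
          | Dir sub -> if deeply then go path sub else [])
      entries
  in
  go "" entries
```

Note that an ignored directory such as "_tmp" is skipped as a whole, including everything below it. List.concat_map requires OCaml >= 4.10.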

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml