Plasma GitLab Archive
Projects Blog Knowledge

Plasma Project:

Module Mapred_tasks

module Mapred_tasks: sig .. end
Representation of tasks

type file_fragment = string * int64 * int64 
Filename, start block pos, length in blocks. If the length is 0, this means "till end of file"
type file = file_fragment list 
type file_tag = [ `Tag of string ] 
Tags may be attached to files. If not used, `Tag "" is the value
type locality = [ `Any | `Dn_identity of string ] 
For some outputs, it is possible to request where the file is placed.

  • `Dn_identity s: The file blocks are allocated on the datanode with identity s, if possible
  • `Any: no such request is done, any place is equally good

type map_task = {
   map_input :file; (*the list of input block fragments to read, given as triples (filename,block,len)*)
   map_output_prefix :string; (*the preifx for output files to create and to write*)
   map_output_suffix :string; (*the suffix for output files*)
   map_id :int; (*identifies the map task (numbered 0 to n-1)*)
   map_best_hosts :Unix.inet_addr list; (*best hosts for executing this task (might be empty)*)
type sort_task = {
   sort_input :file; (*The file to sort*)
   sort_input_del :bool; (*whether to delete the input after finishing*)
   sort_output :string; (*the output file to create and to write*)
   sort_output_locality :locality; (*Request where to create the output file*)
   sort_id :int; (*the ID of the input map task*)
type shuffle_task = {
   shuffle_input :(file * int * int) list; (*The inputs: list of files (file, kmin, kmax). This means that file contains all records from the map tasks kmin to kmax which are also in shuffle_partitions.

It is required that the inputs are contiguous regarding the coverage of map tasks, i.e. for all tasks k given in shuffle_coverage there must be an input file covering it.

   shuffle_input_del :bool; (*whether to delete the input after finishing*)
   shuffle_output :(string * int * int * locality) list; (*the outputs: list of files (file, pmin, pmax, loc). This means that file contains all records of the input that fall into the range of partitions pmin to pmax.*)
   shuffle_partitions :int * int; (*The files are in the range of partitions (pmin,pmax)*)
   shuffle_coverage :int * int; (*The files cover the map task range (kmin,kmax)*)
   shuffle_reduce :bool; (*Whether this shuffle task is also a reduce task*)
   shuffle_round :int; (*The round of shuffling (starts with 0)*)
   shuffle_avg_part_width :float; (*The average pmax-pmin+1 value for shuffle tasks in this round of shuffling*)
Shuffling means to rearrange the output of the map/sort tasks into a set of partitions. The map tasks are numbered 0 to n-1 (via map_id) and for each task there is a file that serves here as input. The partitions are numbered 0 to m-1, and finally there is for each partition exactly one output file.
type emap_task = {
   emap_map :map_task; (*The map part is specified as for map_task*)
   emap_output :(file_tag * int * int * locality) list; (*The output is split into this number of sets. Each set is specified as (tag, pmin, pmax, loc), and will contain partitions from the given range pmin..pmax.

The specified tags are also attached to the result of the task (in `Ok).

Enhanced map tasks do not only map, but also prepartition the result and sort it
type task = [ `Cleanup
| `Emap of emap_task
| `Map of map_task
| `Shuffle of shuffle_task
| `Sort of sort_task ]
type task_ok = {
   ok_files :(file_tag * file) list;
   ok_counters :Mapred_stats.stats;
type task_result = [ `Corrupt_input of file list
| `Error of string
| `Ok of task_ok
| `Retry_later ]
- `Ok ok: Task is successful. ok.ok_files is the list of created files (or file fragments), and ok.ok_counters returns statistics
  • `Retry_later: Task cannot be started because of lack of resources. No files have been created (or these are already deleted). The task should be again tried when it is suspected that resources have been freed.
  • `Corrupt_input files: Inputs files of the task were not readable. The files should be created again and the task retried. No files have been created (or these are already deleted).
  • `Error msg: Fatal error msg. This is an internal error or an error from user code

module Ord: sig .. end
val encode_task : task -> string
val decode_task : string -> task
val encode_task_result : task_result * string list -> string
val decode_task_result : string -> task_result * string list
val lock_name_of_task_id : task -> string
The name used for forming the name of the lock files
val string_of_task_id : task -> string
val print_task_id : Netchannels.out_obj_channel -> task -> int -> unit
val print_task : Netchannels.out_obj_channel -> task -> int -> unit
val print_file : ?tag:string ->
Netchannels.out_obj_channel -> file -> int -> unit
This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml