Directory Structure
-------------------

 - ctrl/ -- the web interface
 - master/ -- disco master (Erlang stuff)
 - py/ -- Python stuff (worker & client interface)
 - util/ -- miscellaneous utilities


Quick start: How to count word frequencies in a large text file
---------------------------------------------------------------

Let's assume that your large text file is called bigfile.txt.

1. Split the file to 1M-line chunks and save them to a directory:

mkdir bigtxt
split -l 1000000 bigfile.txt bigtxt/bigtxt-

2. Distribute chunks to nodes:

python disco/util/distrfiles.py bigtxt /scratch/cnodes > bigtxt.chunks

Here the file /scratch/cnodes contains a list of the nodes to which the
chunks are distributed, one node per line. The script outputs a disco
address for each chunk.

If you want to repeat this command multiple times, e.g. for a new set of
chunks, you need to run

REMOVE_FIRST=1 python disco/util/distrfiles.py bigtxt /scratch/cnodes > bigtxt.chunks

which first removes the target directory on each node before copying the
chunks. Note that distrfiles uses the directory name as the name for the data set.

3. Run the example script:

export PYTHONPATH=disco/py 
python disco/py/disco_test.py disco://cfront:4000 `cat bigtxt.chunks` > bigtxt.results

The script creates a new job, using the chunks in bigtxt.chunks as its inputs.
Job results are printed out to bigtxt.results.

4. Check the job status with your browser at:

http://cfront:4000


Client API
----------

A new disco job is started with the disco.job() function. It takes all
information needed to run a job, posts a job request to the disco master
and in the default case blocks to wait for the results. The results are
not fetched automatically to the calling host, but that can be done 
easily with disco.result_iterator(). 

A job request may contain several user-defined functions, as specified
below. When writing custom functions, take into account the following 
features of the disco worker environment:

- Only the specified function is included in the request. The function
  can't call any other functions defined in your source file, nor can it
  refer to any global names, including imported modules. If you need
  a special module in your function, import it within the function body.
  Use of global variables or functions with side effects, such as
  external files besides the given input, is strongly discouraged.

- The function should not print anything to stdout or stderr.
  Instead, you can use the function msg("hello world!") to send messages
  to the status display. You can use the function data_err("ouch!",
  input_file) to abort the task on this node and request a transfer to
  another node. Otherwise it is a good idea to just let the task die if
  an exception occurs -- do not catch any exceptions from which you can't
  recover. A short sketch illustrating these points follows below.
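
For example, a map function that needs the re module could look roughly
like the following sketch; the msg() helper is the one described above,
and the regular expression is only illustrative:

 def fun_map(e, params):
     # import inside the body: module-level imports are not shipped
     # with the function to the worker
     import re
     msg("mapping one entry")
     return [(w, 1) for w in re.split(r"\W+", e) if w]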

disco.job()
'''''''''''

disco.job(master, name, input_files, fun_map, [map_reader, reduce,
          partition, combiner, nr_maps, nr_reduces, sort,
          mem_sort_limit, params, async, clean]) -> [result files]

The first four parameters are required, the rest are optional. Returns
a list like [[1, "disco://reduce-1"], [2, "disco://reduce-2"]] containing
partition-ID, result-file pairs.

If the job fails, a JobException is raised. The exception contains instance
variables name and master which you can use to call disco.clean_job() to 
remove the failed job entry on the master.
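
For instance, a failed job could be cleaned up roughly as follows. This
is only a sketch: it assumes the exception class is exposed as
disco.JobException, and inputs and fun_map are defined elsewhere:

 import disco

 try:
     results = disco.job("disco://cfront:4000", "wordcount", inputs, fun_map)
 except disco.JobException, e:
     # remove the failed job entry using the name and master stored
     # on the exception
     disco.clean_job(e.master, e.name)
     raise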

* master [string] (required)

Address of the disco master, e.g. "disco://cfront:4000". 


* name [string] (required)

Job name. The disco client appends an "@[timestamp]" suffix to the name
to ensure uniqueness. Since the timestamp increments only once per
second, you cannot rely on it if you start more than one job per second.
In any case, users are strongly recommended to devise a good naming
scheme of their own. Only characters in [a-zA-Z0-9_] are allowed in the
job name.


* input_files [list] (required)

List of input files for the map phase. A file name must be specified in
one of the following three protocols:

 - http://www.example.com/data - any HTTP address

 - disco://cnode03/bigtxt/file_name - disco address. Refers to 
   cnode03:/var/disco/bigtxt/file_name. Currently this is an alias for
   http://cnode03:8989/bigtxt/file_name.

 - /home/bob/bigfile.txt - a local file. Note that the file must either
   exist on all the nodes or you must make sure that the job is run only
   on the nodes where the file exists. Due to these restrictions, this
   form has only limited use.


* fun_map [function] (required)

The map function. It takes two parameters:

 def fun_map(e, params)

where [e] is an input entry returned by the map_reader function -- by
default a single line of the input. The function returns an iterable
object (list, generator or iterator) which produces a sequence of
(key, value) tuples.

[params] is a user-defined object that is given to disco.job() and then
passed to fun_map and fun_reduce. It may also be used to maintain state
between several calls to the map function. By default, params is an
empty dictionary.
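
As a concrete sketch, a word-count map function could look like the
following; the stopword list stored in params is purely illustrative:

 def fun_map(e, params):
     # e is a single line of input; skip words listed in params
     stop = params.get("stopwords", [])
     return [(w, 1) for w in e.split() if w not in stop]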

* map_reader [function] (optional)

The map reader function parses input entries from the given input file.
It accepts three parameters:

 def map_line_reader(fd, size, fname)

where [fd] is the file descriptor of an input file, [size] the input file
size and [fname] the input file name. The disco worker provides a
convenience function called [re_reader] which can be used to construct
new map readers using regular expressions. For example, the following map
reader produces full HTML documents as input entries for the map function:

 def map_line_reader(fd, size, fname):
     for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
         yield x[0]

The regular expression, given as the first parameter to re_reader(), must
include at least one group, defined by parentheses, and must match so that
no extra characters are left between or after any input entry.

You can perform many map/reduce operations in sequence by specifying

map_reader = disco.chain_reader

which reads output of a previous map or reduce operation.
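
As a sketch, the results of one job can be fed into a second job roughly
like this; the second job's functions (fun_map2) are placeholders:

 results = disco.job(master, "count", inputs, fun_map, reduce = fun_reduce)
 # feed the result files of the first job to a second job that
 # reads them with chain_reader
 results2 = disco.job(master, "postprocess",
                      [url for part_id, url in results],
                      fun_map2, map_reader = disco.chain_reader)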


* reduce [function] (optional)

The reduce function. The reduce function gets three parameters:

 def fun_reduce(iter, out, params)

where [iter] is an iterator returning (key, value) pairs from the map
functions and [out] is an output object that provides a single function,
add(), which should be used to output the result pairs:

 out.add(key, value)

The last parameter, [params], is the same user-defined object that is
given to disco.job() and passed to the map function. The use of this
parameter is likely to be extended in the future.

By default there isn't any reduce function and the job will quit after
the map functions finish.
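
For example, a reduce function that sums the word counts produced by the
map sketch above could look roughly like this; the values are assumed to
arrive as strings:

 def fun_reduce(iter, out, params):
     # accumulate a total per key and emit one pair per key
     totals = {}
     for key, value in iter:
         totals[key] = totals.get(key, 0) + int(value)
     for key, total in totals.items():
         out.add(key, total)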

* partition [function] (optional)

The partition function defines which of the reduce functions gets which 
key-value pair from the map function. The function takes two parameters:

 def fun_partition(key, nr_reduces)

where [key] is a key returned by the map function, and [nr_reduces]
the number of reduce operations, as defined by the disco.job()
parameter of the same name. The default partition function is defined
as follows:

  def default_partition(key, nr_reduces):
      return hash(str(key)) % nr_reduces

A utility function called disco.make_range_partition(min, max) can
be used to create a partition function that partitions a numeric range
[min:max] to [nr_reduces] equal-width partitions.
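
For example, if the map function emits numeric keys between 0 and 100,
the range partitioner could be plugged in roughly like this (a sketch;
the other arguments are placeholders):

 results = disco.job(master, "rangejob", inputs, fun_map,
                     partition = disco.make_range_partition(0, 100),
                     reduce = fun_reduce,
                     nr_reduces = 4)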

* combiner [function] (optional)

The combiner function takes the key-value pairs from the map function 
and merges several pairs together to produce a more compact output. It
takes four parameters:

 def fun_combiner(key, value, comb_buffer, flush)

where the first two parameters correspond to a single key-value pair
from the map function. The third parameter, [comb_buffer], is an
accumulator object -- a dictionary -- that the combiner can use to save
its state. The combiner must control the size of comb_buffer to prevent
it from consuming all available memory. If the combiner function returns
True, the disco worker will immediately flush comb_buffer, as described
below.

The last parameter, [flush], is a boolean value that instructs combiner
to transform comb_buffer to valid key-value pairs for the output stream.
After the combiner call returns, the worker will iterate through the
comb_buffer, add values to the output stream and empty the buffer. This
feature allows the combiner function to store data in comb_buffer in
any way it likes and transform it to proper stringified key-value pairs
only when needed.

By default no combiner function is used and the output of the map function
is used as such.
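
A combiner that pre-sums word counts could be sketched roughly as
follows; whether the worker passes a None key together with the final
flush call is an assumption here, as is the string form of the values:

 def fun_combiner(key, value, comb_buffer, flush):
     if key is not None:
         comb_buffer[key] = comb_buffer.get(key, 0) + int(value)
     if flush:
         # turn the buffered integers into stringified values before
         # the worker writes the buffer to the output stream
         for k in comb_buffer:
             comb_buffer[k] = str(comb_buffer[k])
         return
     # ask the worker to flush early if the buffer grows too large
     return len(comb_buffer) > 100000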


* nr_maps [integer] (optional)

The number of parallel map operations. By default, 
  nr_maps = len(input_files).


* nr_reduces [integer] (optional)

The number of parallel reduce operations. By default,
  nr_reduces = max(nr_maps / 2, 1)


* sort [boolean] (optional)

Sort the map outputs, true by default.


* mem_sort_limit [integer] (optional)

The maximum amount of RAM reserved for in-memory sorting in bytes. If
the reduce input exceeds this amount, an external disk-based sort is
used instead. By default 256MB.


* params (optional)

Any Python object that is passed as such to fun_map and fun_reduce.


* async [boolean] (optional)

Do not block the disco.job() call but return immediately after the job
request has been posted. By default false, meaning that the call will
block. If set to true, use the disco.wait_job() function to poll for the
results. In the asynchronous mode the parameter 'clean' is ignored, so
you must call disco.clean_job() explicitly to remove the job entry
from the master.

* clean [boolean] (optional)

Calls disco.clean_job() automatically when the job has finished
successfully. True by default.
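
Putting the pieces together, a blocking word-count job could be
submitted roughly as follows, using the fun_map and fun_reduce sketches
above; the master address and input addresses are placeholders:

 import disco

 results = disco.job("disco://cfront:4000", "wordcount",
                     ["disco://cnode01/bigtxt/bigtxt-aa",
                      "disco://cnode02/bigtxt/bigtxt-ab"],
                     fun_map,
                     reduce = fun_reduce,
                     nr_reduces = 4)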

disco.wait_job()
''''''''''''''''

wait_job(master, name, poll_interval = 5, timeout = None)

Polls for the job [name] to finish on the disco master at [master].
[poll_interval] specifies the polling interval in seconds and [timeout]
the maximum number of seconds to wait.
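
A non-blocking submission could be sketched roughly as follows; because
the client appends a timestamp suffix to the job name, the full name to
poll (full_name below) is assumed here to be taken from the master's
status page:

 disco.job("disco://cfront:4000", "wordcount", inputs, fun_map, async = True)
 # poll every 10 seconds, give up after an hour
 results = disco.wait_job("disco://cfront:4000", full_name,
                          poll_interval = 10, timeout = 3600)
 # 'clean' is ignored in asynchronous mode, so clean up explicitly
 disco.clean_job("disco://cfront:4000", full_name)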


disco.result_iterator()
'''''''''''''''''''''''

result_iterator(results, notifier = None)

The result iterator can be used to iterate through a result list
returned by disco.job(). It fetches result files from nodes and goes
through the key-value pairs in each file. An optional [notifier]
callback function is called whenever the partition changes. The callback
function gets two parameters:

 def notifier(part_id, url)

where [part_id] is the partition ID of the next values and [url] is the
address of the result file.
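
For example, the results of a blocking disco.job() call could be printed
roughly like this; the assumption that the iterator yields (key, value)
pairs follows from the description above:

 def notifier(part_id, url):
     print "reading partition %d from %s" % (part_id, url)

 for key, value in disco.result_iterator(results, notifier = notifier):
     print key, value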


disco.clean_job(master, name)
'''''''''''''''''''''''''''''

Clean the job entry 'name' at 'master'. Note that after calling this
function you can't access status information on the master anymore, and
you have 24 hours to collect the results before they are garbage
collected automatically.

disco.kill_job(master, name)
''''''''''''''''''''''''''''

Kill the job 'name' at 'master'. Note that this call doesn't remove
the job entry from the master, so you can still access status information
for this job. Use disco.clean_job() to remove the job entry completely.
