Disco -- a Map/Reduce framework for distributed computing
Directory Structure
-------------------

- ctrl/   -- the web interface
- master/ -- disco master (Erlang stuff)
- py/     -- Python stuff (worker & client interface)
- util/   -- miscellaneous utilities

Quick start: How to count word frequencies in a large text file
----------------------------------------------------------------

Let's assume that your large text file is called bigfile.txt.

1. Split the file into 1M-line chunks and save them to a directory:

      mkdir bigtxt
      split -l 1000000 bigfile.txt bigtxt/bigtxt-

2. Distribute the chunks to the nodes:

      python disco/util/distrfiles.py bigtxt /scratch/cnodes > bigtxt.chunks

   Here the file /scratch/cnodes contains a list of nodes to which the
   chunks are distributed, one node per line. The script outputs a disco
   address for each chunk.

   If you want to repeat this command, e.g. for a new set of chunks, you
   need to run

      REMOVE_FIRST=1 python disco/util/distrfiles.py bigtxt /scratch/cnodes > bigtxt.chunks

   which first removes the target directory on each node before copying
   the chunks. Note that distrfiles uses the directory name as the name
   for the data set.

3. Run the example script:

      export PYTHONPATH=disco/py
      python disco/py/disco_test.py disco://cfront:4000 `cat bigtxt.chunks` > bigtxt.results

   The script creates a new job, using the chunks listed in bigtxt.chunks
   as its inputs. Job results are printed to bigtxt.results.

4. Check the job status with your browser at http://cfront:4000.

Client API
----------

A new disco job is started with the disco.job() function. It takes all
the information needed to run a job, posts a job request to the disco
master and, in the default case, blocks to wait for the results. The
results are not fetched automatically to the calling host, but that can
be done easily with disco.result_iterator().

A job request may contain several user-defined functions, as specified
below. When writing custom functions, take the following features of the
disco worker environment into account:

- Only the specified function is included in the request. The function
  cannot call any other functions defined in your source file, nor can
  it refer to any global names, including imported modules. If you need
  a special module in your function, import it within the function body.
  Use of global variables, or of functions with side effects such as
  external files besides the given input, is strongly discouraged.

- The function should not print anything to stdout or stderr. Instead,
  you can use the function msg("hello world!") to send messages to the
  status display. You can use the function data_err("ouch!", input_file)
  to abort the task on this node and request a transfer to another node.
  Otherwise it is a good idea to just let the task die if any exceptions
  occur -- do not catch exceptions from which you cannot recover.

Disco.job()
'''''''''''

disco.job(master, name, input_files, fun_map,
          [map_reader, reduce, partition, combiner, nr_maps, nr_reduces,
           sort, mem_sort_limit, params, async, clean]) -> [result files]

The first four parameters are required, the rest are optional. Returns a
list like

   [[1, "disco://reduce-1"], [2, "disco://reduce-2"]]

containing partition-ID, result-file pairs. If the job fails, a
JobException is raised. The exception contains the instance variables
name and master, which you can use to call disco.clean_job() to remove
the failed job entry on the master.
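To make the parameters below concrete, here is a minimal word-count
sketch in the spirit of the quick start. This is only an illustrative,
hedged example: it assumes that disco/py is on PYTHONPATH so that the
client module imports as disco, and that keys and values are passed as
plain strings; see py/disco_test.py for the actual example script.

   import sys, disco

   def fun_map(e, params):
       # By default e is a single line of input text; emit one pair per word.
       return [(word, "1") for word in e.split()]

   def fun_reduce(iter, out, params):
       # Sum the per-word counts produced by the map tasks.
       counts = {}
       for word, count in iter:
           counts[word] = counts.get(word, 0) + int(count)
       for word, total in counts.items():
           out.add(word, str(total))

   results = disco.job("disco://cfront:4000", "wordcount",
                       sys.argv[1:], fun_map, reduce = fun_reduce)

   for key, value in disco.result_iterator(results):
       print("%s %s" % (key, value))

The individual parameters of disco.job() are described below.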
* master [string] (required)

  Address of the disco master, e.g. "disco://cfront:4000".

* name [string] (required)

  Job name. The disco client appends an "@[timestamp]" suffix to the
  name to ensure uniqueness. If you start more than one job per second,
  you cannot rely on the timestamp, which increments only once per
  second. In any case, users are strongly recommended to devise a good
  naming scheme of their own. Only characters in [a-zA-Z0-9_] are
  allowed in the job name.

* input_files [list] (required)

  List of input files for the map phase. A file name must be specified
  in one of the following three protocols:

  - http://www.example.com/data -- any HTTP address

  - disco://cnode03/bigtxt/file_name -- disco address. Refers to
    cnode03:/var/disco/bigtxt/file_name. Currently this is an alias for
    http://cnode03:8989/bigtxt/file_name.

  - /home/bob/bigfile.txt -- a local file. Note that the file must
    either exist on all the nodes or you must make sure that the job is
    run only on the nodes where the file exists. Due to these
    restrictions, this form has only limited use.

* fun_map [function] (required)

  The map function. The map function gets two parameters:

     def fun_map(e, params)

  where [e] is an input entry returned by the map_reader function; by
  default it is a single line from the input. The function returns an
  iterable object (list, generator or iterator) which produces a
  sequence of (key, value) tuples.

  [params] is a user-defined object that is given to disco.job() and
  then passed to fun_map and fun_reduce. It may also be used to maintain
  state between several calls to the map function. By default, params is
  an empty dictionary.
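  As an illustration of the params mechanism, the following hedged
  sketch passes a set of stop words to the map function; the key
  "stop_words" is made up for this example and would be chosen by the
  caller of disco.job():

     def fun_map(e, params):
         # params is assumed to be a dictionary given to disco.job(), e.g.
         # params = {"stop_words": ["the", "a", "of"]}; skip those words.
         stop = params.get("stop_words", [])
         return [(word, "1") for word in e.split() if word not in stop]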
* map_reader [function] (optional)

  The map reader function parses input entries from the given input
  file. It accepts three parameters:

     def map_line_reader(fd, size, fname)

  where [fd] is the file descriptor for an input file, [size] the input
  file size and [fname] the input file name.

  The disco worker provides a convenience function called [re_reader]
  which can be used to construct new map readers using regular
  expressions. For example, the following map reader produces full HTML
  documents as input entries for the map function:

     def map_line_reader(fd, size, fname):
         for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
             yield x[0]

  The regular expression, given as the first parameter to re_reader(),
  must include at least one group, defined by parentheses, and must
  match so that no extra characters are left between or after any input
  entry.

  You can perform many map/reduce operations in sequence by specifying
  map_reader = disco.chain_reader, which reads the output of a previous
  map or reduce operation.

* reduce [function] (optional)

  The reduce function. The reduce function gets three parameters:

     def fun_reduce(iter, out, params)

  where [iter] is an iterator returning (key, value) pairs from the map
  functions and [out] is an output object which includes one function,
  add(), that should be used to output the result pairs, as in

     out.add(key, value)

  The last parameter, [params], is the same user-defined object that is
  given to disco.job() and passed to the map function.

  By default there is no reduce function and the job will quit after the
  map functions finish.

* partition [function] (optional)

  The partition function defines which of the reduce functions gets
  which key-value pair from the map function. The function takes two
  parameters:

     def fun_partition(key, nr_reduces)

  where [key] is the key returned by a call to the map function, and
  [nr_reduces] the number of reduce operations, as defined by the
  disco.job() parameter of the same name.

  The default partition function is defined as follows:

     def default_partition(key, nr_reduces):
         return hash(str(key)) % nr_reduces

  A utility function called disco.make_range_partition(min, max) can be
  used to create a partition function that partitions a numeric range
  [min:max] into [nr_reduces] equal-width partitions.

* combiner [function] (optional)

  The combiner function takes the key-value pairs from the map function
  and merges several pairs together to produce a more compact output. It
  takes four parameters:

     def fun_combiner(key, value, comb_buffer, flush)

  where the first two parameters correspond to a single key-value pair
  from the map function. The third parameter, [comb_buffer], is an
  accumulator object, a dictionary, that the combiner can use to save
  its state. The combiner must control the size of comb_buffer to
  prevent it from eating all available memory. If the combiner function
  returns True, the disco worker will immediately flush comb_buffer, as
  described below.

  The last parameter, [flush], is a boolean value that instructs the
  combiner to transform comb_buffer into valid key-value pairs for the
  output stream. After the combiner call returns, the worker will
  iterate through comb_buffer, add its values to the output stream and
  empty the buffer. This feature allows the combiner function to store
  data in comb_buffer in any way it likes and transform it to proper
  stringified key-value pairs only when needed. A hedged combiner sketch
  is shown after this parameter list.

  By default no combiner function is used and the output of the map
  function is used as such.

* nr_maps [integer] (optional)

  The number of parallel map operations. By default,
  nr_maps = len(input_files).

* nr_reduces [integer] (optional)

  The number of parallel reduce operations. By default,
  nr_reduces = max(nr_maps / 2, 1).

* sort [boolean] (optional)

  Sort the map outputs. True by default.

* mem_sort_limit [integer] (optional)

  The maximum amount of RAM reserved for in-memory sorting, in bytes. If
  the reduce input exceeds this amount, an external disk-based sort is
  used instead. 256MB by default.

* params (optional)

  Any Python object that is passed as such to fun_map and fun_reduce.

* async [boolean] (optional)

  Do not block the disco.job() call but return immediately after the job
  request has been posted. False by default, meaning that the call will
  block. If set to True, use the disco.wait_job() function to poll for
  the results. In the asynchronous mode the parameter 'clean' is
  ignored, so you must call disco.clean_job() explicitly to remove the
  job entry from the master.

* clean [boolean] (optional)

  Calls disco.clean_job() automatically when the job has finished
  successfully. True by default.
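The combiner contract described above leaves the exact calling
convention somewhat open, so the following word-count combiner is only
a hedged sketch; in particular, the guard against a missing key assumes
that the worker might call the combiner just to flush the buffer:

   def fun_combiner(word, count, comb_buffer, flush):
       # Accumulate per-word counts from this map task in comb_buffer.
       if word is not None:
           comb_buffer[word] = comb_buffer.get(word, 0) + int(count)
       if flush:
           # Turn the accumulated integers into stringified values so the
           # worker can write them to the output stream.
           for w in comb_buffer:
               comb_buffer[w] = str(comb_buffer[w])
       # Returning True asks the worker to flush comb_buffer immediately;
       # the 100000-entry cap is an arbitrary bound on memory use.
       return len(comb_buffer) > 100000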
disco.wait_job()
''''''''''''''''

wait_job(master, name, poll_interval = 5, timeout = None)

Polls for a job [name] to finish on the disco master at [master].
[poll_interval] specifies the polling interval in seconds. [timeout]
specifies the maximum number of seconds to wait.

disco.result_iterator(results, notifier = None)
'''''''''''''''''''''''''''''''''''''''''''''''

result_iterator(results, notifier = None)

The result iterator can be used to iterate through a result list
returned by disco.job(). It fetches result files from the nodes and goes
through the key-value pairs in each file.

An optional [notifier] callback function is called whenever the
partition changes. The callback function gets two parameters:

   def notifier(part_id, url)

where [part_id] is the partition ID for the next values and [url] is the
address of the result file.

disco.clean_job(master, name)
'''''''''''''''''''''''''''''

Clean the job entry 'name' at 'master'. Note that after calling this
function you can't access status information on the master anymore, and
you have 24 hours to collect the results before they are garbage
collected automatically.

disco.kill_job(master, name)
''''''''''''''''''''''''''''

Kill the job 'name' at 'master'. Note that this call doesn't remove the
job entry from the master, so you can still access status information
for this job. Use disco.clean_job() to remove the job entry totally.
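Finally, a hedged sketch of the asynchronous workflow, reusing the
fun_map and fun_reduce shown earlier. Because the client appends an
"@[timestamp]" suffix to job names, the sketch simply assumes that
[name] below resolves to the job started here; this code base predates
Python 3, so the keyword argument async is used as documented above:

   import sys, disco

   master = "disco://cfront:4000"
   name = "wordcount_async"

   # Post the job without blocking. With async = True the 'clean'
   # parameter is ignored, so the job entry must be removed explicitly.
   disco.job(master, name, sys.argv[1:], fun_map,
             reduce = fun_reduce, async = True)

   # Poll the master every 5 seconds until the job has finished.
   results = disco.wait_job(master, name)

   def notifier(part_id, url):
       # Called whenever result_iterator moves on to a new partition.
       print("reading partition %s from %s" % (part_id, url))

   for key, value in disco.result_iterator(results, notifier = notifier):
       print("%s %s" % (key, value))

   # Remove the job entry from the master now that the results are read.
   disco.clean_job(master, name)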