Skip to content

Commit 356a875

Browse files
author
alex
committedNov 26, 2012
Initial import
0 parents  commit 356a875

38 files changed

+69355
-0
lines changed
 

‎AUTHORS

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Alexander Hurd

‎COPYING

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
<Place your desired license here.>

‎ChangeLog

Whitespace-only changes.

‎INSTALL

+365
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,365 @@
1+
Installation Instructions
2+
*************************
3+
4+
Copyright (C) 1994, 1995, 1996, 1999, 2000, 2001, 2002, 2004, 2005,
5+
2006, 2007, 2008, 2009 Free Software Foundation, Inc.
6+
7+
Copying and distribution of this file, with or without modification,
8+
are permitted in any medium without royalty provided the copyright
9+
notice and this notice are preserved. This file is offered as-is,
10+
without warranty of any kind.
11+
12+
Basic Installation
13+
==================
14+
15+
Briefly, the shell commands `./configure; make; make install' should
16+
configure, build, and install this package. The following
17+
more-detailed instructions are generic; see the `README' file for
18+
instructions specific to this package. Some packages provide this
19+
`INSTALL' file but do not implement all of the features documented
20+
below. The lack of an optional feature in a given package is not
21+
necessarily a bug. More recommendations for GNU packages can be found
22+
in *note Makefile Conventions: (standards)Makefile Conventions.
23+
24+
The `configure' shell script attempts to guess correct values for
25+
various system-dependent variables used during compilation. It uses
26+
those values to create a `Makefile' in each directory of the package.
27+
It may also create one or more `.h' files containing system-dependent
28+
definitions. Finally, it creates a shell script `config.status' that
29+
you can run in the future to recreate the current configuration, and a
30+
file `config.log' containing compiler output (useful mainly for
31+
debugging `configure').
32+
33+
It can also use an optional file (typically called `config.cache'
34+
and enabled with `--cache-file=config.cache' or simply `-C') that saves
35+
the results of its tests to speed up reconfiguring. Caching is
36+
disabled by default to prevent problems with accidental use of stale
37+
cache files.
38+
39+
If you need to do unusual things to compile the package, please try
40+
to figure out how `configure' could check whether to do them, and mail
41+
diffs or instructions to the address given in the `README' so they can
42+
be considered for the next release. If you are using the cache, and at
43+
some point `config.cache' contains results you don't want to keep, you
44+
may remove or edit it.
45+
46+
The file `configure.ac' (or `configure.in') is used to create
47+
`configure' by a program called `autoconf'. You need `configure.ac' if
48+
you want to change it or regenerate `configure' using a newer version
49+
of `autoconf'.
50+
51+
The simplest way to compile this package is:
52+
53+
1. `cd' to the directory containing the package's source code and type
54+
`./configure' to configure the package for your system.
55+
56+
Running `configure' might take a while. While running, it prints
57+
some messages telling which features it is checking for.
58+
59+
2. Type `make' to compile the package.
60+
61+
3. Optionally, type `make check' to run any self-tests that come with
62+
the package, generally using the just-built uninstalled binaries.
63+
64+
4. Type `make install' to install the programs and any data files and
65+
documentation. When installing into a prefix owned by root, it is
66+
recommended that the package be configured and built as a regular
67+
user, and only the `make install' phase executed with root
68+
privileges.
69+
70+
5. Optionally, type `make installcheck' to repeat any self-tests, but
71+
this time using the binaries in their final installed location.
72+
This target does not install anything. Running this target as a
73+
regular user, particularly if the prior `make install' required
74+
root privileges, verifies that the installation completed
75+
correctly.
76+
77+
6. You can remove the program binaries and object files from the
78+
source code directory by typing `make clean'. To also remove the
79+
files that `configure' created (so you can compile the package for
80+
a different kind of computer), type `make distclean'. There is
81+
also a `make maintainer-clean' target, but that is intended mainly
82+
for the package's developers. If you use it, you may have to get
83+
all sorts of other programs in order to regenerate files that came
84+
with the distribution.
85+
86+
7. Often, you can also type `make uninstall' to remove the installed
87+
files again. In practice, not all packages have tested that
88+
uninstallation works correctly, even though it is required by the
89+
GNU Coding Standards.
90+
91+
8. Some packages, particularly those that use Automake, provide `make
92+
distcheck', which can by used by developers to test that all other
93+
targets like `make install' and `make uninstall' work correctly.
94+
This target is generally not run by end users.
95+
96+
Compilers and Options
97+
=====================
98+
99+
Some systems require unusual options for compilation or linking that
100+
the `configure' script does not know about. Run `./configure --help'
101+
for details on some of the pertinent environment variables.
102+
103+
You can give `configure' initial values for configuration parameters
104+
by setting variables in the command line or in the environment. Here
105+
is an example:
106+
107+
./configure CC=c99 CFLAGS=-g LIBS=-lposix
108+
109+
*Note Defining Variables::, for more details.
110+
111+
Compiling For Multiple Architectures
112+
====================================
113+
114+
You can compile the package for more than one kind of computer at the
115+
same time, by placing the object files for each architecture in their
116+
own directory. To do this, you can use GNU `make'. `cd' to the
117+
directory where you want the object files and executables to go and run
118+
the `configure' script. `configure' automatically checks for the
119+
source code in the directory that `configure' is in and in `..'. This
120+
is known as a "VPATH" build.
121+
122+
With a non-GNU `make', it is safer to compile the package for one
123+
architecture at a time in the source code directory. After you have
124+
installed the package for one architecture, use `make distclean' before
125+
reconfiguring for another architecture.
126+
127+
On MacOS X 10.5 and later systems, you can create libraries and
128+
executables that work on multiple system types--known as "fat" or
129+
"universal" binaries--by specifying multiple `-arch' options to the
130+
compiler but only a single `-arch' option to the preprocessor. Like
131+
this:
132+
133+
./configure CC="gcc -arch i386 -arch x86_64 -arch ppc -arch ppc64" \
134+
CXX="g++ -arch i386 -arch x86_64 -arch ppc -arch ppc64" \
135+
CPP="gcc -E" CXXCPP="g++ -E"
136+
137+
This is not guaranteed to produce working output in all cases, you
138+
may have to build one architecture at a time and combine the results
139+
using the `lipo' tool if you have problems.
140+
141+
Installation Names
142+
==================
143+
144+
By default, `make install' installs the package's commands under
145+
`/usr/local/bin', include files under `/usr/local/include', etc. You
146+
can specify an installation prefix other than `/usr/local' by giving
147+
`configure' the option `--prefix=PREFIX', where PREFIX must be an
148+
absolute file name.
149+
150+
You can specify separate installation prefixes for
151+
architecture-specific files and architecture-independent files. If you
152+
pass the option `--exec-prefix=PREFIX' to `configure', the package uses
153+
PREFIX as the prefix for installing programs and libraries.
154+
Documentation and other data files still use the regular prefix.
155+
156+
In addition, if you use an unusual directory layout you can give
157+
options like `--bindir=DIR' to specify different values for particular
158+
kinds of files. Run `configure --help' for a list of the directories
159+
you can set and what kinds of files go in them. In general, the
160+
default for these options is expressed in terms of `${prefix}', so that
161+
specifying just `--prefix' will affect all of the other directory
162+
specifications that were not explicitly provided.
163+
164+
The most portable way to affect installation locations is to pass the
165+
correct locations to `configure'; however, many packages provide one or
166+
both of the following shortcuts of passing variable assignments to the
167+
`make install' command line to change installation locations without
168+
having to reconfigure or recompile.
169+
170+
The first method involves providing an override variable for each
171+
affected directory. For example, `make install
172+
prefix=/alternate/directory' will choose an alternate location for all
173+
directory configuration variables that were expressed in terms of
174+
`${prefix}'. Any directories that were specified during `configure',
175+
but not in terms of `${prefix}', must each be overridden at install
176+
time for the entire installation to be relocated. The approach of
177+
makefile variable overrides for each directory variable is required by
178+
the GNU Coding Standards, and ideally causes no recompilation.
179+
However, some platforms have known limitations with the semantics of
180+
shared libraries that end up requiring recompilation when using this
181+
method, particularly noticeable in packages that use GNU Libtool.
182+
183+
The second method involves providing the `DESTDIR' variable. For
184+
example, `make install DESTDIR=/alternate/directory' will prepend
185+
`/alternate/directory' before all installation names. The approach of
186+
`DESTDIR' overrides is not required by the GNU Coding Standards, and
187+
does not work on platforms that have drive letters. On the other hand,
188+
it does better at avoiding recompilation issues, and works well even
189+
when some directory options were not specified in terms of `${prefix}'
190+
at `configure' time.
191+
192+
Optional Features
193+
=================
194+
195+
If the package supports it, you can cause programs to be installed
196+
with an extra prefix or suffix on their names by giving `configure' the
197+
option `--program-prefix=PREFIX' or `--program-suffix=SUFFIX'.
198+
199+
Some packages pay attention to `--enable-FEATURE' options to
200+
`configure', where FEATURE indicates an optional part of the package.
201+
They may also pay attention to `--with-PACKAGE' options, where PACKAGE
202+
is something like `gnu-as' or `x' (for the X Window System). The
203+
`README' should mention any `--enable-' and `--with-' options that the
204+
package recognizes.
205+
206+
For packages that use the X Window System, `configure' can usually
207+
find the X include and library files automatically, but if it doesn't,
208+
you can use the `configure' options `--x-includes=DIR' and
209+
`--x-libraries=DIR' to specify their locations.
210+
211+
Some packages offer the ability to configure how verbose the
212+
execution of `make' will be. For these packages, running `./configure
213+
--enable-silent-rules' sets the default to minimal output, which can be
214+
overridden with `make V=1'; while running `./configure
215+
--disable-silent-rules' sets the default to verbose, which can be
216+
overridden with `make V=0'.
217+
218+
Particular systems
219+
==================
220+
221+
On HP-UX, the default C compiler is not ANSI C compatible. If GNU
222+
CC is not installed, it is recommended to use the following options in
223+
order to use an ANSI C compiler:
224+
225+
./configure CC="cc -Ae -D_XOPEN_SOURCE=500"
226+
227+
and if that doesn't work, install pre-built binaries of GCC for HP-UX.
228+
229+
On OSF/1 a.k.a. Tru64, some versions of the default C compiler cannot
230+
parse its `<wchar.h>' header file. The option `-nodtk' can be used as
231+
a workaround. If GNU CC is not installed, it is therefore recommended
232+
to try
233+
234+
./configure CC="cc"
235+
236+
and if that doesn't work, try
237+
238+
./configure CC="cc -nodtk"
239+
240+
On Solaris, don't put `/usr/ucb' early in your `PATH'. This
241+
directory contains several dysfunctional programs; working variants of
242+
these programs are available in `/usr/bin'. So, if you need `/usr/ucb'
243+
in your `PATH', put it _after_ `/usr/bin'.
244+
245+
On Haiku, software installed for all users goes in `/boot/common',
246+
not `/usr/local'. It is recommended to use the following options:
247+
248+
./configure --prefix=/boot/common
249+
250+
Specifying the System Type
251+
==========================
252+
253+
There may be some features `configure' cannot figure out
254+
automatically, but needs to determine by the type of machine the package
255+
will run on. Usually, assuming the package is built to be run on the
256+
_same_ architectures, `configure' can figure that out, but if it prints
257+
a message saying it cannot guess the machine type, give it the
258+
`--build=TYPE' option. TYPE can either be a short name for the system
259+
type, such as `sun4', or a canonical name which has the form:
260+
261+
CPU-COMPANY-SYSTEM
262+
263+
where SYSTEM can have one of these forms:
264+
265+
OS
266+
KERNEL-OS
267+
268+
See the file `config.sub' for the possible values of each field. If
269+
`config.sub' isn't included in this package, then this package doesn't
270+
need to know the machine type.
271+
272+
If you are _building_ compiler tools for cross-compiling, you should
273+
use the option `--target=TYPE' to select the type of system they will
274+
produce code for.
275+
276+
If you want to _use_ a cross compiler, that generates code for a
277+
platform different from the build platform, you should specify the
278+
"host" platform (i.e., that on which the generated programs will
279+
eventually be run) with `--host=TYPE'.
280+
281+
Sharing Defaults
282+
================
283+
284+
If you want to set default values for `configure' scripts to share,
285+
you can create a site shell script called `config.site' that gives
286+
default values for variables like `CC', `cache_file', and `prefix'.
287+
`configure' looks for `PREFIX/share/config.site' if it exists, then
288+
`PREFIX/etc/config.site' if it exists. Or, you can set the
289+
`CONFIG_SITE' environment variable to the location of the site script.
290+
A warning: not all `configure' scripts look for a site script.
291+
292+
Defining Variables
293+
==================
294+
295+
Variables not defined in a site shell script can be set in the
296+
environment passed to `configure'. However, some packages may run
297+
configure again during the build, and the customized values of these
298+
variables may be lost. In order to avoid this problem, you should set
299+
them in the `configure' command line, using `VAR=value'. For example:
300+
301+
./configure CC=/usr/local2/bin/gcc
302+
303+
causes the specified `gcc' to be used as the C compiler (unless it is
304+
overridden in the site shell script).
305+
306+
Unfortunately, this technique does not work for `CONFIG_SHELL' due to
307+
an Autoconf bug. Until the bug is fixed you can use this workaround:
308+
309+
CONFIG_SHELL=/bin/bash /bin/bash ./configure CONFIG_SHELL=/bin/bash
310+
311+
`configure' Invocation
312+
======================
313+
314+
`configure' recognizes the following options to control how it
315+
operates.
316+
317+
`--help'
318+
`-h'
319+
Print a summary of all of the options to `configure', and exit.
320+
321+
`--help=short'
322+
`--help=recursive'
323+
Print a summary of the options unique to this package's
324+
`configure', and exit. The `short' variant lists options used
325+
only in the top level, while the `recursive' variant lists options
326+
also present in any nested packages.
327+
328+
`--version'
329+
`-V'
330+
Print the version of Autoconf used to generate the `configure'
331+
script, and exit.
332+
333+
`--cache-file=FILE'
334+
Enable the cache: use and save the results of the tests in FILE,
335+
traditionally `config.cache'. FILE defaults to `/dev/null' to
336+
disable caching.
337+
338+
`--config-cache'
339+
`-C'
340+
Alias for `--cache-file=config.cache'.
341+
342+
`--quiet'
343+
`--silent'
344+
`-q'
345+
Do not print messages saying which checks are being made. To
346+
suppress all normal output, redirect it to `/dev/null' (any error
347+
messages will still be shown).
348+
349+
`--srcdir=DIR'
350+
Look for the package's source code in directory DIR. Usually
351+
`configure' can determine that directory automatically.
352+
353+
`--prefix=DIR'
354+
Use DIR as the installation prefix. *note Installation Names::
355+
for more details, including other options available for fine-tuning
356+
the installation locations.
357+
358+
`--no-create'
359+
`-n'
360+
Run the configure checks, but stop before creating any output
361+
files.
362+
363+
`configure' also accepts some other, not widely useful, options. Run
364+
`configure --help' for more details.
365+

‎Makefile.am

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
2+
SUBDIRS=src

‎NEWS

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Sample NEWS file for qfs_mapred project.

‎README

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Sample readme file for qfs_mapred project.

‎configure.ac

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
dnl Process this file with autoconf to produce a configure script.
2+
3+
AC_PREREQ(2.59)
4+
AC_INIT(test, 1.0)
5+
6+
7+
AC_CANONICAL_SYSTEM
8+
AM_INIT_AUTOMAKE()
9+
10+
AC_PROG_CXX
11+
12+
AC_CONFIG_FILES(Makefile src/Makefile)
13+
AC_OUTPUT
14+

‎data/pg4300.txt

+33,055
Large diffs are not rendered by default.

‎data/pg5000.txt

+32,117
Large diffs are not rendered by default.

‎scripts/mapper.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python
2+
"""A more advanced Mapper, using Python iterators and generators."""
3+
4+
import sys
5+
import re
6+
7+
def read_input(file):
8+
for line in file:
9+
# split the line into words
10+
yield line.split()
11+
12+
def main(separator='\t'):
13+
# input comes from STDIN (standard input)
14+
data = read_input(sys.stdin)
15+
for words in data:
16+
# write the results to STDOUT (standard output);
17+
# what we output here will be the input for the
18+
# Reduce step, i.e. the input for reducer.py
19+
#
20+
# tab-delimited; the trivial word count is 1
21+
for word in words:
22+
23+
if re.match('^[A-Za-z]+$', word):
24+
word = word.strip();
25+
word = word.upper();
26+
print '%s%s%d' % (word, separator, 1)
27+
28+
if __name__ == "__main__":
29+
main()

‎scripts/reducer.py

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env python
2+
"""A more advanced Reducer, using Python iterators and generators."""
3+
4+
from itertools import groupby
5+
from operator import itemgetter
6+
import sys
7+
8+
def read_mapper_output(file, separator='\t'):
9+
for line in file:
10+
yield line.rstrip().split(separator, 1)
11+
12+
def main(separator='\t'):
13+
# input comes from STDIN (standard input)
14+
data = read_mapper_output(sys.stdin, separator=separator)
15+
# groupby groups multiple word-count pairs by word,
16+
# and creates an iterator that returns consecutive keys and their group:
17+
# current_word - string containing a word (the key)
18+
# group - iterator yielding all ["<current_word>", "<count>"] items
19+
for current_word, group in groupby(data, itemgetter(0)):
20+
try:
21+
total_count = sum(int(count) for current_word, count in group)
22+
print "%s%s%d" % (current_word, separator, total_count)
23+
except ValueError:
24+
# count was not a number, so silently discard this item
25+
pass
26+
27+
if __name__ == "__main__":
28+
main()

‎src/Makefile.am

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
AM_CPPFLAGS=-Wall -I/home/alex/qfs/build/release/include
2+
AM_LDFLAGS= -L/home/alex/qfs/build/release/lib -lboost_program_options -lgearman -lqfs_client -lboost_regex -lboost_thread -lpthread
3+
4+
bin_PROGRAMS=qfs_mapred_submit mapper_worker mapper_to_qfs_partitions sorter_worker reducer_worker kvsorter
5+
6+
qfs_mapred_submit_SOURCES=qfs_mapred_submit_main.cc
7+
mapper_worker_SOURCES=mapper_worker_main.cc
8+
mapper_to_qfs_partitions_SOURCES=mapper_to_qfs_partitions_main.cc
9+
sorter_worker_SOURCES=sorter_worker_main.cc
10+
kvsorter_SOURCES=kvsorter_main.cc
11+
reducer_worker_SOURCES=reducer_worker_main.cc

‎src/headers/kv_struct.h

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* kv_struct.h
3+
*
4+
* Created on: Nov 22, 2012
5+
* Author: alex
6+
*/
7+
8+
#ifndef KV_STRUCT_H_
9+
#define KV_STRUCT_H_
10+
11+
struct kv{
12+
std::string key;
13+
std::string value;
14+
};
15+
#endif /* KV_STRUCT_H_ */

‎src/headers/mapper_data_struct.h

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* mapper_data_struct.h
3+
*
4+
* Created on: Nov 23, 2012
5+
* Author: alex
6+
*/
7+
8+
#ifndef MAPPER_DATA_STRUCT_H_
9+
#define MAPPER_DATA_STRUCT_H_
10+
11+
struct mapper_data{
12+
int partition;
13+
std::string data;
14+
};
15+
16+
17+
#endif /* MAPPER_DATA_STRUCT_H_ */

‎src/headers/workloads.h

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* workloads.h
3+
*
4+
* Created on: Nov 17, 2012
5+
* Author: alex
6+
*/
7+
8+
#ifndef WORKLOADS_H_
9+
#define WORKLOADS_H_
10+
11+
#include "json_spirit_headers/json_spirit_value.h"
12+
13+
struct Mapper_Workload
14+
{
15+
std::string job_id;
16+
std::string qfs_meta_server_name;
17+
unsigned short qfs_meta_server_port;
18+
std::string qfs_file_input;
19+
std::string qfs_map_folder;
20+
std::string python_mapper_function;
21+
int partition_count;
22+
};
23+
24+
struct Sorter_Workload
25+
{
26+
std::string qfs_meta_server_name;
27+
unsigned short qfs_meta_server_port;
28+
std::string qfs_partition_file_input;
29+
std::string qfs_partition_file_output;
30+
};
31+
32+
struct Reducer_Workload
33+
{
34+
std::string qfs_meta_server_name;
35+
unsigned short qfs_meta_server_port;
36+
json_spirit::Array qfs_sorted_partition_files;
37+
std::string qfs_output_file;
38+
std::string python_reducer_function;
39+
};
40+
41+
#endif /* WORKLOADS_H_ */
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#ifndef JSON_SPIRIT_ERROR_POSITION
2+
#define JSON_SPIRIT_ERROR_POSITION
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include <string>
14+
15+
namespace json_spirit
16+
{
17+
// An Error_position exception is thrown by the "read_or_throw" functions below on finding an error.
18+
// Note the "read_or_throw" functions are around 3 times slower than the standard functions "read"
19+
// functions that return a bool.
20+
//
21+
struct Error_position
22+
{
23+
Error_position();
24+
Error_position( unsigned int line, unsigned int column, const std::string& reason );
25+
bool operator==( const Error_position& lhs ) const;
26+
unsigned int line_;
27+
unsigned int column_;
28+
std::string reason_;
29+
};
30+
31+
inline Error_position::Error_position()
32+
: line_( 0 )
33+
, column_( 0 )
34+
{
35+
}
36+
37+
inline Error_position::Error_position( unsigned int line, unsigned int column, const std::string& reason )
38+
: line_( line )
39+
, column_( column )
40+
, reason_( reason )
41+
{
42+
}
43+
44+
inline bool Error_position::operator==( const Error_position& lhs ) const
45+
{
46+
if( this == &lhs ) return true;
47+
48+
return ( reason_ == lhs.reason_ ) &&
49+
( line_ == lhs.line_ ) &&
50+
( column_ == lhs.column_ );
51+
}
52+
}
53+
54+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright John W. Wilkinson 2007 - 2011
2+
// Distributed under the MIT License, see accompanying file LICENSE.txt
3+
4+
// json spirit version 4.05
5+
6+
#include "json_spirit_reader.h"
7+
#include "json_spirit_reader_template.h"
8+
9+
using namespace json_spirit;
10+
11+
#ifdef JSON_SPIRIT_VALUE_ENABLED
12+
bool json_spirit::read( const std::string& s, Value& value )
13+
{
14+
return read_string( s, value );
15+
}
16+
17+
void json_spirit::read_or_throw( const std::string& s, Value& value )
18+
{
19+
read_string_or_throw( s, value );
20+
}
21+
22+
bool json_spirit::read( std::istream& is, Value& value )
23+
{
24+
return read_stream( is, value );
25+
}
26+
27+
void json_spirit::read_or_throw( std::istream& is, Value& value )
28+
{
29+
read_stream_or_throw( is, value );
30+
}
31+
32+
bool json_spirit::read( std::string::const_iterator& begin, std::string::const_iterator end, Value& value )
33+
{
34+
return read_range( begin, end, value );
35+
}
36+
37+
void json_spirit::read_or_throw( std::string::const_iterator& begin, std::string::const_iterator end, Value& value )
38+
{
39+
begin = read_range_or_throw( begin, end, value );
40+
}
41+
#endif
42+
43+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
44+
bool json_spirit::read( const std::wstring& s, wValue& value )
45+
{
46+
return read_string( s, value );
47+
}
48+
49+
void json_spirit::read_or_throw( const std::wstring& s, wValue& value )
50+
{
51+
read_string_or_throw( s, value );
52+
}
53+
54+
bool json_spirit::read( std::wistream& is, wValue& value )
55+
{
56+
return read_stream( is, value );
57+
}
58+
59+
void json_spirit::read_or_throw( std::wistream& is, wValue& value )
60+
{
61+
read_stream_or_throw( is, value );
62+
}
63+
64+
bool json_spirit::read( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wValue& value )
65+
{
66+
return read_range( begin, end, value );
67+
}
68+
69+
void json_spirit::read_or_throw( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wValue& value )
70+
{
71+
begin = read_range_or_throw( begin, end, value );
72+
}
73+
#endif
74+
75+
#ifdef JSON_SPIRIT_MVALUE_ENABLED
76+
bool json_spirit::read( const std::string& s, mValue& value )
77+
{
78+
return read_string( s, value );
79+
}
80+
81+
void json_spirit::read_or_throw( const std::string& s, mValue& value )
82+
{
83+
read_string_or_throw( s, value );
84+
}
85+
86+
bool json_spirit::read( std::istream& is, mValue& value )
87+
{
88+
return read_stream( is, value );
89+
}
90+
91+
void json_spirit::read_or_throw( std::istream& is, mValue& value )
92+
{
93+
read_stream_or_throw( is, value );
94+
}
95+
96+
bool json_spirit::read( std::string::const_iterator& begin, std::string::const_iterator end, mValue& value )
97+
{
98+
return read_range( begin, end, value );
99+
}
100+
101+
void json_spirit::read_or_throw( std::string::const_iterator& begin, std::string::const_iterator end, mValue& value )
102+
{
103+
begin = read_range_or_throw( begin, end, value );
104+
}
105+
#endif
106+
107+
#if defined( JSON_SPIRIT_WMVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
108+
bool json_spirit::read( const std::wstring& s, wmValue& value )
109+
{
110+
return read_string( s, value );
111+
}
112+
113+
void json_spirit::read_or_throw( const std::wstring& s, wmValue& value )
114+
{
115+
read_string_or_throw( s, value );
116+
}
117+
118+
bool json_spirit::read( std::wistream& is, wmValue& value )
119+
{
120+
return read_stream( is, value );
121+
}
122+
123+
void json_spirit::read_or_throw( std::wistream& is, wmValue& value )
124+
{
125+
read_stream_or_throw( is, value );
126+
}
127+
128+
bool json_spirit::read( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wmValue& value )
129+
{
130+
return read_range( begin, end, value );
131+
}
132+
133+
void json_spirit::read_or_throw( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wmValue& value )
134+
{
135+
begin = read_range_or_throw( begin, end, value );
136+
}
137+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#ifndef JSON_SPIRIT_READER
2+
#define JSON_SPIRIT_READER
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include "json_spirit_value.h"
14+
#include "json_spirit_error_position.h"
15+
#include <iostream>
16+
17+
namespace json_spirit
18+
{
19+
// functions to reads a JSON values
20+
21+
#ifdef JSON_SPIRIT_VALUE_ENABLED
22+
bool read( const std::string& s, Value& value );
23+
bool read( std::istream& is, Value& value );
24+
bool read( std::string::const_iterator& begin, std::string::const_iterator end, Value& value );
25+
26+
void read_or_throw( const std::string& s, Value& value );
27+
void read_or_throw( std::istream& is, Value& value );
28+
void read_or_throw( std::string::const_iterator& begin, std::string::const_iterator end, Value& value );
29+
#endif
30+
31+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
32+
bool read( const std::wstring& s, wValue& value );
33+
bool read( std::wistream& is, wValue& value );
34+
bool read( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wValue& value );
35+
36+
void read_or_throw( const std::wstring& s, wValue& value );
37+
void read_or_throw( std::wistream& is, wValue& value );
38+
void read_or_throw( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wValue& value );
39+
#endif
40+
41+
#ifdef JSON_SPIRIT_MVALUE_ENABLED
42+
bool read( const std::string& s, mValue& value );
43+
bool read( std::istream& is, mValue& value );
44+
bool read( std::string::const_iterator& begin, std::string::const_iterator end, mValue& value );
45+
46+
void read_or_throw( const std::string& s, mValue& value );
47+
void read_or_throw( std::istream& is, mValue& value );
48+
void read_or_throw( std::string::const_iterator& begin, std::string::const_iterator end, mValue& value );
49+
#endif
50+
51+
#if defined( JSON_SPIRIT_WMVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
52+
bool read( const std::wstring& s, wmValue& value );
53+
bool read( std::wistream& is, wmValue& value );
54+
bool read( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wmValue& value );
55+
56+
void read_or_throw( const std::wstring& s, wmValue& value );
57+
void read_or_throw( std::wistream& is, wmValue& value );
58+
void read_or_throw( std::wstring::const_iterator& begin, std::wstring::const_iterator end, wmValue& value );
59+
#endif
60+
}
61+
62+
#endif

‎src/json_spirit_headers/json_spirit_reader_template.h

+648
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#ifndef JSON_SPIRIT_READ_STREAM
2+
#define JSON_SPIRIT_READ_STREAM
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include "json_spirit_reader_template.h"
14+
15+
namespace json_spirit
16+
{
17+
// these classes allows you to read multiple top level contiguous values from a stream,
18+
// the normal stream read functions have a bug that prevent multiple top level values
19+
// from being read unless they are separated by spaces
20+
21+
template< class Istream_type, class Value_type >
22+
class Stream_reader
23+
{
24+
public:
25+
26+
Stream_reader( Istream_type& is )
27+
: iters_( is )
28+
{
29+
}
30+
31+
bool read_next( Value_type& value )
32+
{
33+
return read_range( iters_.begin_, iters_.end_, value );
34+
}
35+
36+
private:
37+
38+
typedef Multi_pass_iters< Istream_type > Mp_iters;
39+
40+
Mp_iters iters_;
41+
};
42+
43+
template< class Istream_type, class Value_type >
44+
class Stream_reader_thrower
45+
{
46+
public:
47+
48+
Stream_reader_thrower( Istream_type& is )
49+
: iters_( is )
50+
, posn_begin_( iters_.begin_, iters_.end_ )
51+
, posn_end_( iters_.end_, iters_.end_ )
52+
{
53+
}
54+
55+
void read_next( Value_type& value )
56+
{
57+
posn_begin_ = read_range_or_throw( posn_begin_, posn_end_, value );
58+
}
59+
60+
private:
61+
62+
typedef Multi_pass_iters< Istream_type > Mp_iters;
63+
typedef spirit_namespace::position_iterator< typename Mp_iters::Mp_iter > Posn_iter_t;
64+
65+
Mp_iters iters_;
66+
Posn_iter_t posn_begin_, posn_end_;
67+
};
68+
}
69+
70+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#ifndef JSON_SPIRIT_UTILS
2+
#define JSON_SPIRIT_UTILS
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include "json_spirit_value.h"
14+
#include <map>
15+
16+
namespace json_spirit
17+
{
18+
template< class Obj_t, class Map_t >
19+
void obj_to_map( const Obj_t& obj, Map_t& mp_obj )
20+
{
21+
mp_obj.clear();
22+
23+
for( typename Obj_t::const_iterator i = obj.begin(); i != obj.end(); ++i )
24+
{
25+
mp_obj[ i->name_ ] = i->value_;
26+
}
27+
}
28+
29+
template< class Obj_t, class Map_t >
30+
void map_to_obj( const Map_t& mp_obj, Obj_t& obj )
31+
{
32+
obj.clear();
33+
34+
for( typename Map_t::const_iterator i = mp_obj.begin(); i != mp_obj.end(); ++i )
35+
{
36+
obj.push_back( typename Obj_t::value_type( i->first, i->second ) );
37+
}
38+
}
39+
40+
#ifdef JSON_SPIRIT_VALUE_ENABLED
41+
typedef std::map< std::string, Value > Mapped_obj;
42+
#endif
43+
44+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
45+
typedef std::map< std::wstring, wValue > wMapped_obj;
46+
#endif
47+
48+
template< class Object_type, class String_type >
49+
const typename Object_type::value_type::Value_type& find_value( const Object_type& obj, const String_type& name )
50+
{
51+
for( typename Object_type::const_iterator i = obj.begin(); i != obj.end(); ++i )
52+
{
53+
if( i->name_ == name )
54+
{
55+
return i->value_;
56+
}
57+
}
58+
59+
return Object_type::value_type::Value_type::null;
60+
}
61+
}
62+
63+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/* Copyright (c) 2007 John W Wilkinson
2+
3+
This source code can be used for any purpose as long as
4+
this comment is retained. */
5+
6+
// json spirit version 2.00
7+
8+
#include "json_spirit_value.h"

‎src/json_spirit_headers/json_spirit_value.h

+585
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright John W. Wilkinson 2007 - 2011
2+
// Distributed under the MIT License, see accompanying file LICENSE.txt
3+
4+
// json spirit version 4.05
5+
6+
#include "json_spirit_writer.h"
7+
#include "json_spirit_writer_template.h"
8+
9+
using namespace json_spirit;
10+
11+
#ifdef JSON_SPIRIT_VALUE_ENABLED
12+
void json_spirit::write( const Value& value, std::ostream& os, unsigned int options )
13+
{
14+
write_stream( value, os, options );
15+
}
16+
std::string json_spirit::write( const Value& value, unsigned int options )
17+
{
18+
return write_string( value, options );
19+
}
20+
21+
void json_spirit::write_formatted( const Value& value, std::ostream& os )
22+
{
23+
write_stream( value, os, pretty_print );
24+
}
25+
26+
std::string json_spirit::write_formatted( const Value& value )
27+
{
28+
return write_string( value, pretty_print );
29+
}
30+
#endif
31+
32+
#ifdef JSON_SPIRIT_MVALUE_ENABLED
33+
void json_spirit::write( const mValue& value, std::ostream& os, unsigned int options )
34+
{
35+
write_stream( value, os, options );
36+
}
37+
38+
std::string json_spirit::write( const mValue& value, unsigned int options )
39+
{
40+
return write_string( value, options );
41+
}
42+
43+
void json_spirit::write_formatted( const mValue& value, std::ostream& os )
44+
{
45+
write_stream( value, os, pretty_print );
46+
}
47+
48+
std::string json_spirit::write_formatted( const mValue& value )
49+
{
50+
return write_string( value, pretty_print );
51+
}
52+
#endif
53+
54+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
55+
void json_spirit::write( const wValue& value, std::wostream& os, unsigned int options )
56+
{
57+
write_stream( value, os, options );
58+
}
59+
60+
std::wstring json_spirit::write( const wValue& value, unsigned int options )
61+
{
62+
return write_string( value, options );
63+
}
64+
65+
void json_spirit::write_formatted( const wValue& value, std::wostream& os )
66+
{
67+
write_stream( value, os, pretty_print );
68+
}
69+
70+
std::wstring json_spirit::write_formatted( const wValue& value )
71+
{
72+
return write_string( value, pretty_print );
73+
}
74+
#endif
75+
76+
#if defined( JSON_SPIRIT_WMVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
77+
void json_spirit::write_formatted( const wmValue& value, std::wostream& os )
78+
{
79+
write_stream( value, os, pretty_print );
80+
}
81+
82+
std::wstring json_spirit::write_formatted( const wmValue& value )
83+
{
84+
return write_string( value, pretty_print );
85+
}
86+
87+
void json_spirit::write( const wmValue& value, std::wostream& os, unsigned int options )
88+
{
89+
write_stream( value, os, options );
90+
}
91+
92+
std::wstring json_spirit::write( const wmValue& value, unsigned int options )
93+
{
94+
return write_string( value, options );
95+
}
96+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#ifndef JSON_SPIRIT_WRITER
2+
#define JSON_SPIRIT_WRITER
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include "json_spirit_value.h"
14+
#include "json_spirit_writer_options.h"
15+
#include <iostream>
16+
17+
namespace json_spirit
18+
{
19+
// these functions to convert JSON Values to text
20+
21+
#ifdef JSON_SPIRIT_VALUE_ENABLED
22+
void write( const Value& value, std::ostream& os, unsigned int options = 0 );
23+
std::string write( const Value& value, unsigned int options = 0 );
24+
#endif
25+
26+
#ifdef JSON_SPIRIT_MVALUE_ENABLED
27+
void write( const mValue& value, std::ostream& os, unsigned int options = 0 );
28+
std::string write( const mValue& value, unsigned int options = 0 );
29+
#endif
30+
31+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
32+
void write( const wValue& value, std::wostream& os, unsigned int options = 0 );
33+
std::wstring write( const wValue& value, unsigned int options = 0 );
34+
#endif
35+
36+
#if defined( JSON_SPIRIT_WMVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
37+
void write( const wmValue& value, std::wostream& os, unsigned int options = 0 );
38+
std::wstring write( const wmValue& value, unsigned int options = 0 );
39+
#endif
40+
41+
// these "formatted" versions of the "write" functions are the equivalent of the above functions
42+
// with option "pretty_print"
43+
44+
#ifdef JSON_SPIRIT_VALUE_ENABLED
45+
void write_formatted( const Value& value, std::ostream& os );
46+
std::string write_formatted( const Value& value );
47+
#endif
48+
#ifdef JSON_SPIRIT_MVALUE_ENABLED
49+
void write_formatted( const mValue& value, std::ostream& os );
50+
std::string write_formatted( const mValue& value );
51+
#endif
52+
53+
#if defined( JSON_SPIRIT_WVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
54+
void write_formatted( const wValue& value, std::wostream& os );
55+
std::wstring write_formatted( const wValue& value );
56+
#endif
57+
#if defined( JSON_SPIRIT_WMVALUE_ENABLED ) && !defined( BOOST_NO_STD_WSTRING )
58+
void write_formatted( const wmValue& value, std::wostream& os );
59+
std::wstring write_formatted( const wmValue& value );
60+
#endif
61+
}
62+
63+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef JSON_SPIRIT_WRITER_OPTIONS
2+
#define JSON_SPIRIT_WRITER_OPTIONS
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
namespace json_spirit
14+
{
15+
enum Output_options{ pretty_print = 0x01, // Add whitespace to format the output nicely.
16+
17+
raw_utf8 = 0x02, // This prevents non-printable characters from being escapted using "\uNNNN" notation.
18+
// Note, this is an extension to the JSON standard. It disables the escaping of
19+
// non-printable characters allowing UTF-8 sequences held in 8 bit char strings
20+
// to pass through unaltered.
21+
22+
remove_trailing_zeros = 0x04,
23+
// outputs e.g. "1.200000000000000" as "1.2"
24+
single_line_arrays = 0x08,
25+
// pretty printing except that arrays printed on single lines unless they contain
26+
// composite elements, i.e. objects or arrays
27+
};
28+
}
29+
30+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
#ifndef JSON_SPIRIT_WRITER_TEMPLATE
2+
#define JSON_SPIRIT_WRITER_TEMPLATE
3+
4+
// Copyright John W. Wilkinson 2007 - 2011
5+
// Distributed under the MIT License, see accompanying file LICENSE.txt
6+
7+
// json spirit version 4.05
8+
9+
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
10+
# pragma once
11+
#endif
12+
13+
#include "json_spirit_value.h"
14+
#include "json_spirit_writer_options.h"
15+
16+
#include <cassert>
17+
#include <sstream>
18+
#include <iomanip>
19+
#include <boost/io/ios_state.hpp>
20+
21+
namespace json_spirit
22+
{
23+
inline char to_hex_char( unsigned int c )
24+
{
25+
assert( c <= 0xF );
26+
27+
const char ch = static_cast< char >( c );
28+
29+
if( ch < 10 ) return '0' + ch;
30+
31+
return 'A' - 10 + ch;
32+
}
33+
34+
template< class String_type >
35+
String_type non_printable_to_string( unsigned int c )
36+
{
37+
typedef typename String_type::value_type Char_type;
38+
39+
String_type result( 6, '\\' );
40+
41+
result[1] = 'u';
42+
43+
result[ 5 ] = to_hex_char( c & 0x000F ); c >>= 4;
44+
result[ 4 ] = to_hex_char( c & 0x000F ); c >>= 4;
45+
result[ 3 ] = to_hex_char( c & 0x000F ); c >>= 4;
46+
result[ 2 ] = to_hex_char( c & 0x000F );
47+
48+
return result;
49+
}
50+
51+
template< typename Char_type, class String_type >
52+
bool add_esc_char( Char_type c, String_type& s )
53+
{
54+
switch( c )
55+
{
56+
case '"': s += to_str< String_type >( "\\\"" ); return true;
57+
case '\\': s += to_str< String_type >( "\\\\" ); return true;
58+
case '\b': s += to_str< String_type >( "\\b" ); return true;
59+
case '\f': s += to_str< String_type >( "\\f" ); return true;
60+
case '\n': s += to_str< String_type >( "\\n" ); return true;
61+
case '\r': s += to_str< String_type >( "\\r" ); return true;
62+
case '\t': s += to_str< String_type >( "\\t" ); return true;
63+
}
64+
65+
return false;
66+
}
67+
68+
template< class String_type >
69+
String_type add_esc_chars( const String_type& s, bool raw_utf8 )
70+
{
71+
typedef typename String_type::const_iterator Iter_type;
72+
typedef typename String_type::value_type Char_type;
73+
74+
String_type result;
75+
76+
const Iter_type end( s.end() );
77+
78+
for( Iter_type i = s.begin(); i != end; ++i )
79+
{
80+
const Char_type c( *i );
81+
82+
if( add_esc_char( c, result ) ) continue;
83+
84+
if( raw_utf8 )
85+
{
86+
result += c;
87+
}
88+
else
89+
{
90+
const wint_t unsigned_c( ( c >= 0 ) ? c : 256 + c );
91+
92+
if( iswprint( unsigned_c ) )
93+
{
94+
result += c;
95+
}
96+
else
97+
{
98+
result += non_printable_to_string< String_type >( unsigned_c );
99+
}
100+
}
101+
}
102+
103+
return result;
104+
}
105+
106+
template< class Ostream >
107+
void append_double( Ostream& os, const double d, const int precision )
108+
{
109+
os << std::showpoint << std::setprecision( precision ) << d;
110+
}
111+
112+
template< class String_type >
113+
void erase_and_extract_exponent( String_type& str, String_type& exp )
114+
{
115+
const typename String_type::size_type exp_start= str.find( 'e' );
116+
117+
if( exp_start != String_type::npos )
118+
{
119+
exp = str.substr( exp_start );
120+
str.erase( exp_start );
121+
}
122+
}
123+
124+
template< class String_type >
125+
typename String_type::size_type find_first_non_zero( const String_type& str )
126+
{
127+
typename String_type::size_type result = str.size() - 1;
128+
129+
for( ; result != 0; --result )
130+
{
131+
if( str[ result ] != '0' )
132+
{
133+
break;
134+
}
135+
}
136+
137+
return result;
138+
}
139+
140+
template< class String_type >
141+
void remove_trailing( String_type& str )
142+
{
143+
String_type exp;
144+
145+
erase_and_extract_exponent( str, exp );
146+
147+
const typename String_type::size_type first_non_zero = find_first_non_zero( str );
148+
149+
if( first_non_zero != 0 )
150+
{
151+
const int offset = str[first_non_zero] == '.' ? 2 : 1; // note zero digits following a decimal point is non standard
152+
str.erase( first_non_zero + offset );
153+
}
154+
155+
str += exp;
156+
}
157+
158+
// this class generates the JSON text,
159+
// it keeps track of the indentation level etc.
160+
//
161+
template< class Value_type, class Ostream_type >
162+
class Generator
163+
{
164+
typedef typename Value_type::Config_type Config_type;
165+
typedef typename Config_type::String_type String_type;
166+
typedef typename Config_type::Object_type Object_type;
167+
typedef typename Config_type::Array_type Array_type;
168+
typedef typename String_type::value_type Char_type;
169+
typedef typename Object_type::value_type Obj_member_type;
170+
171+
public:
172+
173+
Generator( const Value_type& value, Ostream_type& os, unsigned int options )
174+
: os_( os )
175+
, indentation_level_( 0 )
176+
, pretty_( ( options & pretty_print ) != 0 || ( options & single_line_arrays ) != 0 )
177+
, raw_utf8_( ( options & raw_utf8 ) != 0 )
178+
, remove_trailing_zeros_( ( options & remove_trailing_zeros ) != 0 )
179+
, single_line_arrays_( ( options & single_line_arrays ) != 0 )
180+
, ios_saver_( os )
181+
{
182+
output( value );
183+
}
184+
185+
private:
186+
187+
void output( const Value_type& value )
188+
{
189+
switch( value.type() )
190+
{
191+
case obj_type: output( value.get_obj() ); break;
192+
case array_type: output( value.get_array() ); break;
193+
case str_type: output( value.get_str() ); break;
194+
case bool_type: output( value.get_bool() ); break;
195+
case real_type: output( value.get_real() ); break;
196+
case int_type: output_int( value ); break;
197+
case null_type: os_ << "null"; break;
198+
default: assert( false );
199+
}
200+
}
201+
202+
void output( const Object_type& obj )
203+
{
204+
output_array_or_obj( obj, '{', '}' );
205+
}
206+
207+
void output( const Obj_member_type& member )
208+
{
209+
output( Config_type::get_name( member ) ); space();
210+
os_ << ':'; space();
211+
output( Config_type::get_value( member ) );
212+
}
213+
214+
void output_int( const Value_type& value )
215+
{
216+
if( value.is_uint64() )
217+
{
218+
os_ << value.get_uint64();
219+
}
220+
else
221+
{
222+
os_ << value.get_int64();
223+
}
224+
}
225+
226+
void output( const String_type& s )
227+
{
228+
os_ << '"' << add_esc_chars( s, raw_utf8_ ) << '"';
229+
}
230+
231+
void output( bool b )
232+
{
233+
os_ << to_str< String_type >( b ? "true" : "false" );
234+
}
235+
236+
void output( double d )
237+
{
238+
if( remove_trailing_zeros_ )
239+
{
240+
std::basic_ostringstream< Char_type > os;
241+
242+
append_double( os, d, 16 ); // note precision is 16 so that we get some trailing space that we can remove,
243+
// otherwise, 0.1234 gets converted to "0.12399999..."
244+
245+
String_type str = os.str();
246+
247+
remove_trailing( str );
248+
249+
os_ << str;
250+
}
251+
else
252+
{
253+
append_double( os_, d, 17 );
254+
}
255+
}
256+
257+
static bool contains_composite_elements( const Array_type& arr )
258+
{
259+
for( typename Array_type::const_iterator i = arr.begin(); i != arr.end(); ++i )
260+
{
261+
const Value_type& val = *i;
262+
263+
if( val.type() == obj_type ||
264+
val.type() == array_type )
265+
{
266+
return true;
267+
}
268+
}
269+
270+
return false;
271+
}
272+
273+
template< class Iter >
274+
void output_composite_item( Iter i, Iter last )
275+
{
276+
output( *i );
277+
278+
if( ++i != last )
279+
{
280+
os_ << ',';
281+
}
282+
}
283+
284+
void output( const Array_type& arr )
285+
{
286+
if( single_line_arrays_ && !contains_composite_elements( arr ) )
287+
{
288+
os_ << '['; space();
289+
290+
for( typename Array_type::const_iterator i = arr.begin(); i != arr.end(); ++i )
291+
{
292+
output_composite_item( i, arr.end() );
293+
294+
space();
295+
}
296+
297+
os_ << ']';
298+
}
299+
else
300+
{
301+
output_array_or_obj( arr, '[', ']' );
302+
}
303+
}
304+
305+
template< class T >
306+
void output_array_or_obj( const T& t, Char_type start_char, Char_type end_char )
307+
{
308+
os_ << start_char; new_line();
309+
310+
++indentation_level_;
311+
312+
for( typename T::const_iterator i = t.begin(); i != t.end(); ++i )
313+
{
314+
indent();
315+
316+
output_composite_item( i, t.end() );
317+
318+
new_line();
319+
}
320+
321+
--indentation_level_;
322+
323+
indent(); os_ << end_char;
324+
}
325+
326+
void indent()
327+
{
328+
if( !pretty_ ) return;
329+
330+
for( int i = 0; i < indentation_level_; ++i )
331+
{
332+
os_ << " ";
333+
}
334+
}
335+
336+
void space()
337+
{
338+
if( pretty_ ) os_ << ' ';
339+
}
340+
341+
void new_line()
342+
{
343+
if( pretty_ ) os_ << '\n';
344+
}
345+
346+
Generator& operator=( const Generator& ); // to prevent "assignment operator could not be generated" warning
347+
348+
Ostream_type& os_;
349+
int indentation_level_;
350+
bool pretty_;
351+
bool raw_utf8_;
352+
bool remove_trailing_zeros_;
353+
bool single_line_arrays_;
354+
boost::io::basic_ios_all_saver< Char_type > ios_saver_; // so that ostream state is reset after control is returned to the caller
355+
};
356+
357+
// writes JSON Value to a stream, e.g.
358+
//
359+
// write_stream( value, os, pretty_print );
360+
//
361+
template< class Value_type, class Ostream_type >
362+
void write_stream( const Value_type& value, Ostream_type& os, unsigned int options = 0 )
363+
{
364+
os << std::dec;
365+
Generator< Value_type, Ostream_type >( value, os, options );
366+
}
367+
368+
// writes JSON Value to a stream, e.g.
369+
//
370+
// const string json_str = write( value, pretty_print );
371+
//
372+
template< class Value_type >
373+
typename Value_type::String_type write_string( const Value_type& value, unsigned int options = 0 )
374+
{
375+
typedef typename Value_type::String_type::value_type Char_type;
376+
377+
std::basic_ostringstream< Char_type > os;
378+
379+
write_stream( value, os, options );
380+
381+
return os.str();
382+
}
383+
}
384+
385+
#endif

‎src/kvsorter_main.cc

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* kvsorter_main.cc
3+
*
4+
* Created on: Nov 21, 2012
5+
* Author: alex
6+
*
7+
* In memory key value sorter using stdin and stdout
8+
*/
9+
#include <cstdlib>
10+
#include <cstring>
11+
#include <cerrno>
12+
#include <stdio.h>
13+
#include <iostream>
14+
#include <string>
15+
#include <vector>
16+
#include <boost/algorithm/string.hpp>
17+
#include "headers/kv_struct.h"
18+
19+
using namespace std;
20+
21+
bool comparator(kv t1, kv t2);
22+
23+
int main(int args, char *argv[]) {
24+
25+
string input_line;
26+
vector<kv> data;
27+
while (getline(cin, input_line)) {
28+
29+
//split on tab
30+
vector<string> input;
31+
boost::split(input, input_line, boost::is_any_of("\t"));
32+
33+
//get <key> <value>
34+
string key = input[0];
35+
string value = input[1];
36+
37+
//build kv struct
38+
kv my_kv;
39+
my_kv.key = key;
40+
my_kv.value = value;
41+
42+
//populate data vector
43+
data.push_back(my_kv);
44+
}
45+
46+
//sort data vector with stl
47+
sort(data.begin(),data.end(), comparator);
48+
49+
//print sorted data to stdout
50+
vector<kv>::iterator it;
51+
for (it = data.begin(); it != data.end(); it++) {
52+
string key = (*it).key;
53+
string value = (*it).value;
54+
55+
printf("%s\t%s\n", key.c_str(), value.c_str());
56+
}
57+
58+
return 0;
59+
}
60+
61+
bool comparator(kv t1, kv t2)
62+
{
63+
return t1.key < t2.key ? true : false;
64+
}

‎src/mapper_spill_checker.hpp

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* mapper_spill_checker.hpp
3+
*
4+
* Created on: Nov 23, 2012
5+
* Author: alex
6+
*/
7+
8+
#include "shared_mapper_data.cc"
9+
class mapper_spill_checker {
10+
private:
11+
shared_mapper_data *m_mapper_data;
12+
13+
public:
14+
// Constructor with id and the queue to use.
15+
mapper_spill_checker(shared_mapper_data *mapper_data):m_mapper_data(mapper_data) {
16+
17+
}
18+
19+
//check mapper data
20+
void operator ()() {
21+
while (true) {
22+
23+
m_mapper_data->scan();
24+
25+
26+
27+
// Make sure we can be interrupted
28+
boost::this_thread::interruption_point();
29+
boost::this_thread::yield();
30+
}
31+
}
32+
};

‎src/mapper_to_qfs_partitions_main.cc

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#include <cstdlib>
2+
#include <cstring>
3+
#include <cerrno>
4+
#include <stdio.h>
5+
#include <iostream>
6+
#include <string>
7+
#include <map>
8+
#include <list>
9+
#include <boost/crc.hpp>
10+
#include <boost/program_options.hpp>
11+
#include <boost/algorithm/string.hpp>
12+
#include <boost/thread.hpp>
13+
//add in_port_t type
14+
#include <libgearman/gearman.h>
15+
16+
#include "kfs/KfsClient.h"
17+
#include "kfs/KfsAttr.h"
18+
19+
#include "headers/kv_struct.h"
20+
#include "shared_mapper_data.cc"
21+
#include "mapper_spill_checker.hpp"
22+
23+
using namespace std;
24+
KFS::KfsClient *gKfsClient;
25+
26+
#define SPILL_THRESHOLD_BYTES 1048576 //1MB
27+
28+
int crc32(const string& my_string);
29+
int get_partition(string key, int num_partitions);
30+
31+
int main(int args, char *argv[]) {
32+
in_port_t qfs_meta_server_port;
33+
int partition_count;
34+
string qfs_meta_server_host, qfs_map_folder, job_id;
35+
36+
boost::program_options::options_description desc("Options");
37+
desc.add_options()("help", "Options related to the program.")
38+
("meta_server_host,s", boost::program_options::value<string>(&qfs_meta_server_host)->default_value( "localhost"), "Connect to the qfs meta server host")
39+
("meta_server_port,p", boost::program_options::value<in_port_t>(&qfs_meta_server_port)->default_value(20000), "Port number use for qfs meta server connection")
40+
("qfs_map_folder,f", boost::program_options::value<string>(&qfs_map_folder)->default_value("/tmp/"), "Intermediate folder for storing mapper output")
41+
("job_id", boost::program_options::value<string>(&job_id)->required(), "MR Job id")
42+
("partition_count,c", boost::program_options::value<int>(&partition_count)->default_value(4), "Number of mapped output partitions");
43+
44+
boost::program_options::variables_map vm;
45+
try {
46+
boost::program_options::store(
47+
boost::program_options::parse_command_line(args, argv, desc),
48+
vm);
49+
boost::program_options::notify(vm);
50+
} catch (exception &e) {
51+
cout << e.what() << endl;
52+
return EXIT_FAILURE;
53+
}
54+
55+
if (vm.count("help")) {
56+
cout << desc << endl;
57+
return EXIT_SUCCESS;
58+
}
59+
60+
//init
61+
string input_line;
62+
shared_mapper_data mapper_data(qfs_meta_server_host, qfs_meta_server_port, qfs_map_folder, job_id, partition_count, SPILL_THRESHOLD_BYTES);
63+
64+
//start mapper_data spill thread
65+
mapper_spill_checker c(&mapper_data);
66+
boost::thread t(c);
67+
68+
while (getline(cin, input_line)) {
69+
70+
//split on tab
71+
vector<string> input;
72+
boost::split(input, input_line, boost::is_any_of("\t"));
73+
74+
//get <key> <value>
75+
string key = input[0];
76+
string value = input[1];
77+
78+
//build kv struct
79+
kv my_kv;
80+
my_kv.key = key;
81+
my_kv.value = value;
82+
83+
//partition
84+
int partition = get_partition(my_kv.key, partition_count);
85+
86+
//save
87+
mapper_data.add(partition, my_kv);
88+
89+
}
90+
91+
//flush any remaining mapper_data
92+
mapper_data.flush_all();
93+
94+
//wait for thread
95+
t.interrupt();
96+
t.join();
97+
98+
return 0;
99+
}
100+
101+
//crc calculation
102+
int crc32(const string& my_string) {
103+
boost::crc_32_type result;
104+
result.process_bytes(my_string.data(), my_string.length());
105+
return result.checksum();
106+
}
107+
108+
//simple random hash partitioning
109+
int get_partition(string key, int num_partitions){
110+
return abs(crc32(key)) % num_partitions;
111+
}
112+

‎src/mapper_worker_main.cc

+189
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
============================================================================
3+
Name : mapper_worker_main.cpp
4+
Author : Alexander Hurd
5+
Version :
6+
Copyright :
7+
Description :
8+
============================================================================
9+
*/
10+
11+
#include <cerrno>
12+
#include <cstdio>
13+
#include <cstdlib>
14+
#include <cstring>
15+
#include <iostream>
16+
17+
#include <libgearman/gearman.h>
18+
#include <boost/program_options.hpp>
19+
#include <boost/lexical_cast.hpp>
20+
21+
#include "json_spirit_headers/json_spirit_reader_template.h"
22+
#include "headers/workloads.h"
23+
24+
#ifndef __INTEL_COMPILER
25+
#pragma GCC diagnostic ignored "-Wold-style-cast"
26+
#endif
27+
28+
using namespace std;
29+
using namespace json_spirit;
30+
31+
static void *mapper(gearman_job_st *job, void *context, size_t *result_size,
32+
gearman_return_t *ret_ptr);
33+
34+
string path_to_qfs_bin_tools, path_to_qfs_mapred_bin;
35+
int main(int args, char *argv[]) {
36+
uint32_t count;
37+
int timeout;
38+
in_port_t port;
39+
string host;
40+
41+
boost::program_options::options_description desc("Options");
42+
desc.add_options()("help", "Options related to the program.")("host,h",
43+
boost::program_options::value<string>(&host)->default_value(
44+
"localhost"), "Connect to the gearmand host")("port,p",
45+
boost::program_options::value<in_port_t>(&port)->default_value(
46+
GEARMAN_DEFAULT_TCP_PORT),
47+
"Port number use for gearmand connection")("count,c",
48+
boost::program_options::value<uint32_t>(&count)->default_value(0),
49+
"Number of jobs to run before exiting")("timeout,u",
50+
boost::program_options::value<int>(&timeout)->default_value(-1),
51+
"Timeout in milliseconds")("path_to_qfs_bin_tools",
52+
boost::program_options::value<string>(&path_to_qfs_bin_tools)->default_value(
53+
"/home/alex/qfs/build/release/bin/tools/"),
54+
"Path to qfs tools folder")("path_to_qfs_mapred_bin",
55+
boost::program_options::value<string>(&path_to_qfs_mapred_bin)->default_value(
56+
"/home/alex/workspace/qfs_mapred/src/"),
57+
"Path to qfs_mapred bin folder");
58+
59+
boost::program_options::variables_map vm;
60+
try {
61+
boost::program_options::store(
62+
boost::program_options::parse_command_line(args, argv, desc),
63+
vm);
64+
boost::program_options::notify(vm);
65+
} catch (exception &e) {
66+
cout << e.what() << endl;
67+
return EXIT_FAILURE;
68+
}
69+
70+
if (vm.count("help")) {
71+
cout << desc << endl;
72+
return EXIT_SUCCESS;
73+
}
74+
75+
gearman_worker_st worker;
76+
if (gearman_worker_create(&worker) == NULL) {
77+
cerr << "Memory allocation failure on worker creation." << endl;
78+
return EXIT_FAILURE;
79+
}
80+
81+
if (timeout >= 0)
82+
gearman_worker_set_timeout(&worker, timeout);
83+
84+
gearman_return_t ret;
85+
ret = gearman_worker_add_server(&worker, host.c_str(), port);
86+
if (ret != GEARMAN_SUCCESS) {
87+
cerr << gearman_worker_error(&worker) << endl;
88+
return EXIT_FAILURE;
89+
}
90+
91+
ret = gearman_worker_add_function(&worker, "mapper", 0, mapper, NULL);
92+
if (ret != GEARMAN_SUCCESS) {
93+
cerr << gearman_worker_error(&worker) << endl;
94+
return EXIT_FAILURE;
95+
}
96+
97+
while (1) {
98+
ret = gearman_worker_work(&worker);
99+
if (ret != GEARMAN_SUCCESS) {
100+
cerr << gearman_worker_error(&worker) << endl;
101+
break;
102+
}
103+
104+
if (count > 0) {
105+
count--;
106+
if (count == 0)
107+
break;
108+
}
109+
}
110+
111+
gearman_worker_free(&worker);
112+
113+
return EXIT_SUCCESS;
114+
}
115+
116+
static void *mapper(gearman_job_st *job, void *context, size_t *result_size,
117+
gearman_return_t *ret_ptr) {
118+
119+
const char *workload;
120+
workload = (const char *) gearman_job_workload(job);
121+
122+
//build payload
123+
string json_workload = (string) workload;
124+
Value value;
125+
read_string(json_workload, value);
126+
Object obj = value.get_obj();
127+
Mapper_Workload payload;
128+
for (Object::size_type i = 0; i != obj.size(); ++i) {
129+
const Pair& pair = obj[i];
130+
131+
const string& name = pair.name_;
132+
const Value& value = pair.value_;
133+
134+
if (name == "job_id") {
135+
payload.job_id = value.get_str();
136+
} else if (name == "qfs_meta_server_name") {
137+
payload.qfs_meta_server_name = value.get_str();
138+
} else if (name == "qfs_meta_server_port") {
139+
payload.qfs_meta_server_port = value.get_int();
140+
} else if (name == "qfs_file_input") {
141+
payload.qfs_file_input = value.get_str();
142+
} else if (name == "qfs_map_folder") {
143+
payload.qfs_map_folder = value.get_str();
144+
} else if (name == "python_mapper_function") {
145+
payload.python_mapper_function = value.get_str();
146+
} else if (name == "partition_count") {
147+
payload.partition_count = value.get_int();
148+
} else {
149+
assert( false);
150+
}
151+
}
152+
153+
//write maper function to file disk
154+
const char *python_mapper_file = tmpnam(NULL); // Get temp name
155+
FILE *fp = fopen(python_mapper_file, "w"); // Create the file
156+
fwrite(payload.python_mapper_function.c_str(), 1,
157+
payload.python_mapper_function.size(), fp);
158+
fclose(fp);
159+
160+
printf("Checking if processor is available...");
161+
if (!system(NULL)) {
162+
// *ret_ptr = GEARMAN_ERROR;
163+
// return;
164+
}
165+
166+
int r;
167+
string mapper_command = path_to_qfs_bin_tools + "cpfromqfs -s "
168+
+ payload.qfs_meta_server_name + " -p "
169+
+ boost::lexical_cast<string>(payload.qfs_meta_server_port) + " -k "
170+
+ payload.qfs_file_input + " -d - | python " + python_mapper_file
171+
+ " | " + path_to_qfs_mapred_bin + "mapper_to_qfs_partitions -s " + payload.qfs_meta_server_name
172+
+ " -p " + boost::lexical_cast<string>(payload.qfs_meta_server_port)
173+
+ " -f " + payload.qfs_map_folder + " --job_id " + payload.job_id
174+
+ " --partition_count "
175+
+ boost::lexical_cast<string>(payload.partition_count);
176+
r = system(mapper_command.c_str());
177+
178+
//output results if any
179+
cout << r;
180+
181+
//remove python script
182+
unlink(python_mapper_file);
183+
184+
*ret_ptr = GEARMAN_SUCCESS;
185+
186+
return NULL;
187+
188+
}
189+

‎src/qfs_atomic_writer.hpp

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* qfs_atomic_writer.hpp
3+
*
4+
* Created on: Nov 23, 2012
5+
* Author: alex
6+
*/
7+
#include <map>
8+
#include <fstream>
9+
#include <fcntl.h>
10+
#include "kfs/KfsClient.h"
11+
#include "kfs/KfsAttr.h"
12+
#include "kfs/common/kfstypes.h"
13+
#include "shared_queue.hpp"
14+
#include "headers/mapper_data_struct.h"
15+
16+
using namespace std;
17+
18+
// Class that consumes objects from a queue
19+
class qfs_atomic_writer {
20+
private:
21+
shared_queue<mapper_data>* m_queue; // The queue to use
22+
string m_qfs_map_folder;
23+
string m_job_id;
24+
map<int, int> m_qfs_partition_fd;
25+
KFS::KfsClient *m_KfsClient;
26+
27+
public:
28+
qfs_atomic_writer(shared_queue<mapper_data>* queue,
29+
string qfs_meta_server_host, in_port_t qfs_meta_server_port,
30+
string qfs_map_folder, string job_id) :
31+
m_queue(queue), m_qfs_map_folder(qfs_map_folder), m_job_id(job_id) {
32+
33+
//connect to qfs
34+
m_KfsClient = KFS::Connect(qfs_meta_server_host, qfs_meta_server_port);
35+
if (!m_KfsClient) {
36+
cerr << "kfs client failed to initialize...exiting" << "\n";
37+
exit(-1);
38+
}
39+
}
40+
41+
~qfs_atomic_writer() {
42+
43+
//close partitions fds
44+
map<int, int>::iterator it;
45+
for (it = m_qfs_partition_fd.begin(); it != m_qfs_partition_fd.end();
46+
it++) {
47+
int fd = (*it).second;
48+
m_KfsClient->Close(fd);
49+
}
50+
}
51+
52+
// The thread function reads data from the queue
53+
void operator ()() {
54+
while (true) {
55+
56+
// Get the data from the queue
57+
mapper_data data = m_queue->Dequeue();
58+
59+
string my_data = data.data;
60+
int partition = data.partition;
61+
62+
//check for fd
63+
int fd;
64+
if (m_qfs_partition_fd.find(partition)
65+
== m_qfs_partition_fd.end()) {
66+
//init fd
67+
init_qfs_fd(partition);
68+
}
69+
70+
//get partition fd
71+
fd = m_qfs_partition_fd[partition];
72+
73+
//write!
74+
int res = m_KfsClient->AtomicRecordAppend(fd, my_data.c_str(), my_data.size());
75+
if (res != (int) my_data.size())
76+
cout << "Atomic write err" <<endl;
77+
78+
// Make sure we can be interrupted
79+
boost::this_thread::interruption_point();
80+
boost::this_thread::yield();
81+
82+
}
83+
}
84+
85+
void init_qfs_fd(int partition) {
86+
string mapper_partition_file = m_qfs_map_folder
87+
+ m_job_id + "_"
88+
+ boost::lexical_cast<string>(partition);
89+
90+
int fd;
91+
if ((fd = m_KfsClient->Open(mapper_partition_file.c_str(), O_APPEND))
92+
< 0) {
93+
cout << "Mapper Partition File Open on : " << mapper_partition_file
94+
<< " failed: " << KFS::ErrorCodeToStr(fd) << endl;
95+
exit(-1);
96+
}
97+
98+
//save
99+
m_qfs_partition_fd[partition] = fd;
100+
101+
}
102+
};
103+

‎src/qfs_mapred_submit_main.cc

+402
Large diffs are not rendered by default.

‎src/reducer_worker_main.cc

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
============================================================================
3+
Name : reducer_worker.cpp
4+
Author : Alexander Hurd
5+
Version :
6+
Copyright :
7+
Description :
8+
============================================================================
9+
*/
10+
11+
#include <cerrno>
12+
#include <cstdio>
13+
#include <cstdlib>
14+
#include <cstring>
15+
#include <iostream>
16+
17+
#include <libgearman/gearman.h>
18+
#include <boost/program_options.hpp>
19+
#include <boost/lexical_cast.hpp>
20+
21+
#include "json_spirit_headers/json_spirit_reader_template.h"
22+
#include "headers/workloads.h"
23+
24+
#ifndef __INTEL_COMPILER
25+
#pragma GCC diagnostic ignored "-Wold-style-cast"
26+
#endif
27+
28+
using namespace std;
29+
using namespace json_spirit;
30+
31+
static void *reducer(gearman_job_st *job, void *context, size_t *result_size,
32+
gearman_return_t *ret_ptr);
33+
34+
string path_to_qfs_bin_tools;
35+
int main(int args, char *argv[]) {
36+
uint32_t count;
37+
int timeout;
38+
in_port_t port;
39+
string host;
40+
41+
boost::program_options::options_description desc("Options");
42+
desc.add_options()("help", "Options related to the program.")("host,h",
43+
boost::program_options::value<string>(&host)->default_value(
44+
"localhost"), "Connect to the gearmand host")("port,p",
45+
boost::program_options::value<in_port_t>(&port)->default_value(
46+
GEARMAN_DEFAULT_TCP_PORT),
47+
"Port number use for gearmand connection")("count,c",
48+
boost::program_options::value<uint32_t>(&count)->default_value(0),
49+
"Number of jobs to run before exiting")("timeout,u",
50+
boost::program_options::value<int>(&timeout)->default_value(-1),
51+
"Timeout in milliseconds")("path_to_qfs_bin_tools,p",
52+
boost::program_options::value<string>(&path_to_qfs_bin_tools)->default_value(
53+
"/home/alex/qfs/build/release/bin/tools/"),
54+
"Path to qfs tools folder");
55+
;
56+
57+
boost::program_options::variables_map vm;
58+
try {
59+
boost::program_options::store(
60+
boost::program_options::parse_command_line(args, argv, desc),
61+
vm);
62+
boost::program_options::notify(vm);
63+
} catch (exception &e) {
64+
cout << e.what() << endl;
65+
return EXIT_FAILURE;
66+
}
67+
68+
if (vm.count("help")) {
69+
cout << desc << endl;
70+
return EXIT_SUCCESS;
71+
}
72+
73+
gearman_worker_st worker;
74+
if (gearman_worker_create(&worker) == NULL) {
75+
cerr << "Memory allocation failure on worker creation." << endl;
76+
return EXIT_FAILURE;
77+
}
78+
79+
if (timeout >= 0)
80+
gearman_worker_set_timeout(&worker, timeout);
81+
82+
gearman_return_t ret;
83+
ret = gearman_worker_add_server(&worker, host.c_str(), port);
84+
if (ret != GEARMAN_SUCCESS) {
85+
cerr << gearman_worker_error(&worker) << endl;
86+
return EXIT_FAILURE;
87+
}
88+
89+
ret = gearman_worker_add_function(&worker, "reducer", 0, reducer, NULL);
90+
if (ret != GEARMAN_SUCCESS) {
91+
cerr << gearman_worker_error(&worker) << endl;
92+
return EXIT_FAILURE;
93+
}
94+
95+
while (1) {
96+
ret = gearman_worker_work(&worker);
97+
if (ret != GEARMAN_SUCCESS) {
98+
cerr << gearman_worker_error(&worker) << endl;
99+
break;
100+
}
101+
102+
if (count > 0) {
103+
count--;
104+
if (count == 0)
105+
break;
106+
}
107+
}
108+
109+
gearman_worker_free(&worker);
110+
111+
return EXIT_SUCCESS;
112+
}
113+
114+
static void *reducer(gearman_job_st *job, void *context, size_t *result_size,
115+
gearman_return_t *ret_ptr) {
116+
117+
const char *workload;
118+
workload = (const char *) gearman_job_workload(job);
119+
120+
//build payload
121+
string json_workload = (string) workload;
122+
Value value;
123+
read_string(json_workload, value);
124+
Object obj = value.get_obj();
125+
Reducer_Workload payload;
126+
for (Object::size_type i = 0; i != obj.size(); ++i) {
127+
const Pair& pair = obj[i];
128+
129+
const string& name = pair.name_;
130+
const Value& value = pair.value_;
131+
132+
if (name == "qfs_meta_server_name") {
133+
payload.qfs_meta_server_name = value.get_str();
134+
} else if (name == "qfs_meta_server_port") {
135+
payload.qfs_meta_server_port = value.get_int();
136+
} else if (name == "qfs_sorted_partition_file") {
137+
payload.qfs_sorted_partition_files = value.get_array();
138+
} else if (name == "qfs_output_file") {
139+
payload.qfs_output_file = value.get_str();
140+
} else if (name == "python_reducer_function") {
141+
payload.python_reducer_function = value.get_str();
142+
} else {
143+
assert( false);
144+
}
145+
}
146+
147+
//write reducer function to file disk
148+
const char *python_reducer_file = tmpnam(NULL); // Get temp name
149+
FILE *fp = fopen(python_reducer_file, "w"); // Create the file
150+
fwrite(payload.python_reducer_function.c_str(), 1,
151+
payload.python_reducer_function.size(), fp);
152+
fclose(fp);
153+
154+
printf("Checking if processor is available...");
155+
if (!system(NULL)) {
156+
// *ret_ptr = GEARMAN_ERROR;
157+
// return;
158+
}
159+
160+
//loop sorted partitions and reduce!
161+
for (int i = 0; i < payload.qfs_sorted_partition_files.size(); i++) {
162+
163+
string file = payload.qfs_sorted_partition_files[i].get_str();
164+
165+
int r;
166+
string mapper_command = path_to_qfs_bin_tools + "cpfromqfs -s "
167+
+ payload.qfs_meta_server_name + " -p "
168+
+ boost::lexical_cast<string>(payload.qfs_meta_server_port)
169+
+ " -k " + file
170+
+ " -d - | python " + python_reducer_file + " | " + path_to_qfs_bin_tools + "cptoqfs -s "
171+
+ payload.qfs_meta_server_name + " -p "
172+
+ boost::lexical_cast<string>(payload.qfs_meta_server_port)
173+
+ " -d - -k " + payload.qfs_output_file + "_" + boost::lexical_cast<string>(i);
174+
175+
r = system(mapper_command.c_str());
176+
177+
//output results if any
178+
cout << r;
179+
}
180+
181+
//remove python script
182+
unlink(python_reducer_file);
183+
184+
*ret_ptr = GEARMAN_SUCCESS;
185+
186+
return NULL;
187+
}

‎src/shared_mapper_data.cc

+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* shared_mapper_data.cc
3+
*
4+
* Created on: Nov 22, 2012
5+
* Author: alex
6+
*/
7+
8+
#ifndef SHARED_MAPPER_DATA_H_
9+
#define SHARED_MAPPER_DATA_H_
10+
11+
#include <map>
12+
#include <list>
13+
#include <boost/thread.hpp>
14+
15+
#include "shared_queue.hpp"
16+
#include "qfs_atomic_writer.hpp"
17+
#include "headers/mapper_data_struct.h"
18+
#include "headers/kv_struct.h"
19+
20+
#define QFS_WRITER_THREADS 1
21+
22+
using namespace std;
23+
24+
class shared_mapper_data {
25+
26+
private:
27+
boost::thread_group m_writers;
28+
map<int, list<kv> > m_mapper_data;
29+
map<int, long> m_partition_sizes;
30+
int m_partition_count;
31+
long m_spill_threshold_kb;
32+
shared_queue<mapper_data> m_write_queue;
33+
boost::mutex m_mutex;
34+
35+
public:
36+
shared_mapper_data(string qfs_meta_server_host, in_port_t qfs_meta_server_port, string qfs_map_folder, string job_id, int partition_count,
37+
long spill_threshold_kb) :
38+
m_partition_count(partition_count), m_spill_threshold_kb(
39+
spill_threshold_kb) {
40+
41+
// Create qfs writers
42+
for (int i = 0; i < QFS_WRITER_THREADS; i++) {
43+
qfs_atomic_writer w(&m_write_queue, qfs_meta_server_host, qfs_meta_server_port, qfs_map_folder, job_id);
44+
m_writers.create_thread(w);
45+
}
46+
47+
}
48+
49+
~shared_mapper_data() {
50+
m_writers.interrupt_all();
51+
m_writers.join_all();
52+
}
53+
54+
//scan mapper partition data for spill threshold
55+
void scan() {
56+
57+
//get lock
58+
boost::unique_lock<boost::mutex> lock(m_mutex);
59+
60+
//scan partition size
61+
map<int, long>::iterator it;
62+
for (it = m_partition_sizes.begin(); it != m_partition_sizes.end();
63+
it++) {
64+
int partition = (*it).first;
65+
long size_bytes = (*it).second;
66+
67+
//spill to qfs if threshold is met
68+
if (size_bytes >= m_spill_threshold_kb){
69+
write_partition_to_qfs(partition);
70+
m_partition_sizes[partition] = 0;
71+
}
72+
}
73+
74+
//release
75+
//lock.release();
76+
}
77+
78+
//spill to qfs
79+
void write_partition_to_qfs(int partition) {
80+
81+
//get data from map
82+
list<kv> my_list = m_mapper_data[partition];
83+
string data;
84+
list<kv>::iterator it;
85+
for (it = my_list.begin(); it != my_list.end(); it++) {
86+
string key = (*it).key;
87+
string value = (*it).value;
88+
data.append(key + "\t" + value + "\n");
89+
}
90+
91+
//empty list
92+
m_mapper_data[partition].clear();
93+
94+
mapper_data my_data;
95+
my_data.data = data;
96+
my_data.partition = partition;
97+
98+
//add to write queue
99+
m_write_queue.Enqueue(my_data);
100+
101+
}
102+
103+
//add new data
104+
void add(int partition, kv my_kv) {
105+
106+
//get lock
107+
boost::unique_lock<boost::mutex> lock(m_mutex);
108+
109+
//add kv to partition
110+
m_mapper_data[partition].push_back(my_kv);
111+
112+
//increment partition size
113+
if (m_partition_sizes.find(partition) != m_partition_sizes.end()) {
114+
m_partition_sizes[partition] += my_kv.key.size()
115+
+ my_kv.value.size();
116+
} else {
117+
//init
118+
m_partition_sizes[partition] = my_kv.key.size()
119+
+ my_kv.value.size();
120+
}
121+
}
122+
123+
//flush all data to qfs
124+
void flush_all() {
125+
126+
//get lock
127+
boost::unique_lock<boost::mutex> lock(m_mutex);
128+
129+
//loop partitions
130+
map<int, list<kv> >::iterator it;
131+
for (it = m_mapper_data.begin(); it != m_mapper_data.end(); it++) {
132+
int partition = (*it).first;
133+
list<kv> map_list = (*it).second;
134+
135+
//atomic append list to partition file
136+
write_partition_to_qfs(partition);
137+
}
138+
}
139+
};
140+
141+
#endif /*SHARED_MAPPER_DATA_H_*/

‎src/shared_queue.hpp

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* shared_queue.hpp
3+
*
4+
* Created on: Nov 23, 2012
5+
* Author: https://www.quantnet.com/threads/c-multithreading-in-boost.10028/
6+
*/
7+
8+
#ifndef SHARED_QUEUE_H_
9+
#define SHARED_QUEUE_H_
10+
11+
#include <queue>
12+
#include <boost/thread.hpp>
13+
14+
// Queue class that has thread synchronisation
15+
template<typename T>
16+
class shared_queue {
17+
private:
18+
std::queue<T> m_queue; // Use STL queue to store data
19+
boost::mutex m_mutex; // The mutex to synchronise on
20+
boost::condition_variable m_cond; // The condition to wait for
21+
22+
public:
23+
24+
// Add data to the queue and notify others
25+
void Enqueue(const T& data) {
26+
// Acquire lock on the queue
27+
boost::unique_lock<boost::mutex> lock(m_mutex);
28+
29+
// Add the data to the queue
30+
m_queue.push(data);
31+
32+
// Notify others that data is ready
33+
m_cond.notify_one();
34+
35+
} // Lock is automatically released here
36+
37+
// Get data from the queue. Wait for data if not available
38+
T Dequeue() {
39+
40+
// Acquire lock on the queue
41+
boost::unique_lock<boost::mutex> lock(m_mutex);
42+
43+
// When there is no data, wait till someone fills it.
44+
// Lock is automatically released in the wait and obtained
45+
// again after the wait
46+
while (m_queue.size() == 0)
47+
m_cond.wait(lock);
48+
49+
// Retrieve the data from the queue
50+
T result = m_queue.front();
51+
m_queue.pop();
52+
return result;
53+
54+
} // Lock is automatically released here
55+
};
56+
57+
#endif /*SHARED_QUEUE_H_*/

‎src/sorter_worker_main.cc

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* sorter_worker_main.cc
3+
*
4+
* Created on: Nov 21, 2012
5+
* Author: alex
6+
*/
7+
8+
#include <cerrno>
9+
#include <cstdio>
10+
#include <cstdlib>
11+
#include <cstring>
12+
#include <iostream>
13+
14+
#include <libgearman/gearman.h>
15+
#include <boost/program_options.hpp>
16+
#include <boost/lexical_cast.hpp>
17+
18+
#include "json_spirit_headers/json_spirit_reader_template.h"
19+
#include "headers/workloads.h"
20+
21+
#ifndef __INTEL_COMPILER
22+
#pragma GCC diagnostic ignored "-Wold-style-cast"
23+
#endif
24+
25+
using namespace std;
26+
using namespace json_spirit;
27+
28+
static void *sorter(gearman_job_st *job, void *context, size_t *result_size,
29+
gearman_return_t *ret_ptr);
30+
31+
string path_to_qfs_bin_tools, path_to_qfs_mapred_bin;
32+
int main(int args, char *argv[]) {
33+
uint32_t count;
34+
int timeout;
35+
in_port_t port;
36+
string host;
37+
38+
boost::program_options::options_description desc("Options");
39+
desc.add_options()("help", "Options related to the program.")("host,h",
40+
boost::program_options::value<string>(&host)->default_value(
41+
"localhost"), "Connect to the gearmand host")("port,p",
42+
boost::program_options::value<in_port_t>(&port)->default_value(
43+
GEARMAN_DEFAULT_TCP_PORT),
44+
"Port number use for gearmand connection")("count,c",
45+
boost::program_options::value<uint32_t>(&count)->default_value(0),
46+
"Number of jobs to run before exiting")("timeout,u",
47+
boost::program_options::value<int>(&timeout)->default_value(-1),
48+
"Timeout in milliseconds")("path_to_qfs_bin_tools,p",
49+
boost::program_options::value<string>(&path_to_qfs_bin_tools)->default_value(
50+
"/home/alex/qfs/build/release/bin/tools/"),
51+
"Path to qfs tools folder")("path_to_qfs_mapred_bin",
52+
boost::program_options::value<string>(&path_to_qfs_mapred_bin)->default_value(
53+
"/home/alex/workspace/qfs_mapred/src/"),
54+
"Path to qfs_mapred bin folder");
55+
56+
boost::program_options::variables_map vm;
57+
try {
58+
boost::program_options::store(
59+
boost::program_options::parse_command_line(args, argv, desc),
60+
vm);
61+
boost::program_options::notify(vm);
62+
} catch (exception &e) {
63+
cout << e.what() << endl;
64+
return EXIT_FAILURE;
65+
}
66+
67+
if (vm.count("help")) {
68+
cout << desc << endl;
69+
return EXIT_SUCCESS;
70+
}
71+
72+
gearman_worker_st worker;
73+
if (gearman_worker_create(&worker) == NULL) {
74+
cerr << "Memory allocation failure on worker creation." << endl;
75+
return EXIT_FAILURE;
76+
}
77+
78+
if (timeout >= 0)
79+
gearman_worker_set_timeout(&worker, timeout);
80+
81+
gearman_return_t ret;
82+
ret = gearman_worker_add_server(&worker, host.c_str(), port);
83+
if (ret != GEARMAN_SUCCESS) {
84+
cerr << gearman_worker_error(&worker) << endl;
85+
return EXIT_FAILURE;
86+
}
87+
88+
ret = gearman_worker_add_function(&worker, "sorter", 0, sorter, NULL);
89+
if (ret != GEARMAN_SUCCESS) {
90+
cerr << gearman_worker_error(&worker) << endl;
91+
return EXIT_FAILURE;
92+
}
93+
94+
while (1) {
95+
ret = gearman_worker_work(&worker);
96+
if (ret != GEARMAN_SUCCESS) {
97+
cerr << gearman_worker_error(&worker) << endl;
98+
break;
99+
}
100+
101+
if (count > 0) {
102+
count--;
103+
if (count == 0)
104+
break;
105+
}
106+
}
107+
108+
gearman_worker_free(&worker);
109+
110+
return EXIT_SUCCESS;
111+
}
112+
113+
static void *sorter(gearman_job_st *job, void *context, size_t *result_size,
114+
gearman_return_t *ret_ptr) {
115+
116+
const char *workload;
117+
workload = (const char *) gearman_job_workload(job);
118+
119+
//build payload
120+
string json_workload = (string) workload;
121+
Value value;
122+
read_string(json_workload, value);
123+
Object obj = value.get_obj();
124+
Sorter_Workload payload;
125+
for (Object::size_type i = 0; i != obj.size(); ++i) {
126+
const Pair& pair = obj[i];
127+
128+
const string& name = pair.name_;
129+
const Value& value = pair.value_;
130+
131+
if (name == "qfs_meta_server_name") {
132+
payload.qfs_meta_server_name = value.get_str();
133+
} else if (name == "qfs_meta_server_port") {
134+
payload.qfs_meta_server_port = value.get_int();
135+
} else if (name == "qfs_partition_file_input") {
136+
payload.qfs_partition_file_input = value.get_str();
137+
} else if (name == "qfs_partition_file_output") {
138+
payload.qfs_partition_file_output = value.get_str();
139+
} else {
140+
assert( false);
141+
}
142+
}
143+
144+
printf("Checking if processor is available...");
145+
if (!system(NULL)) {
146+
// *ret_ptr = GEARMAN_ERROR;
147+
// return;
148+
}
149+
150+
int r;
151+
string sort_command = path_to_qfs_bin_tools + "cpfromqfs -s "
152+
+ payload.qfs_meta_server_name + " -p "
153+
+ boost::lexical_cast<string>(payload.qfs_meta_server_port) + " -k "
154+
+ payload.qfs_partition_file_input
155+
+ " -d - | " + path_to_qfs_mapred_bin + "kvsorter | " + path_to_qfs_bin_tools + "cptoqfs -s " + payload.qfs_meta_server_name
156+
+ " -p " + boost::lexical_cast<string>(payload.qfs_meta_server_port)
157+
+ " -d - -k " + payload.qfs_partition_file_output + " -S";
158+
159+
r = system(sort_command.c_str());
160+
161+
//output results if any
162+
cout << r;
163+
164+
*ret_ptr = GEARMAN_SUCCESS;
165+
166+
return NULL;
167+
168+
}
169+

0 commit comments

Comments
 (0)
Please sign in to comment.