-
Notifications
You must be signed in to change notification settings - Fork 8
/
FC_kron.py
104 lines (80 loc) · 2.66 KB
/
FC_kron.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Copyright (c) 2014, Austin R. Benson, David F. Gleich,
Purdue University, and Stanford University.
All rights reserved.
This file is part of MRNMF and is under the BSD 2-Clause License,
which can be found in the LICENSE file in the root directory, or at
http://opensource.org/licenses/BSD-2-Clause
"""
import dumbo
import sys
import os
import random
import numpy as np
import struct
import util
import mrnmf
"""
Form the Kronecker product of the Flow Cytometry data matrix.
First, put the data file in HDFS:
hadoop fs -put data/FC_40k.txt FC_40k.txt
Now, run the script:
dumbo start FC_kron.py -mat FC_40k.txt -hadoop $HADOOP_INSTALL
"""
# Module-level options object shared by this script: starter() stores the
# dumbo program on it (gopts.prog) and then calls gopts.save_params().
gopts = util.GlobalOptions()
class Map:
    """Mapper that emits Kronecker products of input rows with a local matrix.

    On construction the matrix is read from a text file in which every
    line is ``<key> <v1> <v2> ...``; the leading key column is discarded.
    Each call then takes one input record of the same format and yields
    the Kronecker product of that record's values with successive row
    chunks of the loaded matrix.
    """

    # Number of matrix rows combined into one emitted record.  The final
    # chunk also absorbs any remainder rows, so it may be larger.
    split_size = 1000

    # Candidate locations of the matrix file, tried in order.
    mat_paths = ('FC_40k.txt', 'data/FC_40k.txt')

    def __init__(self):
        """Load the matrix into ``self.mat`` and log its shape to stderr.

        Raises:
            IOError: if none of the paths in ``mat_paths`` can be opened
                (the error from the last candidate is propagated).
        """
        self.mat = np.array(self._load_rows())
        sys.stderr.write('%s\n' % (self.mat.shape,))

    def _load_rows(self):
        """Return the parsed rows of the first matrix file that opens."""
        candidates = list(self.mat_paths)
        for path in candidates[:-1]:
            try:
                handle = open(path, 'r')
            except IOError:
                # Only a failed open falls through to the next candidate;
                # any other error is a real problem and must surface.
                continue
            break
        else:
            # Last candidate: let its IOError propagate to the caller.
            handle = open(candidates[-1], 'r')

        # 'with' guarantees the file is closed even if a value fails to parse.
        with handle:
            return [
                [float(v) for v in line.split()[1:]]
                for line in handle
                if line.strip()  # skip blank lines; they would yield ragged rows
            ]

    def __call__(self, key, value):
        """Yield ``((row_id, row_numbers), block)`` records for one input line.

        Args:
            key: the incoming record key; it is not used.
            value: a text line ``<row_id> <v1> <v2> ...``.

        Yields:
            A pair whose key is ``(row_id, row_numbers)``, where
            ``row_numbers`` are the 1-based indices of the matrix rows in
            the chunk, and whose value is ``np.kron(values, chunk)``
            converted to nested lists.
        """
        fields = value.split()
        row_id = int(fields[0])
        # Build the row vector once instead of once per chunk.
        row = np.array([float(v) for v in fields[1:]])

        n_rows = self.mat.shape[0]
        if n_rows == 0:
            return

        step = self.split_size
        # Integer division, but never fewer than one chunk: a matrix with
        # fewer than `step` rows used to produce zero chunks and the input
        # record was silently dropped.
        n_chunks = max(1, n_rows // step)
        for j in range(n_chunks):
            start = step * j
            # The last chunk runs to the end so remainder rows are kept.
            stop = n_rows if j == n_chunks - 1 else start + step
            B = np.kron(row, self.mat[start:stop])
            sys.stderr.write('%s\n' % (B.shape,))
            row_numbers = [start + i + 1 for i in range(B.shape[0])]
            yield (row_id, row_numbers), B.tolist()
def runner(job):
    """Add the single iteration of this job to the dumbo job object.

    The iteration uses a fresh Map instance as the mapper and
    mrnmf.ID_REDUCER as the reducer, and is configured with
    'numreducetasks' set to '0' and 'nummaptasks' set to '40'.
    """
    job.additer(
        mapper=Map(),
        reducer=mrnmf.ID_REDUCER,
        opts=[('numreducetasks', '0'), ('nummaptasks', '40')]
    )
def starter(prog):
    """Set up the dumbo program options before the job starts.

    Removes the required 'mat' option from ``prog`` and re-adds its value
    as the job 'input', registers util.py, mrnmf.py and data/FC_40k.txt
    (resolved relative to this script's directory) as 'file' options, and
    fills in defaults for 'memlimit', 'output', 'overwrite' and 'jobconf'.
    The program is also stored on the module-level ``gopts`` object, whose
    parameters are saved at the end.

    Args:
        prog: the dumbo program object; must provide ``addopt``,
            ``getopt`` and ``delopt``.

    Returns:
        An error-message string if 'mat' was not supplied, otherwise None.
    """
    # Single-argument print(...) emits the same text under Python 2's
    # print statement and Python 3's print function.
    print("running starter!")
    mypath = os.path.dirname(__file__)
    print("my path: " + mypath)

    # set the global opts
    gopts.prog = prog

    mat = prog.delopt('mat')
    if not mat:
        return "'mat' not specified"

    prog.addopt('memlimit', '8g')

    # Files registered with the job, all located next to this script.
    for name in ('util.py', 'mrnmf.py', 'data/FC_40k.txt'):
        prog.addopt('file', os.path.join(mypath, name))

    prog.addopt('input', mat)

    # Only supply a default output name when the caller gave none.
    if not prog.getopt('output'):
        prog.addopt('output', 'FC_kron.bseq')

    prog.addopt('overwrite', 'yes')
    prog.addopt('jobconf', 'mapred.output.compress=true')

    gopts.save_params()
# Script entry point: when executed directly, pass the job definition
# (runner) and the option setup (starter) to dumbo.main.
if __name__ == '__main__':
    dumbo.main(runner, starter)