DeepDist
dirkneumann committed Aug 24, 2014
1 parent e69006f commit 63ade13
Showing 5 changed files with 19,690 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -3,7 +3,7 @@ Training deep belief networks requires extensive data and computation. [DeepDist
Quick start:
----

- Training of [word2vec](https://code.google.com/p/word2vec/) model on [wikipedia](http://dumps.wikimedia.org/enwiki/) in 15 lines of code:
+ Training of a [word2vec](https://code.google.com/p/word2vec/) model on [wikipedia](http://dumps.wikimedia.org/enwiki/) in 15 lines of code:

```python
from deepdist import DeepDist
...
```
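The diff view truncates the README snippet at this point. For orientation, here is a sketch of what the full quick-start looks like, reconstructed from the `DeepDist.train(rdd, gradient, descent)` API in this commit and the `adagrad_word2vec.py` example added below; it is not the verbatim README text, and the corpus path is illustrative:

```python
from deepdist import DeepDist
from gensim.models.word2vec import Word2Vec
from pyspark import SparkContext

sc = SparkContext()
corpus = sc.textFile('enwiki').map(lambda s: s.split())

def gradient(model, sentences):  # executes on the Spark workers
    syn0, syn1 = model.syn0.copy(), model.syn1.copy()
    model.train(sentences)
    return {'syn0': model.syn0 - syn0, 'syn1': model.syn1 - syn1}

def descent(model, update):      # executes on the master
    model.syn0 += update['syn0']
    model.syn1 += update['syn1']

with DeepDist(Word2Vec(corpus.collect())) as dd:
    dd.train(corpus, gradient, descent)
    print(dd.model.most_similar(positive=['woman', 'king'], negative=['man']))
```

Workers fetch the current model from the master's Flask server (`/model`), compute weight deltas on their partition, and post them back (`/update`), where `descent` folds them into the served model.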
22 changes: 12 additions & 10 deletions deepdist/deepdist.py
@@ -20,7 +20,6 @@ def __init__(self, model, host='127.0.0.1:5000'):
        self.state = 'serving'
        self.served = 0
        self.received = 0
-       self.master = socket.gethostbyname(socket.gethostname())

    def __enter__(self):
        Thread(target=self.start).start()
@@ -33,11 +32,11 @@ def start(self):
        app = Flask(__name__)

        @app.route('/')
-       def main_url():
+       def index():
            return 'DeepDist'

        @app.route('/model', methods=['GET', 'POST', 'PUT'])
-       def model_url():
+       def model_flask():
            i = 0
            while (self.state != 'serving') and (i < 20):
                time.sleep(1)
@@ -47,18 +46,19 @@ def model_url():
            self.served += 1
            model = copy.deepcopy(self.model)
            self.lock.release()

            return pickle.dumps(model, -1)


        @app.route('/update', methods=['GET', 'POST', 'PUT'])
-       def update_url():
+       def update_flask():
            gradient = pickle.loads(request.data)

            self.lock.acquire_write()
            state = 'receiving'
            self.received += 1

-           updated_model = self.descent(self.model, gradient)
+           self.descent(self.model, gradient)

            if self.received >= self.served:
                self.received = 0
@@ -75,19 +75,21 @@ def train(self, rdd, gradient, descent):

        self.descent = descent

+       host = self.host # will be pickled by rdd.mapPartitions

        def mapPartitions(data):
-           return (send_gradient(gradient(fetch_model(), data)))
+           return (send_gradient(gradient(fetch_model(host=host), data), host=host))

        return rdd.mapPartitions(mapPartitions).collect()

-def fetch_model():
-    request = urllib2.Request('http://%s/model' % self.host,
+def fetch_model(host='localhost:5000'):
+    request = urllib2.Request('http://%s/model' % host,
        headers={'Content-Type': 'application/deepdist'})
    return pickle.loads(urllib2.urlopen(request).read())

-def send_gradient(gradient):
+def send_gradient(gradient, host='localhost:5000'):
    if not gradient:
        return 'EMPTY'
-    request = urllib2.Request('http://%s/update' % self.host, pickle.dumps(gradient, -1),
+    request = urllib2.Request('http://%s/update' % host, pickle.dumps(gradient, -1),
        headers={'Content-Type': 'application/deepdist'})
    return urllib2.urlopen(request).read()
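The substantive change in `deepdist.py`: `fetch_model` and `send_gradient` now take the server address as a parameter, and `train` copies `self.host` into a local variable before defining the closure. Spark pickles the function passed to `rdd.mapPartitions`; a closure that read `self.host` would drag the whole `DeepDist` object (Flask app, lock, background thread) into the pickle. A minimal, hypothetical illustration of the pattern (`Trainer` is not part of DeepDist):

```python
class Trainer(object):
    def __init__(self, host):
        self.host = host

    def train_bad(self, rdd):
        # The lambda references self, so Spark must pickle the entire
        # Trainer instance along with the closure, which fails on any
        # unpicklable attribute.
        return rdd.mapPartitions(lambda part: [self.host]).collect()

    def train_good(self, rdd):
        host = self.host  # capture only the plain string
        return rdd.mapPartitions(lambda part: [host]).collect()
```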
99 changes: 99 additions & 0 deletions examples/adagrad_word2vec.py
@@ -0,0 +1,99 @@
from deepdist import DeepDist
from gensim.models.word2vec import Vocab, Word2Vec
import numpy as np
import os
from pyspark import SparkContext

sc = SparkContext()

# Read the Wikipedia dump as an RDD of token lists, dropping empty lines.
corpus = sc.textFile('enwiki').map(lambda s: s.split()).filter(lambda s: len(s) > 0)

print 'Building vocabulary...'
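# Count every word in the corpus, keep words occurring at least 5 times,
# and sort by descending frequency.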
s = corpus \
    .flatMap(lambda s: [(w, 1) for w in s]) \
    .reduceByKey(lambda a, b: a+b) \
    .filter(lambda x: x[1] >= 5) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(False) \
    .collect()

vocab = {}
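# Cap the vocabulary at the 1,000,000 most frequent words.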
for i, (c, w) in enumerate(s):
    if i >= 1000000:
        break
    if (i + 1) % 100000 == 0:
        print i+1
    vocab[w] = Vocab(count=c)

def build_vocab(model, vocab):
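    # Mirrors gensim's internal vocabulary setup: assign word indices, then
    # build the Huffman tree (hierarchical softmax) and/or the negative-
    # sampling table, and initialize the network weights.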
    model.word_count = long(0)
    model.total_words = long(0)
    model.vocab, model.index2word = {}, []
    for word, v in vocab.iteritems():
        if v.count >= model.min_count:
            v.index = len(model.vocab)
            model.index2word.append(word)
            model.vocab[word] = v
            model.total_words += v.count
    print "total %i word types after removing those with count<%s" % (len(model.vocab), model.min_count)

    if model.hs:
        model.create_binary_tree()
    if model.negative:
        model.make_table()

    model.precalc_sampling()
    model.reset_weights()

model = Word2Vec()
build_vocab(model, vocab)

print 'Pretraining...'
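# Warm-start the model locally on the first ten corpus files before
# distributed training.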
for filename in os.listdir('enwiki')[:10]:
    model.train([s.split() for s in open('enwiki/%s' % filename)])
print '...pretrained.'

# AdaGrad accumulators: running sums of squared gradients for both layers.
model.ssyn0 = 0
model.ssyn1 = 0

def gradient(model, data):
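    # Runs on the Spark workers: snapshot the weights, train on this
    # partition, and ship back the weight deltas plus the word count.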
    syn0, syn1 = model.syn0.copy(), model.syn1.copy()
    words = model.train(data, word_count=model.word_count, total_words=model.total_words)
    update = {
        'syn0': model.syn0 - syn0,
        'syn1': model.syn1 - syn1,
        'words': words - model.word_count
    }
    return update

def descent(model, update):
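    # Runs on the master (AdaGrad): divide the delta by the current learning
    # rate to approximate the raw gradient, accumulate its square, and apply
    # a per-parameter step of alpha / sqrt(accumulated squared gradient).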
    alpha = max(model.min_alpha, model.alpha * (1.0 - 1.0 * model.word_count / model.total_words))

    syn0 = update['syn0'] / alpha
    syn1 = update['syn1'] / alpha

    model.ssyn0 += syn0 * syn0
    model.ssyn1 += syn1 * syn1

    alpha0 = alpha / (1e-6 + np.sqrt(model.ssyn0))
    alpha1 = alpha / (1e-6 + np.sqrt(model.ssyn1))

    model.syn0 += syn0 * alpha0
    model.syn1 += syn1 * alpha1

    model.word_count = long(model.word_count) + long(update['words'])

with DeepDist(model) as dd:
    dd.train(corpus, gradient, descent)


# Drop the cached normalized vectors so accuracy() recomputes them from the trained weights.
del model.syn0norm
for row in model.accuracy('questions-words.txt'):
    if row['section'] != 'total':
        continue
    print(' %i %.1f%% v%i %.1f' %
          (row['correct'], 100.0 * row['correct'] / (row['incorrect'] + row['correct']),
           model.version, 1.0 * row['correct'] / model.version))

print model.most_similar(positive=['woman', 'king'], negative=['man'])
(2 more changed files not shown.)
