From 7e07a5baf90cefb5508fe813b5ecc60074098913 Mon Sep 17 00:00:00 2001 From: Uri Rosenberg Date: Sun, 5 Jul 2015 10:47:27 +0300 Subject: [PATCH 1/2] Added shutdown method for exit --- deepdist/deepdist.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/deepdist/deepdist.py b/deepdist/deepdist.py index 95f17a9..7a373f8 100644 --- a/deepdist/deepdist.py +++ b/deepdist/deepdist.py @@ -22,7 +22,7 @@ def __init__(self, model, master='127.0.0.1:5000', min_updates=0, max_updates=40 self.state = 'serving' self.served = 0 self.received = 0 - #self.server = None + self.server = '0.0.0.0' self.pmodel = None self.min_updates = min_updates self.max_updates = max_updates @@ -34,8 +34,9 @@ def __enter__(self): return self def __exit__(self, type, value, traceback): - # self.server.terminate() - pass # need to shut down server here + url = "http://%s:5000/shutdown"%self.server + response = urllib2.urlopen(url, '{}').read() + print"exit performed" def start(self): from flask import Flask, request @@ -91,6 +92,14 @@ def update_flask(): self.lock.release() return 'OK' + @app.route('/shutdown', methods=['POST']) + def shutdown(): + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + return 'Server shutting down...' + print 'Listening to 0.0.0.0:5000...' app.run(host='0.0.0.0', debug=True, threaded=True, use_reloader=False) From 86aaf68e64ca3d2661184f140594db5f47d02cb9 Mon Sep 17 00:00:00 2001 From: Uri Rosenberg Date: Sun, 5 Jul 2015 11:52:46 +0300 Subject: [PATCH 2/2] Update deepdist.py In order to support Neural networks implemented with anonymous functions added dumping using cloudpickle. cloudpickle makes it possible to serialize Python constructs not supported by the default pickle module from the Python standard library. --- deepdist/deepdist.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/deepdist/deepdist.py b/deepdist/deepdist.py index 7a373f8..1398ce3 100644 --- a/deepdist/deepdist.py +++ b/deepdist/deepdist.py @@ -1,5 +1,6 @@ import copy import cPickle as pickle +import cloudpickle as pickleDumper from multiprocessing import Process from rwlock import RWLock import socket @@ -61,7 +62,7 @@ def model_flask(): self.lock.release() self.lock.acquire_write() if not self.pmodel: - self.pmodel = pickle.dumps(self.model, -1) + self.pmodel = pickleDumper.dumps(self.model, -1) self.served += 1 pmodel = self.pmodel self.lock.release() @@ -131,6 +132,6 @@ def fetch_model(master='localhost:5000'): def send_gradient(gradient, master='localhost:5000'): if not gradient: return 'EMPTY' - request = urllib2.Request('http://%s/update' % master, pickle.dumps(gradient, -1), + request = urllib2.Request('http://%s/update' % master, pickleDumper.dumps(gradient, -1), headers={'Content-Type': 'application/deepdist'}) return urllib2.urlopen(request).read()