Skip to content

Commit

Permalink
Webclient changes peer list in real-time.
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasnick committed May 8, 2014
1 parent 26646ed commit 7f55157
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 23 deletions.
38 changes: 30 additions & 8 deletions html/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ angular.module('app').controller('Market', ['$scope', function($scope) {
}

$scope.peers = [];
$scope.peerIds = [];
$scope.reviews = {};

$scope.toggleSidebar = function() {
Expand Down Expand Up @@ -80,7 +79,10 @@ angular.module('app').controller('Market', ['$scope', function($scope) {
var socket = new Connection(function(msg) {
switch(msg.type) {
case 'peer':
$scope.parse_peer(msg)
$scope.add_peer(msg)
break;
case 'peer_remove':
$scope.remove_peer(msg)
break;
case 'page':
$scope.parse_page(msg)
Expand Down Expand Up @@ -220,14 +222,34 @@ angular.module('app').controller('Market', ['$scope', function($scope) {
$scope.$apply();
}
}
$scope.parse_peer = function(msg) {

console.log('PARSE PEER: ',msg);
$scope.add_peer = function(msg) {

console.log('Add peer: ',msg);

if ($scope.peerIds.indexOf(msg.uri) == -1) {
$scope.peers.push(msg)
$scope.peerIds.push(msg.uri)
var index = $scope.peers.findIndex(function(element) {
return element.uri == msg.uri;
});

if(index == -1) {
/* it is a new peer */
$scope.peers.push(msg);
} else {
$scope.peers[index] = msg;
}
if (!$scope.$$phase) {
$scope.$apply();
}
}

$scope.remove_peer = function(msg) {

console.log('Remove peer: ',msg);

$scope.peers = $scope.peers.filter(function(element) {
return element.uri != msg.uri;
});

if (!$scope.$$phase) {
$scope.$apply();
}
Expand Down Expand Up @@ -265,7 +287,7 @@ angular.module('app').controller('Market', ['$scope', function($scope) {
});

msg.peers.forEach(function(peer) {
$scope.parse_peer(peer)
$scope.add_peer(peer)
});


Expand Down
8 changes: 6 additions & 2 deletions node/crypto2crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ def create_peer(self, uri, pub):
#if not self.pubkey_exists(pub):
self._peers[uri] = CryptoPeerConnection(self, uri, pub)

# Call 'peer' callbacks on listeners
#self.trigger_callbacks('peer', self._peers[uri])
# Call 'peer' callbacks on listeners
self.trigger_callbacks('peer', self._peers[uri])

#else:
# print 'Pub Key is already in peer list'

Expand Down Expand Up @@ -138,9 +139,12 @@ def init_peer(self, msg):
if not self._peers[uri]._pub:
self._log.info("Setting public key for seed node")
self._peers[uri]._pub = pub.decode('hex')
self.trigger_callbacks('peer', self._peers[uri])

if (self._peers[uri]._pub != pub.decode('hex')):
self._log.info("Updating public key for node")
self._peers[uri]._pub = pub.decode('hex')
self.trigger_callbacks('peer', self._peers[uri])

if msg_type == 'hello_request':
# reply only if necessary
Expand Down
10 changes: 9 additions & 1 deletion node/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ def send_raw(self, serialized):
Thread(target=self._send_raw, args=(serialized,)).start()

def _send_raw(self, serialized):
# zmq sockets are not threadsafe, they have to run in a separate process
# pyzmq sockets are not threadsafe, they have to run in a separate process
queue = Queue()
# queue element is false if something went wrong and the peer
# has to be removed
Process(target=self._send_raw_process, args=(serialized,queue)).start()
if not queue.get():
self._log.info("Peer %s timed out." % self._address)
Expand Down Expand Up @@ -128,6 +130,12 @@ def remove_peer(self, uri):
self._log.info("Removing peer %s", uri )
try:
del self._peers[uri]
msg = {
'type': 'peer_remove',
'uri': uri
}
self.trigger_callbacks(msg['type'], msg)

except KeyError:
self._log.info("Peer %s was already removed", uri)

Expand Down
17 changes: 5 additions & 12 deletions node/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def __init__(self, transport, node, handler):

# register on transport events to forward..
transport.add_callback('peer', self.on_node_peer)
transport.add_callback('peer_remove', self.on_node_remove_peer)
transport.add_callback('page', self.on_node_page)
transport.add_callback('all', self.on_node_message)

Expand Down Expand Up @@ -54,16 +55,6 @@ def send_opening(self):

self.send_to_client(None, message)

def check_peers(self):
for uri, peer in self._transport._peers.items():
peer_item = {'uri': uri}
if peer._pub:
peer_item['pubkey'] = peer._pub.encode('hex')
else:
peer_item['pubkey'] = 'unknown'
peers.append(peer_item)


# Requests coming from the client
def client_query_page(self, socket_handler, msg):
self._log.info("Message: ", msg)
Expand Down Expand Up @@ -110,10 +101,13 @@ def client_shout(self, socket_handler, msg):

# messages coming from "the market"
def on_node_peer(self, peer):
self.check_peers()
self._log.info("Add peer")
response = {'type': 'peer', 'pubkey': peer._pub.encode('hex'), 'uri': peer._address}
self.send_to_client(None, response)

def on_node_remove_peer(self, msg):
self.send_to_client(None, msg)

def on_node_page(self, page):
self.send_to_client(None, page)

Expand Down Expand Up @@ -161,7 +155,6 @@ def initialize(self, transport, node):
self._transport = transport

def open(self):
self._log.info("Websocket open")
self._log.info('Websocket open')
self._app_handler.send_opening()
with WebSocketHandler.listen_lock:
Expand Down

0 comments on commit 7f55157

Please sign in to comment.