Skip to content

Commit

Permalink
[KVStore] Make kvstore support multiple NICs (dmlc#1150)
Browse files Browse the repository at this point in the history
* API change of kvstore

* add demo for kvstore

* update

* remove duplicated log

* change queue size

* update

* update

* update

* update

* update

* update

* update

* update

* update

* fix lint

* change name

* update

* fix lint

* update

* update

* update

* update

* change message queue size to a python argument

* change default queue size to 2GB

* OMP_NUM_THREADS=1

* add multiple NICs support for kvstore

* test

* fix lint

* update

* update

* update

* update

* update

* update

* update

* fix lint

* fix lint

* update

* update

* update

* update
  • Loading branch information
aksnzhy authored Dec 30, 2019
1 parent f818415 commit 4f02bb7
Show file tree
Hide file tree
Showing 10 changed files with 320 additions and 111 deletions.
26 changes: 21 additions & 5 deletions examples/mxnet/dis_kvstore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,52 @@
import dgl
import argparse
import mxnet as mx
import time

ID = []
ID.append(mx.nd.array([0,1], dtype='int64'))
ID.append(mx.nd.array([2,3], dtype='int64'))
ID.append(mx.nd.array([4,5], dtype='int64'))
ID.append(mx.nd.array([6,7], dtype='int64'))

DATA = []
DATA.append(mx.nd.array([[1.,1.,1.,],[1.,1.,1.,]]))
DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]]))

edata_partition_book = {'edata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}
ndata_partition_book = {'ndata':mx.nd.array([0,0,1,1,2,2,3,3], dtype='int64')}

def start_client():

time.sleep(3)

client = dgl.contrib.start_client(ip_config='ip_config.txt',
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book)

client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=mx.nd.array([[1.,1.,1.],[1.,1.,1.]]))
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=mx.nd.array([[2.,2.,2.],[2.,2.,2.]]))

tensor_edata = client.pull(name='edata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
tensor_ndata = client.pull(name='ndata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))

print(tensor_edata)
client.barrier()

print(tensor_ndata)
client.barrier()

client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])

client.barrier()

tensor_edata = client.pull(name='edata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))
tensor_ndata = client.pull(name='ndata', id_tensor=mx.nd.array([0,1,2,3,4,5,6,7], dtype='int64'))

print(tensor_edata)

client.barrier()

print(tensor_ndata)

client.barrier()

if client.get_id() == 0:
Expand Down
10 changes: 8 additions & 2 deletions examples/mxnet/dis_kvstore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,1,0,0], dtype='int64')})
edata_g2l.append({'edata':mx.nd.array([0,0,0,0,0,0,0,1], dtype='int64')})

DATA = []
DATA.append(mx.nd.array([[4.,4.,4.,],[4.,4.,4.,]]))
DATA.append(mx.nd.array([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(mx.nd.array([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(mx.nd.array([[1.,1.,1.,],[1.,1.,1.,]]))

def start_server(args):

dgl.contrib.start_server(
server_id=args.id,
ip_config='ip_config.txt',
num_client=4,
ndata={'ndata':mx.nd.array([[0.,0.,0.],[0.,0.,0.]])},
edata={'edata':mx.nd.array([[0.,0.,0.],[0.,0.,0.]])},
ndata={'ndata':DATA[args.id]},
edata={'edata':DATA[args.id]},
ndata_g2l=ndata_g2l[args.id],
edata_g2l=edata_g2l[args.id])

Expand Down
22 changes: 18 additions & 4 deletions examples/pytorch/dis_kvstore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
ID.append(th.tensor([4,5]))
ID.append(th.tensor([6,7]))

DATA = []
DATA.append(th.tensor([[1.,1.,1.,],[1.,1.,1.,]]))
DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]]))

edata_partition_book = {'edata':th.tensor([0,0,1,1,2,2,3,3])}
ndata_partition_book = {'ndata':th.tensor([0,0,1,1,2,2,3,3])}

Expand All @@ -20,20 +26,28 @@ def start_client():
ndata_partition_book=ndata_partition_book,
edata_partition_book=edata_partition_book)

client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=th.tensor([[1.,1.,1.],[1.,1.,1.]]))
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=th.tensor([[2.,2.,2.],[2.,2.,2.]]))

tensor_edata = client.pull(name='edata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
tensor_ndata = client.pull(name='ndata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))

print(tensor_edata)
client.barrier()

print(tensor_ndata)
client.barrier()

client.push(name='edata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])
client.push(name='ndata', id_tensor=ID[client.get_id()], data_tensor=DATA[client.get_id()])

client.barrier()

tensor_edata = client.pull(name='edata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))
tensor_ndata = client.pull(name='ndata', id_tensor=th.tensor([0,1,2,3,4,5,6,7]))

print(tensor_edata)

client.barrier()

print(tensor_ndata)

client.barrier()

if client.get_id() == 0:
Expand Down
8 changes: 4 additions & 4 deletions examples/pytorch/dis_kvstore/ip_config.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
0 127.0.0.1 40050
1 127.0.0.1 40051
2 127.0.0.1 40052
3 127.0.0.1 40053
0 127.0.0.1 50050
1 127.0.0.1 50051
2 127.0.0.1 50052
3 127.0.0.1 50053
10 changes: 8 additions & 2 deletions examples/pytorch/dis_kvstore/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,1,0,0])})
edata_g2l.append({'edata':th.tensor([0,0,0,0,0,0,0,1])})

DATA = []
DATA.append(th.tensor([[4.,4.,4.,],[4.,4.,4.,]]))
DATA.append(th.tensor([[3.,3.,3.,],[3.,3.,3.,]]))
DATA.append(th.tensor([[2.,2.,2.,],[2.,2.,2.,]]))
DATA.append(th.tensor([[1.,1.,1.,],[1.,1.,1.,]]))

def start_server(args):

dgl.contrib.start_server(
server_id=args.id,
ip_config='ip_config.txt',
num_client=4,
ndata={'ndata':th.tensor([[0.,0.,0.],[0.,0.,0.]])},
edata={'edata':th.tensor([[0.,0.,0.],[0.,0.,0.]])},
ndata={'ndata':DATA[args.id]},
edata={'edata':DATA[args.id]},
ndata_g2l=ndata_g2l[args.id],
edata_g2l=edata_g2l[args.id])

Expand Down
51 changes: 40 additions & 11 deletions python/dgl/contrib/dis_kvstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
from ..network import _send_kv_msg, _recv_kv_msg
from ..network import KVMsgType, KVStoreMsg

from .. import backend as F
from .._ffi.ndarray import empty_shared_mem

import os
import numpy as np
import dgl.backend as F
import socket
if os.name != 'nt':
import fcntl
import struct

def read_ip_config(filename):
"""Read networking configuration from file.
Expand Down Expand Up @@ -551,7 +555,7 @@ def connect(self):
# find local server nodes
for ID, addr in self._server_namebook.items():
server_ip, server_port = addr.split(':')
if client_ip == server_ip or server_ip == '127.0.0.1':
if server_ip in self._ip4_addr_list():
self._local_server_id.add(ID)

# send addr to server nodes
Expand Down Expand Up @@ -583,8 +587,8 @@ def connect(self):
for server_id in self._local_server_id:
shared_data = empty_shared_mem(tensor_name+str(server_id), False, shape, dtype)
dlpack = shared_data.to_dlpack()
self._data_store[tensor_name] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(tensor_name)
self._data_store[tensor_name+str(server_id)] = F.zerocopy_from_dlpack(dlpack)
self._has_data.add(tensor_name+str(server_id))


def push(self, name, id_tensor, data_tensor):
Expand Down Expand Up @@ -622,11 +626,11 @@ def push(self, name, id_tensor, data_tensor):
partial_data = data_tensor[start:end]

if server[idx] in self._local_server_id and self._close_shared_mem == False:
if (name+'-g2l-' in self._has_data) == True:
local_id = self._data_store[name+'-g2l-'][partial_id]
if (name+'-g2l-'+str(server[idx]) in self._has_data) == True:
local_id = self._data_store[name+'-g2l-'+str(server[idx])][partial_id]
else:
local_id = partial_id
self._push_handler(name+'-data-', local_id, data_tensor, self._data_store)
self._push_handler(name+'-data-'+str(server[idx]), local_id, data_tensor, self._data_store)
else:
msg = KVStoreMsg(
type=KVMsgType.PUSH,
Expand All @@ -642,7 +646,7 @@ def push(self, name, id_tensor, data_tensor):
def pull(self, name, id_tensor):
"""Pull message from KVServer.
Parameters
Parameters
----------
name : str
data name
Expand Down Expand Up @@ -676,11 +680,11 @@ def pull(self, name, id_tensor):
partial_id = id_tensor[start:end]

if server[idx] in self._local_server_id and self._close_shared_mem == False:
if (name+'-g2l-' in self._has_data) == True:
local_id = self._data_store[name+'-g2l-'][partial_id]
if (name+'-g2l-'+str(server[idx]) in self._has_data) == True:
local_id = self._data_store[name+'-g2l-'+str(server[idx])][partial_id]
else:
local_id = partial_id
local_data[server[idx]] = self._pull_handler(name+'-data-', local_id, self._data_store)
local_data[server[idx]] = self._pull_handler(name+'-data-'+str(server[idx]), local_id, self._data_store)
else:
msg = KVStoreMsg(
type=KVMsgType.PULL,
Expand Down Expand Up @@ -798,6 +802,31 @@ def _get_local_addr(self):
return IP + ':' + str(port)


def _get_ip_address(self, NICname):
    """Return the IPv4 address bound to a network interface.

    Parameters
    ----------
    NICname : str
        Name of the network interface. Only the first 15 characters
        are passed to the ioctl request.

    Returns
    -------
    str
        The IPv4 address in dotted-quad notation.

    Raises
    ------
    OSError
        Propagated from ``fcntl.ioctl`` when the request fails for the
        given interface name.
    """
    # The datagram socket is only a handle for the ioctl call. Use it as a
    # context manager so the descriptor is released on every path,
    # including when the ioctl raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        ifreq = fcntl.ioctl(
            sock.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', NICname[:15].encode("UTF-8"))
        )
    # Bytes 20..23 of the returned buffer are the packed IPv4 address.
    return socket.inet_ntoa(ifreq[20:24])


def _ip4_addr_list(self):
    """Return the set of IPv4 addresses found on this machine's NICs.

    Every interface reported by ``socket.if_nameindex()`` is queried
    through ``self._get_ip_address``. Interfaces for which that query
    raises ``OSError`` are skipped, so a single interface without an
    IPv4 address does not abort the whole scan.

    Returns
    -------
    set of str
        IPv4 addresses in dotted-quad notation.
    """
    addresses = set()

    for _, nic_name in socket.if_nameindex():
        try:
            addresses.add(self._get_ip_address(nic_name))
        except OSError:
            # No IPv4 address could be read for this interface; ignore it.
            continue

    return addresses


def _takeId(self, elem):
"""Used by sort
"""
Expand Down
4 changes: 4 additions & 0 deletions tests/mxnet/ip_config.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
0 127.0.0.1 50050
1 127.0.0.1 50051
2 127.0.0.1 50052
3 127.0.0.1 50053
119 changes: 119 additions & 0 deletions tests/mxnet/test_kvstore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import dgl
import argparse
import mxnet as mx
import time
import backend as F

from multiprocessing import Process

# Global IDs pushed by each client: client i owns IDs 2*i and 2*i + 1.
ID = [
    mx.nd.array([first, first + 1], dtype='int64')
    for first in (0, 2, 4, 6)
]

# Rows pushed by each client: client i pushes two rows filled with i + 1.
DATA = [
    mx.nd.array([[fill, fill, fill], [fill, fill, fill]])
    for fill in (1., 2., 3., 4.)
]

# Partition books: entry k is the partition number assigned to global ID k.
edata_partition_book = {'edata': mx.nd.array([0, 0, 1, 1, 2, 2, 3, 3], dtype='int64')}
ndata_partition_book = {'ndata': mx.nd.array([0, 0, 1, 1, 2, 2, 3, 3], dtype='int64')}

# One mapping tensor per server, indexed by server ID.
ndata_g2l = [
    {'ndata': mx.nd.array([0, 1, 0, 0, 0, 0, 0, 0], dtype='int64')},
    {'ndata': mx.nd.array([0, 0, 0, 1, 0, 0, 0, 0], dtype='int64')},
    {'ndata': mx.nd.array([0, 0, 0, 0, 0, 1, 0, 0], dtype='int64')},
    {'ndata': mx.nd.array([0, 0, 0, 0, 0, 0, 0, 1], dtype='int64')},
]

edata_g2l = [
    {'edata': mx.nd.array([0, 1, 0, 0, 0, 0, 0, 0], dtype='int64')},
    {'edata': mx.nd.array([0, 0, 0, 1, 0, 0, 0, 0], dtype='int64')},
    {'edata': mx.nd.array([0, 0, 0, 0, 0, 1, 0, 0], dtype='int64')},
    {'edata': mx.nd.array([0, 0, 0, 0, 0, 0, 0, 1], dtype='int64')},
]

def start_client(flag):
    """Run one kvstore client: push its rows, pull everything, verify.

    Parameters
    ----------
    flag : bool
        Forwarded to ``dgl.contrib.start_client`` as ``close_shared_mem``.
    """
    # Presumably gives the server processes time to start listening.
    time.sleep(3)

    kv_client = dgl.contrib.start_client(
        ip_config='ip_config.txt',
        ndata_partition_book=ndata_partition_book,
        edata_partition_book=edata_partition_book,
        close_shared_mem=flag)

    fields = ('edata', 'ndata')

    # Each client writes its own two rows into both tensors.
    for field in fields:
        kv_client.push(name=field,
                       id_tensor=ID[kv_client.get_id()],
                       data_tensor=DATA[kv_client.get_id()])

    kv_client.barrier()

    # Read back all eight rows of both tensors.
    pulled = {}
    for field in fields:
        pulled[field] = kv_client.pull(
            name=field,
            id_tensor=mx.nd.array([0, 1, 2, 3, 4, 5, 6, 7], dtype='int64'))

    # Rows 2*i and 2*i + 1 are expected to hold the value i + 1.
    expected = mx.nd.array(
        [[fill, fill, fill] for fill in (1., 2., 3., 4.) for _ in range(2)])

    for field in fields:
        assert F.array_equal(pulled[field], expected)

    kv_client.barrier()

    # Only client 0 issues the shutdown request.
    if kv_client.get_id() == 0:
        kv_client.shut_down()

def start_server(server_id, num_client):
    """Launch one kvstore server holding zero-initialised 2x3 tensors.

    Parameters
    ----------
    server_id : int
        ID of this server; also selects its entries in ``ndata_g2l`` and
        ``edata_g2l``.
    num_client : int
        Forwarded to ``dgl.contrib.start_server`` as ``num_client``.
    """
    node_store = {'ndata': mx.nd.array([[0., 0., 0.], [0., 0., 0.]])}
    edge_store = {'edata': mx.nd.array([[0., 0., 0.], [0., 0., 0.]])}

    dgl.contrib.start_server(
        server_id=server_id,
        ip_config='ip_config.txt',
        num_client=num_client,
        ndata=node_store,
        edata=edge_store,
        ndata_g2l=ndata_g2l[server_id],
        edata_g2l=edata_g2l[server_id])

if __name__ == '__main__':

    # Four server processes (IDs 0-3), each told to expect four clients.
    servers = [Process(target=start_server, args=(server_id, 4))
               for server_id in range(4)]

    # Four client processes: two started with close_shared_mem=True and
    # two with close_shared_mem=False.
    clients = [Process(target=start_client, args=(flag,))
               for flag in (True, True, False, False)]

    workers = servers + clients

    # Start the servers first, then the clients.
    for proc in workers:
        proc.start()

    for proc in workers:
        proc.join()

    # An assertion failure in a child only terminates that child with a
    # non-zero exit code. Surface it here so the script itself fails
    # instead of exiting successfully.
    failed = [proc.name for proc in workers if proc.exitcode != 0]
    if failed:
        raise SystemExit('kvstore test failed in: ' + ', '.join(failed))
Loading

0 comments on commit 4f02bb7

Please sign in to comment.