[KVstore] Improve user interface (dmlc#1454)
* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update

* update
aksnzhy authored Apr 20, 2020
1 parent 1abe87f commit 5efd1c4
Showing 2 changed files with 166 additions and 56 deletions.
149 changes: 93 additions & 56 deletions python/dgl/contrib/dis_kvstore.py
@@ -49,14 +49,14 @@ def read_ip_config(filename):
[server_id]:[machine_id, ip, port, group_count]
{0:'[0, 172.31.40.143, 30050, 2],
1:'[0, 172.31.40.143, 30051, 2],
2:'[1, 172.31.36.140, 30050, 2],
3:'[1, 172.31.36.140, 30051, 2],
4:'[2, 172.31.47.147, 30050, 2],
5:'[2, 172.31.47.147, 30051, 2],
6:'[3, 172.31.30.180, 30050, 2],
7:'[3, 172.31.30.180, 30051, 2]}
{0:[0, '172.31.40.143', 30050, 2],
1:[0, '172.31.40.143', 30051, 2],
2:[1, '172.31.36.140', 30050, 2],
3:[1, '172.31.36.140', 30051, 2],
4:[2, '172.31.47.147', 30050, 2],
5:[2, '172.31.47.147', 30051, 2],
6:[3, '172.31.30.180', 30050, 2],
7:[3, '172.31.30.180', 30051, 2]}
"""
assert len(filename) > 0, 'filename cannot be empty.'
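
For context, a minimal usage sketch of this helper; the per-line layout of the config file (one machine per line: IP address, base port, number of servers on that machine) is an assumption inferred from the dictionary above and is not confirmed by this diff.

    # hypothetical ip_config.txt, one machine per line: <ip> <base_port> <server_count>
    #   172.31.40.143 30050 2
    #   172.31.36.140 30050 2
    #   172.31.47.147 30050 2
    #   172.31.30.180 30050 2
    from dgl.contrib.dis_kvstore import read_ip_config  # import path assumed

    server_namebook = read_ip_config('ip_config.txt')
    # expected result, per the docstring above:
    # {server_id: [machine_id, ip, port, group_count], ...}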

@@ -189,23 +189,61 @@ def set_global2local(self, name, global2local=None):
self._data_store[name+'-g2l-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-g2l-'][:] = global2local[:]
# write data information to temp file that can be read by other processes
self._write_data_shape(name+'-g2l-shape', global2local)
self._open_file_list.append(name+'-g2l-shape')
self._write_data_shape(name+'-g2l-shape-'+str(self._machine_id), global2local)
self._open_file_list.append(name+'-g2l-shape-'+str(self._machine_id))
else: # Read shared-tensor
while True:
if (os.path.exists(name+'-g2l-shape')):
if (os.path.exists(name+'-g2l-shape-'+str(self._machine_id))):
time.sleep(2) # wait writing finish
break
else:
time.sleep(2) # wait until the file been created
data_shape = self._read_data_shape(name+'-g2l-shape')
data_shape = self._read_data_shape(name+'-g2l-shape-'+str(self._machine_id))
shared_data = empty_shared_mem(name+'-g2l-', False, data_shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-g2l-'] = F.zerocopy_from_dlpack(dlpack)

self._has_data.add(name+'-g2l-')


def set_partition_book(self, name, partition_book=None):
"""Partition book contains the data mapping of global ID to machine ID.
Parameters
----------
name : str
data name
partition_book : list or tensor (mx.ndarray or torch.tensor)
Mapping global ID to target machine ID.
Note that if partition_book is None, the shared-tensor will be read by name.
"""
assert len(name) > 0, 'name cannot be empty.'

if partition_book is not None: # Create shared-tensor
if isinstance(partition_book, list):
partition_book = F.tensor(partition_book)
shared_data = empty_shared_mem(name+'-part-', True, partition_book.shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-part-'][:] = partition_book[:]
self._write_data_shape(name+'-part-shape-'+str(self._machine_id), partition_book)
self._open_file_list.append(name+'-part-shape-'+str(self._machine_id))
else: # Read shared-tensor
while True:
if (os.path.exists(name+'-part-shape-'+str(self._machine_id))):
time.sleep(2) # wait writing finish
break
else:
time.sleep(2) # wait until the file been created
data_shape = self._read_data_shape(name+'-part-shape-'+str(self._machine_id))
shared_data = empty_shared_mem(name+'-part-', False, data_shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)

self._has_data.add(name+'-part-')
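
A tiny illustration of the mapping this method stores; the tensor name 'embed' and the values below are hypothetical, not taken from this commit.

    # hypothetical partition book for 6 global IDs on 2 machines:
    # global IDs 0-2 live on machine 0, global IDs 3-5 on machine 1
    partition_book = [0, 0, 0, 1, 1, 1]
    my_server.set_partition_book(name='embed', partition_book=partition_book)
    # a second server process on the same machine can attach to the same shared
    # tensor by calling set_partition_book(name='embed') with no partition_book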


def init_data(self, name, data_tensor=None):
"""Initialize data tensor on KVServe.
@@ -225,15 +263,15 @@ def init_data(self, name, data_tensor=None):
dlpack = shared_data.to_dlpack()
self._data_store[name+'-data-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-data-'][:] = data_tensor[:]
self._write_data_shape(name+'-data-shape', data_tensor)
self._open_file_list.append(name+'-data-shape')
self._write_data_shape(name+'-data-shape-'+str(self._machine_id), data_tensor)
self._open_file_list.append(name+'-data-shape-'+str(self._machine_id))
else: # Read shared-tensor
while True:
if (os.path.exists(name+'-data-shape')):
if (os.path.exists(name+'-data-shape-'+str(self._machine_id))):
break
else:
time.sleep(2) # wait until the file been created
data_shape = self._read_data_shape(name+'-data-shape')
data_shape = self._read_data_shape(name+'-data-shape-'+str(self._machine_id))
shared_data = empty_shared_mem(name+'-data-', False, data_shape, 'float32')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-data-'] = F.zerocopy_from_dlpack(dlpack)
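
To make the new per-machine file naming concrete, a short sketch of how the shape-file name is composed (values are illustrative only):

    name, machine_id = 'embed', 0                      # hypothetical values
    shape_file = name + '-data-shape-' + str(machine_id)
    assert shape_file == 'embed-data-shape-0'
    # the same tensor on machine 1 uses 'embed-data-shape-1', so each machine
    # reads back the shape file that was written on that machine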
@@ -592,6 +630,7 @@ def __init__(self, server_namebook, queue_size=20*1024*1024*1024, net_type='sock
self._has_data = set()
# This is used to store local data, which can share memory with local KVServer.
self._data_store = {}
self._data_name_list = []
# Server information
self._server_namebook = server_namebook
self._server_count = len(server_namebook)
@@ -629,44 +668,6 @@ def __del__(self):
os.remove(file)


def set_partition_book(self, name, partition_book=None):
"""Partition book contains the data mapping of global ID to machine ID.
Parameters
----------
name : str
data name
partition_book : list or tensor (mx.ndarray or torch.tensor)
Mapping global ID to target machine ID.
Note that, if the partition_book is None KVClient will read shared-tensor by name.
"""
assert len(name) > 0, 'name connot be empty.'

if partition_book is not None: # Create shared-tensor
if isinstance(partition_book, list):
partition_book = F.tensor(partition_book)
shared_data = empty_shared_mem(name+'-part-', True, partition_book.shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)
self._data_store[name+'-part-'][:] = partition_book[:]
self._write_data_shape(name+'-part-shape', partition_book)
self._open_file_list.append(name+'-part-shape')
else: # Read shared-tensor
while True:
if (os.path.exists(name+'-part-shape')):
time.sleep(2) # wait writing finish
break
else:
time.sleep(2) # wait until the file been created
data_shape = self._read_data_shape(name+'-part-shape')
shared_data = empty_shared_mem(name+'-part-', False, data_shape, 'int64')
dlpack = shared_data.to_dlpack()
self._data_store[name+'-part-'] = F.zerocopy_from_dlpack(dlpack)

self._has_data.add(name+'-part-')


def connect(self):
"""Connect to all the KVServer nodes
@@ -715,15 +716,17 @@ def connect(self):
if data != '':
tensor_name, dtype = self._deserialize_shared_tensor(data)
while True:
if (os.path.exists(tensor_name+'shape')):
if (os.path.exists(tensor_name+'shape-'+str(self._machine_id))):
time.sleep(2) # wait writing finish
break
else:
time.sleep(2) # wait until the file been created
shape = self._read_data_shape(tensor_name+'shape')
shape = self._read_data_shape(tensor_name+'shape-'+str(self._machine_id))
shared_data = empty_shared_mem(tensor_name, False, shape, dtype)
dlpack = shared_data.to_dlpack()
self._data_store[tensor_name] = F.zerocopy_from_dlpack(dlpack)
if '-data-' in tensor_name:
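# '-data-' is 6 characters, so tensor_name[0:-6] recovers the user-facing name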
self._data_name_list.append(tensor_name[0:-6])
self._has_data.add(tensor_name)

print("KVClient %d connect to kvstore successfully!" % self.get_id())
@@ -774,6 +777,40 @@ def get_machine_id(self):
return self._machine_id


def get_data_name_list(self):
"""Get all the data name
Return
------
list of str
name list
"""
return self._data_name_list


def get_data_meta(self, name):
"""Get meta data (data_type, data_shape, partition_book) of the target shared-tensor
Parameter
---------
name : str
data name
Return
------
tuple
(data_type, data_shape, partition_book)
"""
assert len(name) > 0, 'name cannot be empty.'
assert name + '-data-' in self._has_data, 'Data (%s) does not exist!' % name

data_type = F.dtype(self._data_store[name+'-data-'])
data_shape = F.shape(self._data_store[name+'-data-'])
partition_book = self._data_store[name+'-part-']

return (data_type, data_shape, partition_book)
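
A minimal client-side sketch of the two new queries; the names mirror the test added below, everything else is illustrative:

    my_client = KVClient(server_namebook=server_namebook)
    my_client.connect()
    for name in my_client.get_data_name_list():        # e.g. ['data_0', 'data_1']
        data_type, data_shape, partition_book = my_client.get_data_meta(name)
        print(name, data_type, data_shape)
    my_client.shut_down()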


def push(self, name, id_tensor, data_tensor):
"""Push data to KVServer.
73 changes: 73 additions & 0 deletions tests/compute/test_kvstore.py
@@ -0,0 +1,73 @@
import backend as F
import numpy as np
import scipy as sp
import dgl
from dgl import utils
from dgl.contrib import KVServer
from dgl.contrib import KVClient
from numpy.testing import assert_array_equal

import os
import time

num_entries = 10
dim_size = 3

server_namebook = {0:[0, '127.0.0.1', 30070, 1]}

data_0 = F.zeros((num_entries, dim_size), F.float32, F.cpu())
g2l_0 = F.arange(0, num_entries)
partition_0 = F.zeros(num_entries, F.int64, F.cpu())

data_1 = F.zeros((num_entries*2, dim_size), F.float32, F.cpu())
g2l_1 = F.arange(0, num_entries*2)
partition_1 = F.zeros(num_entries*2, F.int64, F.cpu())

def start_server():
my_server = KVServer(server_id=0, server_namebook=server_namebook, num_client=1)

my_server.set_global2local(name='data_0', global2local=g2l_0)
my_server.set_global2local(name='data_1', global2local=g2l_1)
my_server.set_partition_book(name='data_0', partition_book=partition_0)
my_server.set_partition_book(name='data_1', partition_book=partition_1)
my_server.init_data(name='data_0', data_tensor=data_0)
my_server.init_data(name='data_1', data_tensor=data_1)

my_server.start()


def start_client():
my_client = KVClient(server_namebook=server_namebook)
my_client.connect()

name_list = my_client.get_data_name_list()
assert len(name_list) == 2
assert 'data_0' in name_list
assert 'data_1' in name_list

meta_0 = my_client.get_data_meta('data_0')
assert meta_0[0] == F.float32
assert_array_equal(meta_0[2], partition_0)

meta_1 = my_client.get_data_meta('data_1')
assert meta_1[0] == F.float32
assert_array_equal(meta_1[2], partition_1)

my_client.push(name='data_0', id_tensor=F.tensor([0, 1, 2]), data_tensor=F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]]))

res = my_client.pull(name='data_0', id_tensor=F.tensor([0, 1, 2]))

target = F.tensor([[1.,1.,1.],[2.,2.,2.],[3.,3.,3.]])

assert_array_equal(res, target)

my_client.shut_down()


if __name__ == '__main__':
pid = os.fork()
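# the child process (pid == 0) runs the KVServer; the parent runs the client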
if pid == 0:
start_server()
else:
time.sleep(2) # give the server time to start before the client connects
start_client()
