Major improvements to master-worker mode
* Fixed all undefined symbol errors
* Implemented storage interface and THStorage class
* RPC improvements
* Code refactor
apaszke committed Jan 31, 2017
1 parent d53eb52, commit 60d1852
Showing 41 changed files with 1,290 additions and 254 deletions.
setup.py: 2 additions & 0 deletions

@@ -235,6 +235,8 @@ def run(self):
     extra_compile_args += ['-DWITH_DISTRIBUTED']
     main_sources += [
         "torch/csrc/distributed/Module.cpp",
+        "torch/csrc/distributed/Tensor.cpp",
+        "torch/csrc/distributed/Storage.cpp",
         "torch/csrc/distributed/utils.cpp"
     ]
     include_dirs += [tmp_install_path + "/include/THD"]
torch/csrc/Module.cpp: 39 additions & 0 deletions

@@ -697,6 +697,24 @@ bool THCSPShortTensor_init(PyObject *module);
 bool THCSPCharTensor_init(PyObject *module);
 bool THCSPByteTensor_init(PyObject *module);
 
+bool THDPDoubleStorage_init(PyObject *module);
+bool THDPFloatStorage_init(PyObject *module);
+//bool THDPHalfStorage_init(PyObject *module);
+bool THDPLongStorage_init(PyObject *module);
+bool THDPIntStorage_init(PyObject *module);
+bool THDPShortStorage_init(PyObject *module);
+bool THDPCharStorage_init(PyObject *module);
+bool THDPByteStorage_init(PyObject *module);
+
+bool THDPDoubleTensor_init(PyObject *module);
+bool THDPFloatTensor_init(PyObject *module);
+//bool THDPHalfTensor_init(PyObject *module);
+bool THDPLongTensor_init(PyObject *module);
+bool THDPIntTensor_init(PyObject *module);
+bool THDPShortTensor_init(PyObject *module);
+bool THDPCharTensor_init(PyObject *module);
+bool THDPByteTensor_init(PyObject *module);
+
 static std::vector<PyMethodDef> methods;
 
 #ifdef WITH_DISTRIBUTED
@@ -811,6 +829,27 @@ PyMODINIT_FUNC PyInit__C()
   Py_INCREF(has_cudnn);
   ASSERT_TRUE(PyModule_AddObject(module, "has_cudnn", has_cudnn) == 0);
 
+#ifdef WITH_DISTRIBUTED
+  // See comment on CUDA objects
+  ASSERT_TRUE(THDPDoubleStorage_init(module));
+  ASSERT_TRUE(THDPFloatStorage_init(module));
+  //ASSERT_TRUE(THDPHalfStorage_init(module));
+  ASSERT_TRUE(THDPLongStorage_init(module));
+  ASSERT_TRUE(THDPIntStorage_init(module));
+  ASSERT_TRUE(THDPShortStorage_init(module));
+  ASSERT_TRUE(THDPCharStorage_init(module));
+  ASSERT_TRUE(THDPByteStorage_init(module));
+
+  ASSERT_TRUE(THDPDoubleTensor_init(module));
+  ASSERT_TRUE(THDPFloatTensor_init(module));
+  //ASSERT_TRUE(THDPHalfTensor_init(module));
+  ASSERT_TRUE(THDPLongTensor_init(module));
+  ASSERT_TRUE(THDPIntTensor_init(module));
+  ASSERT_TRUE(THDPShortTensor_init(module));
+  ASSERT_TRUE(THDPCharTensor_init(module));
+  ASSERT_TRUE(THDPByteTensor_init(module));
+#endif
+
   THPDefaultGenerator = (THPGenerator*)THPGenerator_New();
   ASSERT_TRUE(THPDefaultGenerator != nullptr);
   ASSERT_TRUE(PyModule_AddObject(module, "default_generator", (PyObject*)THPDefaultGenerator) == 0);
torch/csrc/distributed/Module.cpp: 40 additions & 4 deletions

@@ -11,6 +11,31 @@ static std::unordered_map<std::string, THDChannelType> name2channel_type = {
   {"tcp", THDChannelTCP},
 };
 
+static bool THDPModule_loadClasses(PyObject *module_dict)
+{
+#define ASSERT_NOT_NULL(ptr) if (!(ptr)) { THPUtils_setError("couldn't load classes"); return false; }
+  ASSERT_NOT_NULL(THDPDoubleStorageClass = PyMapping_GetItemString(module_dict, (char*)"DoubleStorage"));
+  ASSERT_NOT_NULL(THDPFloatStorageClass = PyMapping_GetItemString(module_dict, (char*)"FloatStorage"));
+  //ASSERT_NOT_NULL(THDPHalfStorageClass = PyMapping_GetItemString(module_dict, (char*)"HalfStorage"));
+  ASSERT_NOT_NULL(THDPLongStorageClass = PyMapping_GetItemString(module_dict, (char*)"LongStorage"));
+  ASSERT_NOT_NULL(THDPIntStorageClass = PyMapping_GetItemString(module_dict, (char*)"IntStorage"));
+  ASSERT_NOT_NULL(THDPShortStorageClass = PyMapping_GetItemString(module_dict, (char*)"ShortStorage"));
+  ASSERT_NOT_NULL(THDPCharStorageClass = PyMapping_GetItemString(module_dict, (char*)"CharStorage"));
+  ASSERT_NOT_NULL(THDPByteStorageClass = PyMapping_GetItemString(module_dict, (char*)"ByteStorage"));
+
+  ASSERT_NOT_NULL(THDPDoubleTensorClass = PyMapping_GetItemString(module_dict, (char*)"DoubleTensor"));
+  //ASSERT_NOT_NULL(THDPHalfTensorClass = PyMapping_GetItemString(module_dict, (char*)"HalfTensor"));
+  ASSERT_NOT_NULL(THDPFloatTensorClass = PyMapping_GetItemString(module_dict, (char*)"FloatTensor"));
+  ASSERT_NOT_NULL(THDPLongTensorClass = PyMapping_GetItemString(module_dict, (char*)"LongTensor"));
+  ASSERT_NOT_NULL(THDPIntTensorClass = PyMapping_GetItemString(module_dict, (char*)"IntTensor"));
+  ASSERT_NOT_NULL(THDPShortTensorClass = PyMapping_GetItemString(module_dict, (char*)"ShortTensor"));
+  ASSERT_NOT_NULL(THDPCharTensorClass = PyMapping_GetItemString(module_dict, (char*)"CharTensor"));
+  ASSERT_NOT_NULL(THDPByteTensorClass = PyMapping_GetItemString(module_dict, (char*)"ByteTensor"));
+
+  return true;
+#undef ASSERT_NOT_NULL
+}
+
 static std::unordered_map<PyObject*, THDReduceOp> obj2reduceop;
 static std::unordered_map<PyObject*, THDGroup> obj2group;
 
@@ -230,13 +255,17 @@ PyObject* THDPModule_newGroup(PyObject *_unused, PyObject *args)
 }
 
 PyObject* THDPModule_initExtension(PyObject *_unused, PyObject *args) {
-  if (PyTuple_GET_SIZE(args) != 2) {
-    THPUtils_invalidArguments(args, "initExtension", 1, "(reduce_op obj, group obj)");
+  if (PyTuple_GET_SIZE(args) != 3) {
+    THPUtils_invalidArguments(args, "initExtension", 1, "(bool is_master_worker, reduce_op obj, group obj)");
     return NULL;
   }
 
-  PyObject* reduce_op_obj = PyTuple_GET_ITEM(args, 0);
-  PyObject* group_obj = PyTuple_GET_ITEM(args, 1);
+  PyObject* is_master_worker_obj = PyTuple_GET_ITEM(args, 0);
+  PyObject* reduce_op_obj = PyTuple_GET_ITEM(args, 1);
+  PyObject* group_obj = PyTuple_GET_ITEM(args, 2);
+
+  THPUtils_assert(PyBool_Check(is_master_worker_obj), "first argument should be a bool");
+  bool is_master_worker = is_master_worker_obj == Py_True;
 
   THPObjectPtr reduce_op;
 #define REGISTER_REDUCE_OP(NAME) \
@@ -256,6 +285,13 @@ PyObject* THDPModule_initExtension(PyObject *_unused, PyObject *args) {
   obj2group.emplace(group.get(), THDGroup##NAME);
   REGISTER_GROUP(WORLD);
 #undef REGISTER_GROUP
+
+  if (is_master_worker) {
+    PyObject *module = PyImport_ImportModule("torch.distributed");
+    THPUtils_assert(module, "class loader couldn't access torch.distributed module");
+    PyObject* module_dict = PyModule_GetDict(module);
+    if (!THDPModule_loadClasses(module_dict)) return NULL;
+  }
  Py_RETURN_TRUE;
 }
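The class objects cached by THDPModule_loadClasses are what later allow the C++ side to wrap worker results in the Python-level remote types. A minimal sketch of that pattern, assuming only standard CPython calling conventions (the helper below is hypothetical, not part of this commit):

// Hypothetical helper: build a torch.distributed.FloatTensor instance from C++
// using the class object cached by THDPModule_loadClasses. Calling the class
// with no arguments is the C API equivalent of `torch.distributed.FloatTensor()`.
static PyObject* THDPFloatTensor_newPyObject()
{
  return PyObject_CallObject(THDPFloatTensorClass, NULL);  // new reference, or NULL on error
}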
torch/csrc/distributed/Storage.h: 1 addition & 1 deletion

@@ -33,7 +33,7 @@
 
 #ifdef _THP_CORE
 #define THDPStorageType TH_CONCAT_3(THDP,Real,StorageType)
-#define THDPStorageBaseStr TH_CONCAT_STRING_3(Cuda,Real,StorageBase)
+#define THDPStorageBaseStr TH_CONCAT_STRING_3(Distributed,Real,StorageBase)
 #endif
 
 #include "override_macros.h"
torch/csrc/distributed/THDP.h: 2 additions & 0 deletions

@@ -7,6 +7,8 @@
 #include "Module.h"
 #include "Storage.h"
 #include "Tensor.h"
+#ifdef _THP_CORE
 #include "utils.h"
+#endif
 
 #endif
torch/csrc/distributed/Tensor.h: 3 additions & 3 deletions

@@ -25,7 +25,7 @@
 
 #ifdef _THP_CORE
 #define THDPTensorType TH_CONCAT_3(THDP,Real,TensorType)
-#define THDPTensorBaseStr TH_CONCAT_STRING_3(THDP,Real,TensorBase)
+#define THDPTensorBaseStr TH_CONCAT_STRING_3(Distributed,Real,TensorBase)
 #define THDPTensor_stateless_(NAME) TH_CONCAT_4(THDP,Real,Tensor_stateless_,NAME)
 #define THDPTensorStatelessType TH_CONCAT_3(THDP,Real,TensorStatelessType)
 #define THDPTensorStateless TH_CONCAT_3(THDP,Real,TensorStateless)
@@ -34,7 +34,7 @@
 
 #include "override_macros.h"
 
-#define TH_GENERIC_FILE "torch/csrc/generic/Tensor.h"
-#include <TH/THGenerateAllTypes.h>
+#define THD_GENERIC_FILE "torch/csrc/generic/Tensor.h"
+#include <THD/base/THDGenerateAllTypes.h>
 
 #endif
torch/csrc/distributed/override_macros.h: 1 addition & 0 deletions

@@ -30,6 +30,7 @@
 
 #define LIBRARY_STATE_NOARGS
 #define LIBRARY_STATE
+#define TH_GENERIC_FILE THD_GENERIC_FILE
 
 #define THHostTensor TH_CONCAT_3(TH,Real,Tensor)
 #define THHostTensor_(NAME) TH_CONCAT_4(TH,Real,Tensor_,NAME)
torch/csrc/distributed/utils.cpp: 6 additions & 2 deletions

@@ -1,10 +1,14 @@
 #include <Python.h>
-#include "utils.h"
-
+#include "THDP.h"
+
+#include "override_macros.h"
 
 template<>
 void THPPointer<THDTensorDescriptor>::free() {
   if (ptr)
     THDTensorDescriptor_free(ptr);
 }
 
 template class THPPointer<THDTensorDescriptor>;
+
+#define THD_GENERIC_FILE "torch/csrc/generic/utils.cpp"
+#include <THD/base/THDGenerateAllTypes.h>
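For context, THPPointer is PyTorch's unique_ptr-style guard; the specialization above routes its cleanup through THDTensorDescriptor_free. A sketch of the usage pattern this enables, assuming a THD descriptor constructor of the generated THDTensorDescriptor_newFromTHDoubleTensor form (that name is an assumption, not taken from this diff):

// Sketch: desc releases its descriptor automatically when it leaves scope,
// via the THPPointer<THDTensorDescriptor>::free() specialization above.
static void describeTensor(THDoubleTensor *tensor)
{
  THPPointer<THDTensorDescriptor> desc(
      THDTensorDescriptor_newFromTHDoubleTensor(tensor));  // assumed constructor name
  if (!desc) return;
  // ... hand desc.get() to an RPC call here ...
}  // THDTensorDescriptor_free(desc.get()) runs here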
torch/distributed/__init__.py: 4 additions & 4 deletions

@@ -17,7 +17,7 @@ def init_process_group(backend):
     _initialized = True
     import torch.distributed.collectives as collectives
     extend_scope(collectives)
-    assert torch._C._dist_init_extension(reduce_op)
+    assert torch._C._dist_init_extension(False, reduce_op, group)
 
 
 def init_master_worker(backend):
@@ -27,8 +27,8 @@ def init_master_worker(backend):
     torch._C._dist_init_master_worker(backend)
     _initialized = True
     import torch.distributed.collectives as collectives
-    # import torch.distributed.remote_types as remote_types
+    import torch.distributed.remote_types as remote_types
     extend_scope(collectives)
-    # extend_scope(remote_types)
-    assert torch._C._dist_init_extension(reduce_op)
+    extend_scope(remote_types)
+    assert torch._C._dist_init_extension(True, reduce_op, group)

torch/distributed/remote_types.py: 8 additions & 8 deletions

@@ -32,8 +32,8 @@ class CharStorage(_DistributedBase, torch._C.DistributedCharStorageBase, _StorageBase):
     pass
 class ByteStorage(_DistributedBase, torch._C.DistributedByteStorageBase, _StorageBase):
     pass
-class HalfStorage(_DistributedBase, torch._C.DistributedHalfStorageBase, _StorageBase):
-    pass
+# class HalfStorage(_DistributedBase, torch._C.DistributedHalfStorageBase, _StorageBase):
+#     pass
 
 class DoubleTensor(_DistributedBase, torch._C.DistributedDoubleTensorBase, _TensorBase):
     def is_signed(self):
@@ -78,12 +78,12 @@ def is_signed(self):
     @classmethod
     def storage_type(cls):
         return ByteStorage
-class HalfTensor(_DistributedBase, torch._C.DistributedHalfTensorBase, _TensorBase):
-    def is_signed(self):
-        return True
-    @classmethod
-    def storage_type():
-        return HalfStorage
+# class HalfTensor(_DistributedBase, torch._C.DistributedHalfTensorBase, _TensorBase):
+#     def is_signed(self):
+#         return True
+#     @classmethod
+#     def storage_type():
+#         return HalfStorage
 
 torch._storage_classes.add(DoubleStorage)
 torch._storage_classes.add(FloatStorage)
torch/lib/THD/CMakeLists.txt: 23 additions & 2 deletions

@@ -1,6 +1,24 @@
-CMAKE_MINIMUM_REQUIRED(VERSION 2.6)
+CMAKE_MINIMUM_REQUIRED(VERSION 3.0)
 SET(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH})
+
+################################################################################
+# Helper functions
+################################################################################
+
+FUNCTION(EXCLUDE_DIR list_name dir_name)
+  # A helper that excludes all files that contain dir_name in their file path
+  SET(local_list ${${list_name}})
+  FOREACH(source ${local_list})
+    IF(${source} MATCHES ${dir_name})
+      MESSAGE(STATUS "Excluding " ${source} " from the build")
+      LIST(REMOVE_ITEM local_list ${source})
+    ENDIF()
+  ENDFOREACH()
+  SET(${list_name} ${local_list} PARENT_SCOPE)
+ENDFUNCTION()
+
+################################################################################
 
 
 FIND_PACKAGE(ZMQ REQUIRED)
 FIND_PACKAGE(CPPZMQ REQUIRED)
@@ -34,9 +52,13 @@ IF(NOT MPI_FOUND)
   LIST(REMOVE_ITEM test_cpp "${CMAKE_CURRENT_SOURCE_DIR}/test/data_channel_mpi_smoke.cpp")
 ENDIF()
 
+EXCLUDE_DIR(master_worker_cpp ".*/dispatch/.*\\.cpp$")
+
 SET(all_cpp ${base_cpp} ${process_group_cpp} ${master_worker_cpp})
 SET(all_h THD.h ${base_h} ${process_group_h} ${master_worker_h})
 
+EXCLUDE_DIR(all_cpp ".*/generic/.*\\.cpp$")
+
 INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
 ADD_LIBRARY(THD SHARED ${all_cpp})
 SET_PROPERTY(TARGET THD PROPERTY CXX_STANDARD 11)
@@ -80,6 +102,5 @@ INSTALL(TARGETS THD
 
 FOREACH(HEADER ${all_h})
   STRING(REGEX MATCH "(.*)[/\\]" DIR ${HEADER})
-  MESSAGE(STATUS ${DIR})
   INSTALL(FILES ${HEADER} DESTINATION ${THD_INSTALL_INCLUDE_DIR}/THD/${DIR})
 ENDFOREACH()
torch/lib/THD/base/Storage.hpp: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+#pragma once
+
+#include "Type.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <initializer_list>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+
+namespace thd {
+
+struct Storage {
+  Storage() {};
+  Storage(const Storage& other) = delete;
+  Storage(Storage&& other) = delete;
+  virtual ~Storage() {};
+
+  virtual std::size_t elementSize() const = 0;
+  virtual std::size_t size() const = 0;
+  virtual void* data() = 0;
+  virtual const void* data() const = 0;
+  virtual Storage& retain() = 0;
+  virtual Storage& free() = 0;
+
+  virtual Storage& resize(long new_size) = 0;
+
+  virtual thd::Type type() const = 0;
+};
+
+template<typename real>
+struct StorageScalarInterface : public Storage {
+  using scalar_type = real;
+  virtual StorageScalarInterface& fill(scalar_type value) = 0;
+};
+
+using FloatStorage = StorageScalarInterface<double>;
+using IntStorage = StorageScalarInterface<long long>;
+
+} // namespace thd
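To make the new storage contract concrete, here is a minimal, purely illustrative implementation of the scalar interface backed by a std::vector. This class is not part of the commit, and the thd::Type::DOUBLE member is an assumption about Type.hpp; the commit's actual implementation is the THStorage-backed class mentioned in the commit message.

// Illustrative only: a trivial vector-backed FloatStorage
// (recall that FloatStorage = StorageScalarInterface<double>).
#include "Storage.hpp"

#include <algorithm>
#include <vector>

namespace thd {

struct VectorFloatStorage : FloatStorage {
  std::vector<double> values;

  std::size_t elementSize() const override { return sizeof(double); }
  std::size_t size() const override { return values.size(); }
  void* data() override { return values.data(); }
  const void* data() const override { return values.data(); }
  Storage& retain() override { return *this; }  // no reference counting in this sketch
  Storage& free() override { return *this; }
  Storage& resize(long new_size) override {
    values.resize(static_cast<std::size_t>(new_size));
    return *this;
  }
  StorageScalarInterface<double>& fill(double value) override {
    std::fill(values.begin(), values.end(), value);
    return *this;
  }
  thd::Type type() const override { return thd::Type::DOUBLE; }  // assumed enum member
};

} // namespace thd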
