Cleanup error reporting for ProcessGroupNCCL (pytorch#111979)
Continuing some of the work from pytorch#108191, I realized that the majority of errors raised from ProcessGroupNCCL were just generic RuntimeErrors.

In this PR, I've added appropriate error types to all the exceptions raised from ProcessGroupNCCL.
Pull Request resolved: pytorch#111979
Approved by: https://github.com/fduwjj
pritamdamania87 authored and pytorchmergebot committed Oct 26, 2023
1 parent 74adb4c commit b29c658
Showing 4 changed files with 129 additions and 105 deletions.
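
For context, a minimal sketch of the error-typing pattern this change adopts; the helper below is hypothetical and is not taken from the PR's ProcessGroupNCCL changes. Argument-validation failures now throw typed c10 errors (e.g. c10::ValueError, c10::TypeError) and backend/communicator failures surface as c10::DistBackendError, which the Python bindings expose as ValueError, TypeError, and torch.distributed.DistBackendError; this is what the updated tests below assert.

// Illustrative sketch only: a hypothetical validation helper, not code from this PR.
// Assumes a build against c10 so that c10/util/Exception.h is available.
#include <c10/util/Exception.h>
#include <string>

void checkRootRank(int rootRank, int worldSize) {
  if (rootRank < 0 || rootRank >= worldSize) {
    // C10_THROW_ERROR throws the named c10 error type with source-location info;
    // c10::ValueError reaches Python as a ValueError, as the "invalid root rank"
    // assertions in test_c10d_nccl.py expect.
    C10_THROW_ERROR(
        ValueError, "invalid root rank: " + std::to_string(rootRank));
  }
}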
3 changes: 3 additions & 0 deletions c10/util/Exception.h
@@ -324,6 +324,9 @@ C10_API std::string GetExceptionString(const std::exception& e);
throw ::c10::err_type( \
{__func__, __FILE__, static_cast<uint32_t>(__LINE__)}, msg)

+#define C10_BUILD_ERROR(err_type, msg) \
+::c10::err_type({__func__, __FILE__, static_cast<uint32_t>(__LINE__)}, msg)

// Private helper macro for workaround MSVC misexpansion of nested macro
// invocations involving __VA_ARGS__. See
// https://stackoverflow.com/questions/5134523/msvc-doesnt-expand-va-args-correctly
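Unlike C10_THROW_ERROR above, the new C10_BUILD_ERROR constructs the typed error object without throwing it, so an error can be recorded now and rethrown later. A hedged usage sketch follows; the helper is hypothetical and not part of this PR:

// Hypothetical usage sketch for C10_BUILD_ERROR; assumes c10/util/Exception.h.
#include <c10/util/Exception.h>
#include <exception>
#include <string>

std::exception_ptr buildTimeoutError(const std::string& details) {
  // Construct a c10::DistBackendError (with call-site info) without throwing,
  // so the caller can stash the exception_ptr and rethrow it later, for
  // example when a wait() on an in-flight work object eventually fails.
  return std::make_exception_ptr(
      C10_BUILD_ERROR(DistBackendError, "NCCL operation timed out: " + details));
}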
2 changes: 1 addition & 1 deletion test/cpp/c10d/ProcessGroupNCCLErrorsTest.cpp
@@ -224,7 +224,7 @@ TEST_F(ProcessGroupNCCLErrorsTest, testNCCLTimedoutErrorsBlocking) {
// Now run all reduce with errors.
pg.set_timedout_error();
work = pg.allreduce(tensors_);
-EXPECT_THROW(work->wait(), std::runtime_error);
+EXPECT_THROW(work->wait(), c10::DistBackendError);

// Communicators might be aborted here, further operations would fail.
}
38 changes: 19 additions & 19 deletions test/distributed/test_c10d_nccl.py
@@ -202,7 +202,7 @@ def tearDown(self):
def test_init_no_gpus(self):
store = c10d.FileStore(self.file.name, self.world_size)
with self.assertRaisesRegex(
RuntimeError, "ProcessGroupNCCL is only supported with GPUs, no GPUs found!"
ValueError, "ProcessGroupNCCL is only supported with GPUs, no GPUs found!"
):
c10d.ProcessGroupNCCL(store, self.rank, self.world_size)

@@ -407,7 +407,7 @@ def allreduce(tensors, op):
for op, err in zip((c10d.ReduceOp.BAND, c10d.ReduceOp.BOR, c10d.ReduceOp.BXOR),
("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR")):
with self.assertRaisesRegex(
RuntimeError, "Cannot use " + err + " with NCCL"
ValueError, "Cannot use " + err + " with NCCL"
):
allreduce(tensors, op)

@@ -524,7 +524,7 @@ def reduce(xs, rootRank, rootTensor, op=None):
("ReduceOp.BAND", "ReduceOp.BOR", "ReduceOp.BXOR"),
):
with self.assertRaisesRegex(
RuntimeError, "Cannot use " + err + " with NCCL"
ValueError, "Cannot use " + err + " with NCCL"
):
reduce(tensors, self.rank, rt, op)

@@ -610,7 +610,7 @@ def allgather_base(output_t, input_t):

# anticipate an error
with self.assertRaisesRegex(
-RuntimeError,
+ValueError,
"output tensor size must be equal to world_size times input tensor size",
):
tensor = torch.tensor([self.rank]).cuda(local_device_id)
@@ -622,7 +622,7 @@ def allgather_base(output_t, input_t):

# anticipate an error
with self.assertRaisesRegex(
RuntimeError, "output tensor must have the same type as input tensor"
TypeError, "output tensor must have the same type as input tensor"
):
tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
@@ -731,15 +731,15 @@ def test_gather_checks(self):
for rank in range(self.world_size):
output_ts[idx].append(torch.tensor([-1]).cuda(gpu_idx))

with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.GatherOptions()
opts.rootRank = -1
pg.gather(output_ts, tensors, opts)

with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
pg.gather(output_ts, tensors, 0)

with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.GatherOptions()
opts.rootRank = self.world_size
pg.gather(output_ts, tensors, opts)
@@ -753,7 +753,7 @@ def test_gather_checks(self):
pg.gather(output_ts, [], opts)

with self.assertRaisesRegex(
RuntimeError, "Tensors must be on distinct GPU devices"
ValueError, "Tensors must be on distinct GPU devices"
):
# init input
tensors2 = []
@@ -866,15 +866,15 @@ def test_scatter_checks(self):
for rank in range(self.world_size):
scatter_list[idx].append(torch.tensor([rank]).cuda(gpu_idx))

with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.ScatterOptions()
opts.rootRank = -1
pg.scatter(tensors, scatter_list, opts)

with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
pg.scatter(tensors, scatter_list, 0)

with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
with self.assertRaisesRegex(ValueError, "invalid root rank"):
opts = c10d.ScatterOptions()
opts.rootRank = self.world_size
pg.scatter(tensors, scatter_list, opts)
@@ -900,7 +900,7 @@ def reduce_scatter_base(output_t, input_t):

# anticipate an error
with self.assertRaisesRegex(
-RuntimeError,
+ValueError,
"input tensor must be the same size as output size times world size",
):
input_t = torch.tensor([self.rank]).cuda(local_device_id)
@@ -912,7 +912,7 @@ def reduce_scatter_base(output_t, input_t):

# anticipate an error
with self.assertRaisesRegex(
RuntimeError, "input tensor must be the same type as the output tensor."
TypeError, "input tensor must be the same type as the output tensor."
):
tensor = torch.tensor([self.rank], dtype=torch.float).cuda(local_device_id)
output_t = torch.empty((self.world_size + 1), dtype=torch.long).cuda(
@@ -1116,7 +1116,7 @@ def test_send_recv(self):
# Test with non-contiguous tensors.
send_tensor_view = send_tensor.t()
if self.rank == 0:
-with self.assertRaisesRegex(RuntimeError, 'Tensors must be contiguous'):
+with self.assertRaisesRegex(ValueError, 'Tensors must be contiguous'):
dist.send(send_tensor_view, 1)

@requires_nccl()
@@ -1243,13 +1243,13 @@ def test_nccl_propagate_error_reason(self):

if self.rank != 0:
# Time out due to rank 0 not calling into allreduce.
-with self.assertRaises(RuntimeError):
+with self.assertRaises(dist.DistBackendError):
pg.allreduce([inp]).wait(timedelta(seconds=5))

# Now when nonzero rank attempts to use communicator, original failure reason should be logged.
try:
pg.allreduce([torch.ones(2).cuda(self.rank)]).wait()
-except RuntimeError as e:
+except dist.DistBackendError as e:
self.assertTrue("aborted" in str(e))
else:
self.fail("Expected error to be raised!")
@@ -2783,7 +2783,7 @@ def _test_nccl_errors_blocking(self, func):
process_group.allreduce(torch.rand(10).cuda(self.rank))
if self.rank == 0:
work = process_group.allreduce(torch.rand(10).cuda(self.rank))
-with self.assertRaisesRegex(RuntimeError, self.blocking_wait_error_msg):
+with self.assertRaisesRegex(dist.DistBackendError, self.blocking_wait_error_msg):
# Operation would time out in blocking mode.
work.wait(timeout=timedelta(seconds=self.op_timeout_sec))
# Run some GPU operations to make sure cuda has not gotten stuck.
@@ -2852,7 +2852,7 @@ def test_nccl_blocking_wait_with_barrier(self):
)
process_group.barrier().wait()
if self.rank == 0:
-with self.assertRaisesRegex(RuntimeError, self.blocking_wait_error_msg):
+with self.assertRaisesRegex(dist.DistBackendError, self.blocking_wait_error_msg):
# This should timeout
process_group.barrier().wait(timeout=timedelta(seconds=self.op_timeout_sec))

@@ -2890,7 +2890,7 @@ def test_nccl_timeout(self):
if self.rank == 0:
# This should timeout in about 1 second.
# Watchdog may abort timed out work resulting in NCCL error instead of operation timed out.
-with self.assertRaisesRegex(RuntimeError, self.blocking_wait_error_msg):
+with self.assertRaisesRegex(DistBackendError, self.blocking_wait_error_msg):
process_group.allreduce(torch.rand(10).cuda(self.rank)).wait(timeout=failed_collective_timeout)
# Now do a barrier to tell other rank to go ahead.
pg_gloo.barrier().wait()
@@ -3093,7 +3093,7 @@ def test_nccl_barrier_timeout(self):
store = c10d.FileStore(self.file_name, self.world_size)
if self.rank == 0:
with self.assertRaisesRegex(
RuntimeError, "Health check failure"
DistBackendError, "Health check failure"
):
c10d.init_process_group(
backend="nccl",