From 0d233346f84c1164aeeb8692615ba2600b6b437e Mon Sep 17 00:00:00 2001 From: co63oc Date: Thu, 19 Dec 2024 09:03:57 +0800 Subject: [PATCH] Fix --- .../fluid/operators/collective/recv_v2_op.cc | 10 - .../operators/collective/recv_v2_op.cu.cc | 307 ------------------ 2 files changed, 317 deletions(-) delete mode 100644 paddle/fluid/operators/collective/recv_v2_op.cu.cc diff --git a/paddle/fluid/operators/collective/recv_v2_op.cc b/paddle/fluid/operators/collective/recv_v2_op.cc index 3a9d84741c4105..8759217d0772b8 100644 --- a/paddle/fluid/operators/collective/recv_v2_op.cc +++ b/paddle/fluid/operators/collective/recv_v2_op.cc @@ -110,13 +110,3 @@ Reference: https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/p2p.h namespace ops = paddle::operators; REGISTER_OP_WITHOUT_GRADIENT(recv_v2, ops::RecvOpV2, ops::RecvOpV2Maker); - -PD_REGISTER_STRUCT_KERNEL(recv_v2, - CPU, - ALL_LAYOUT, - ops::RecvOpV2CPUKernel, - float, - double, - int, - int64_t, - phi::dtype::float16) {} diff --git a/paddle/fluid/operators/collective/recv_v2_op.cu.cc b/paddle/fluid/operators/collective/recv_v2_op.cu.cc deleted file mode 100644 index d6fbfdf6f4eee9..00000000000000 --- a/paddle/fluid/operators/collective/recv_v2_op.cu.cc +++ /dev/null @@ -1,307 +0,0 @@ -/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. */ - -#include "paddle/fluid/operators/collective/recv_v2_op.h" - -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) -#include "paddle/common/flags.h" -#include "paddle/fluid/platform/device/gpu/nccl_helper.h" -#include "paddle/phi/core/distributed/comm_context_manager.h" -#include "paddle/phi/core/distributed/nccl_comm_context.h" -#include "paddle/phi/core/platform/collective_helper.h" -COMMON_DECLARE_bool(dynamic_static_unified_comm); -#endif - -#include "paddle/fluid/distributed/collective/process_group.h" -#include "paddle/phi/api/include/tensor.h" - -namespace paddle { -namespace operators { - -#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \ - NCCL_VERSION_CODE >= 2703 -phi::DDim recv_shape_info(const phi::Place &place, - const gpuStream_t &stream, - platform::NCCLComm *comm, - phi::distributed::NCCLCommContext *comm_ctx, - const int &peer, - distributed::ProcessGroup *group) { - if (!group) { - PADDLE_ENFORCE_EQ( - ((stream != nullptr && comm != nullptr) || comm_ctx != nullptr), - true, - common::errors::InvalidArgument( - "NCCLComm and Stream should be provided if use NCCL " - "to send the shape info.")); - } - - phi::DataType shape_dtype = phi::DataType::INT32; - ncclDataType_t nccl_dtype = phi::ToNCCLDataType(shape_dtype); - - // step1: recv the shape size - phi::DenseTensor gpu_shape_size_tensor(shape_dtype); - if (!group) { - gpu_shape_size_tensor.Resize({1}); - gpu_shape_size_tensor.mutable_data(place, shape_dtype); - auto *gpu_data = gpu_shape_size_tensor.data(); - - if (comm_ctx) { - comm_ctx->Recv(&gpu_shape_size_tensor, 1, peer, stream); - } else { - PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclRecv( - gpu_data, 1, nccl_dtype, peer, comm->comm(), stream)); - } - } - - // copy the shape size tensor to cpu - phi::DenseTensor *cpu_shape_size_tensor = new phi::DenseTensor(shape_dtype); - cpu_shape_size_tensor->Resize({1}); - cpu_shape_size_tensor->mutable_data(phi::CPUPlace(), shape_dtype); - if (group) { - std::vector shape_size_tensor; - shape_size_tensor.emplace_back(*cpu_shape_size_tensor); - auto shape_size_task = group->Recv(shape_size_tensor, peer); - } else { - framework::TensorCopySync( - gpu_shape_size_tensor, phi::CPUPlace(), cpu_shape_size_tensor); - } - auto *cpu_data = cpu_shape_size_tensor->data(); - int shape_size = cpu_data[0]; - VLOG(3) << "recv the shape size: " << shape_size << " from peer"; - - // step2: recv the shape - phi::DenseTensor gpu_shape_tensor(shape_dtype); - if (!group) { - gpu_shape_tensor.Resize({shape_size}); - gpu_shape_tensor.mutable_data(place, shape_dtype); - auto *gpu_shape_data = gpu_shape_tensor.data(); - if (comm_ctx) { - comm_ctx->Recv(&gpu_shape_tensor, shape_size, peer, stream); - } else { - PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclRecv( - gpu_shape_data, shape_size, nccl_dtype, peer, comm->comm(), stream)); - } - } - - // copy the shape tensor to cpu - phi::DenseTensor *cpu_shape_tensor = new phi::DenseTensor(shape_dtype); - cpu_shape_tensor->Resize({shape_size}); - cpu_shape_tensor->mutable_data(phi::CPUPlace(), shape_dtype); - if (group) { - std::vector shape_tensor; - shape_tensor.emplace_back(*cpu_shape_tensor); - auto shape_task = group->Recv(shape_tensor, peer); - } else { - framework::TensorCopySync( - gpu_shape_tensor, phi::CPUPlace(), cpu_shape_tensor); - } - auto *cpu_shape_data = cpu_shape_tensor->data(); - std::vector all_shape; - for (int i = 0; i < shape_size; ++i) { - all_shape.emplace_back(cpu_shape_data[i]); - } - phi::DDim new_dim; - new_dim = new_dim.reshape(all_shape); - VLOG(3) << "recv the shape: (" << new_dim << ") from peer"; - - return new_dim; -} -#endif - -template -class RecvOpV2CUDAKernel : public framework::OpKernel { - public: - void Compute(const framework::ExecutionContext &ctx) const override { -#if (defined(PADDLE_WITH_RCCL) || defined(PADDLE_WITH_NCCL)) && \ - NCCL_VERSION_CODE >= 2703 - int rid = ctx.Attr("ring_id"); - bool dynamic_shape = ctx.Attr("dynamic_shape"); - PADDLE_ENFORCE_GE( - rid, - 0, - common::errors::InvalidArgument( - "The ring_id (%d) for recv_v2 op must be non-negative.", rid)); - - int peer = ctx.Attr("peer"); - PADDLE_ENFORCE_GE( - peer, - 0, - common::errors::InvalidArgument( - "The peer (%d) for recv_v2 op must be non-negative.", peer)); - - gpuStream_t stream = nullptr; - auto place = ctx.GetPlace(); - auto map = distributed::ProcessGroupMapFromGid::getInstance(); - if (map->has(rid)) { - // Use ProcessGroup - distributed::ProcessGroup *pg = map->get(rid); - std::vector out_tensor; - auto out_shape = ctx.Attr>("out_shape"); - auto out = ctx.Output("Out"); - // auto out_dims = out->dims(); - - if (dynamic_shape) { - VLOG(3) << "recv_v2 will use dynamic shape with send_v2 for switch"; - phi::DDim new_dim = recv_shape_info(ctx.GetPlace(), - /* gpuStream_t */ nullptr, - /* NCCLComm* */ nullptr, - /* NCCLCommContext* */ nullptr, - peer, - pg); - out->Resize(new_dim); - ctx.cuda_device_context().Alloc(out); - } else { - ctx.cuda_device_context().Alloc(out); - } - - out_tensor.emplace_back(*out); - auto task = pg->Recv(out_tensor, peer); - return; - } - - platform::NCCLComm *comm = nullptr; - phi::distributed::NCCLCommContext *comm_ctx = nullptr; - - const auto &comm_context_manager = - phi::distributed::CommContextManager::GetInstance(); - if (FLAGS_dynamic_static_unified_comm) { - PADDLE_ENFORCE_EQ(comm_context_manager.Has(std::to_string(rid)), - true, - common::errors::InvalidArgument( - "You choose to use new communication library by " - "setting environment " - "variable FLAGS_dynamic_static_unified_comm True. " - "But ring_id(%d) is " - "not found in comm_context_manager.", - std::to_string(rid))); - comm_ctx = static_cast( - comm_context_manager.Get(std::to_string(rid))); - PADDLE_ENFORCE_NE(comm_ctx, - nullptr, - common::errors::Unavailable( - "NCCLCommContext is nullptr, collective op should " - "has ring_id attr.")); - stream = comm_ctx->GetStream(); - VLOG(3) << "new comm_context_manager has rid " << rid; - } else { - comm = platform::NCCLCommContext::Instance().Get(rid, place); - PADDLE_ENFORCE_LT( - peer, - comm->nranks(), - common::errors::InvalidArgument("The value of peer (%d) you set must " - "be less than comm->nranks (%d).", - peer, - comm->nranks())); - stream = comm->stream(); - VLOG(3) << "old NCCLCommContext has rid " << rid; - } - - if (ctx.Attr("use_calc_stream")) { - // should ExecutionContext for calc stream. - stream = ctx.cuda_device_context().stream(); - } - int data_type = ctx.Attr("dtype"); - framework::proto::VarType::Type type = - framework::proto::VarType::Type(data_type); - ncclDataType_t dtype = platform::ToNCCLDataType(type); - - auto *out_var = ctx.OutputVar("Out"); - if (out_var->IsType()) { - PADDLE_ENFORCE_EQ( - dynamic_shape, - false, - common::errors::InvalidArgument("Dynamic shape for send/recv not " - "support DenseTensorArray for now.")); - auto out_array = out_var->GetMutable(); - for (size_t idx = 0; idx < out_array->size(); ++idx) { - VLOG(3) << "DenseTensorArray: idx(" << idx << ")"; - auto out = &out_array->at(idx); - auto out_dims = out->dims(); - ctx.cuda_device_context().Alloc(out); - auto numel = out->numel(); - if (comm_ctx) { - comm_ctx->Recv(out, numel, peer, stream); - } else { - PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclRecv( - out->data(), numel, dtype, peer, comm->comm(), stream)); - VLOG(3) << "rank " << comm->rank() << " recv " - << common::product(out_dims) << " from " << peer; - } - } - return; - } - - auto out_shape = ctx.Attr>("out_shape"); - auto out = ctx.Output("Out"); - // auto out_dims = out->dims(); - auto numel = out->numel(); - - if (dynamic_shape) { - VLOG(3) << "recv_v2 will use dynamic shape with send_v2"; - phi::DDim new_dim = recv_shape_info(place, - stream, - comm, - comm_ctx, - peer, - /* ProcessGroup* */ nullptr); - out->Resize(new_dim); - numel = out->numel(); - ctx.cuda_device_context().Alloc(out); - } else { - ctx.cuda_device_context().Alloc(out); - } - if (comm_ctx) { - comm_ctx->Recv(out, numel, peer, stream); - } else { - comm = platform::NCCLCommContext::Instance().Get(rid, place); - PADDLE_ENFORCE_LT( - peer, - comm->nranks(), - common::errors::InvalidArgument("The value of peer (%d) you set must " - "be less than comm->nranks (%d).", - peer, - comm->nranks())); - PADDLE_ENFORCE_GPU_SUCCESS(phi::dynload::ncclRecv( - out->data(), numel, dtype, peer, comm->comm(), stream)); - VLOG(3) << "rank " << comm->rank() << " recv " - << common::product(out->dims()) << " from " << peer; - } -#else - PADDLE_THROW(common::errors::Unavailable( - "PaddlePaddle should be compiled with NCCL and " - "NCCL version >= 2.7.3 is needed.")); -#endif - } -}; - -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators; - -PD_REGISTER_STRUCT_KERNEL(recv_v2, - GPU, - ALL_LAYOUT, - ops::RecvOpV2CUDAKernel, - float, - double, -#if (NCCL_VERSION_CODE >= 21000 && CUDA_VERSION >= 11000) || \ - defined(PADDLE_WITH_HIP) - phi::dtype::bfloat16, -#endif - int, - int64_t, - int8_t, - phi::dtype::float16) { -}