[AutoParallel] Support pipeline parallelism backward non-computation clip. (PaddlePaddle#58609)

* [AutoParallel] Support paddle.distributed.reshard constructing a GradNode,
which is needed for pipeline parallel (a usage sketch follows this commit message).

* Fix CI problems, and fix the pp test case as the review comments advise.

* Fix file-include problems.

* Polish paddle.distributed.reshard implementation according to review comments.

* Fix some problems.

* Polish code.

* Fix a failing test case.

* Move the reshard function to tensor_utils.h, as files in phi/core are
not allowed to include files in phi/api.

* Add a forgotten file.

* Fix some compilation problems.

* Remove useless PADDLE_WITH_DISTRIBUTE conditional compilation.

* Remove useless PADDLE_WITH_DISTRIBUTE conditional compilation.

* Fix a problem with the WITH_PYTHON=OFF compilation option.

* Fix a conditional compilation bug.

* [AutoParallel] Support pipeline parallel backward. Both the pp single
strategy and the dp-mp-pp hybrid strategy are verified. As the CI machine
only has 2 cards and the dp-mp-pp strategy needs 9 GPU cards, such a case
will be added as a test case later.

* Polish pipeline parallel backward implementation.

* Remove useless modification.

* Add an MLP dp-mp-pp hybrid strategy test case; it can't be run on the
CI machine now as it needs 8 GPUs (a mesh-layout sketch follows this commit message).

* Remove useless modification.

* Fix a Tensor double-free problem and polish code.

* Fix a problem in ReshardOutputPartialAxisToReplicated.

* Revert "Revert "[AutoParallel] Support pipeline parallelism backward non-computation clip. (PaddlePaddle#58449)" (PaddlePaddle#58601)"

This reverts commit 79e24ec.
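
The bullets above describe the user-visible behavior: a cross-mesh paddle.distributed.reshard now records a GradNode, and ranks outside a stage's process mesh skip the dense computation (the "non-computation clip") while the autograd graph stays connected. Below is a minimal usage sketch of that flow, assuming the placements-style semi-auto-parallel API (ProcessMesh / shard_tensor / reshard / Replicate) and a two-GPU launch such as python -m paddle.distributed.launch --devices 0,1 demo.py; names and shapes are illustrative, not taken from this PR's tests.

import paddle
import paddle.distributed as dist

mesh0 = dist.ProcessMesh([0], dim_names=["pp"])  # pipeline stage 0
mesh1 = dist.ProcessMesh([1], dim_names=["pp"])  # pipeline stage 1

# Stage-0 input; ranks outside mesh0 only hold the global shape and dist_attr.
x = dist.shard_tensor(paddle.randn([4, 8]), mesh0, [dist.Replicate()])
x.stop_gradient = False

y = paddle.nn.functional.relu(x)                 # dense compute is clipped off mesh0
y1 = dist.reshard(y, mesh1, [dist.Replicate()])  # cross-mesh send; records a GradNode
loss = (y1 * y1).sum()                           # dense compute is clipped off mesh1
loss.backward()                                  # reshard's GradNode routes the gradient back to mesh0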
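
The 8-GPU requirement of the dp-mp-pp test case comes from a 2 x 2 x 2 layout: two pipeline stages, each with a 2 (dp) x 2 (mp) mesh. A hypothetical mesh and weight layout for such an MLP follows, again assuming the placements-style API; the actual test added by this PR may differ.

import paddle
import paddle.distributed as dist

# One 2-D (dp x mp) mesh per pipeline stage, eight ranks in total.
mesh_stage0 = dist.ProcessMesh([[0, 1], [2, 3]], dim_names=["dp", "mp"])
mesh_stage1 = dist.ProcessMesh([[4, 5], [6, 7]], dim_names=["dp", "mp"])

# Column-parallel weight on stage 0, row-parallel weight on stage 1.
w0 = dist.shard_tensor(paddle.randn([8, 16]), mesh_stage0, [dist.Replicate(), dist.Shard(1)])
w1 = dist.shard_tensor(paddle.randn([16, 8]), mesh_stage1, [dist.Replicate(), dist.Shard(0)])
# Activations leaving stage 0 are resharded onto mesh_stage1 (as in the sketch above)
# before the stage-1 matmul, and the backward pass flows back along the same path.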
GhostScreaming authored Nov 2, 2023
1 parent 11c62e6 commit 3b44f88
Showing 15 changed files with 325 additions and 101 deletions.
34 changes: 31 additions & 3 deletions paddle/fluid/eager/auto_code_generator/generator/eager_gen.py
@@ -580,6 +580,24 @@ class {} : public egr::GradNodeBase {{
}}
"""

FILL_ZERO_GRAD_TEMPLATE_BACKWARD = """
if (!IsRunAutoParallel()) {{
egr::EagerUtils::FillZeroForEmptyGradInput(&grads[{fwd_position}], input_metas[{fwd_position}]);
}}
"""

FILL_ZERO_PLAIN_GRAD_TEMPLATE_BACKWARD = """
if (!IsRunAutoParallel()) {{
egr::EagerUtils::FillZeroForEmptyGradInput(&grads[{fwd_position}][0], input_metas[{fwd_position}][0]);
}}
"""

FILL_ZERO_OPTIONAL_PLAIN_GRAD_TEMPLATE_BACKWARD = """
if (!IsRunAutoParallel()) {{
egr::EagerUtils::FillZeroForEmptyOptionalGradInput(&grads[{fwd_position}][0], input_metas[{fwd_position}][0]);
}}
"""

inplace_optional_out_type_map = {
"Tensor": "paddle::optional<paddle::Tensor>&",
"std::vector<Tensor>": "paddle::optional<std::vector<paddle::Tensor>>&",
@@ -2224,12 +2242,22 @@ def GenerateNodeDefinition(
) in backward_grad_inputs_map.items():
if name in self.optional_inputs:
if IsPlainTensorType(ttype):
fill_zero_str += f"{indent}egr::EagerUtils::FillZeroForEmptyOptionalGradInput(&grads[{fwd_position}][0], input_metas[{fwd_position}][0]);\n"
fill_zero_str += FILL_ZERO_OPTIONAL_PLAIN_GRAD_TEMPLATE_BACKWARD.format(
fwd_position=fwd_position
)
else:
if IsPlainTensorType(ttype):
fill_zero_str += f"{indent}egr::EagerUtils::FillZeroForEmptyGradInput(&grads[{fwd_position}][0], input_metas[{fwd_position}][0]);\n"
fill_zero_str += (
FILL_ZERO_PLAIN_GRAD_TEMPLATE_BACKWARD.format(
fwd_position=fwd_position
)
)
else:
fill_zero_str += f"{indent}egr::EagerUtils::FillZeroForEmptyGradInput(&grads[{fwd_position}], input_metas[{fwd_position}]);\n"
fill_zero_str += (
FILL_ZERO_GRAD_TEMPLATE_BACKWARD.format(
fwd_position=fwd_position
)
)

inplace_grad_input_str = ""
inplace_check_str = ""
26 changes: 20 additions & 6 deletions paddle/fluid/eager/grad_node_info.cc
@@ -105,9 +105,16 @@ void GradNodeBase::SetGradInMeta(const paddle::Tensor& fwd_out,
}

if (!fwd_out.initialized()) {
VLOG(7)
<< "Skip Configuring GradSlotMeta for uninitialized GradInput Tensor";
return;
if (fwd_out.defined() && fwd_out.is_dist_tensor() &&
phi::distributed::NeedComputationClipForPP(fwd_out.impl())) {
VLOG(3) << "Tensor " << fwd_out.name() << " is DistTensor,"
<< " and needs computation clip for pipeline parallel."
<< " Still SetGradInMeta for it.";
} else {
VLOG(7)
<< "Skip Configuring GradSlotMeta for uninitialized GradInput Tensor";
return;
}
}

const phi::DenseTensor* dense_tensor = nullptr;
@@ -183,9 +190,16 @@ void GradNodeBase::SetGradInMeta(const std::vector<paddle::Tensor>& fwd_out,
}

if (!fwd_out_tensor.initialized()) {
VLOG(7)
<< "Skip Configuring GradSlotMeta for uninitialized GradInput Tensor";
return;
if (fwd_out_tensor.defined() && fwd_out_tensor.is_dist_tensor() &&
phi::distributed::NeedComputationClipForPP(fwd_out_tensor.impl())) {
VLOG(3) << "Tensor " << fwd_out_tensor.name() << " is DistTensor,"
<< " and needs computation clip for pipeline parallel."
<< " Still SetGradInMeta for it.";
} else {
VLOG(7) << "Skip Configuring GradSlotMeta for uninitialized GradInput "
"Tensor";
return;
}
}

// Record TensorMeta
1 change: 1 addition & 0 deletions paddle/fluid/eager/grad_node_info.h
@@ -21,6 +21,7 @@
#include "paddle/fluid/eager/hooks.h"
#include "paddle/phi/api/all.h"
#include "paddle/phi/core/distributed/auto_parallel/dist_attr.h"
#include "paddle/phi/core/distributed/auto_parallel/reshard_utils.h"
#include "paddle/utils/test_macros.h"

namespace egr {
15 changes: 13 additions & 2 deletions paddle/fluid/eager/grad_tensor_holder.cc
@@ -111,8 +111,19 @@ void GradTensorHolder::add(size_t slot_id,
const paddle::Tensor& t,
bool create_graph) {
if (!t.initialized()) {
VLOG(3) << "No need to do accumulate for uninitialized t.";
return;
if (t.defined() && t.is_dist_tensor() &&
phi::distributed::NeedComputationClipForPP(t.impl())) {
// Pipeline parallel still needs to construct GradNode graph
// to make DistTensor's global shape and DistAttr information flow.
// Skipping grad accumulation would disconnect the GradTensor from the
// next GradNode.
VLOG(3) << "Do accumulate for uninitialized Tensor " << t.name()
<< " as it's DistTensor and it needs computation clip for "
"pipeline parallel.";
} else {
VLOG(3) << "No need to do accumulate for uninitialized t.";
return;
}
} // TODO(jiabin): Remove this when we fix all kernel.

PADDLE_ENFORCE(slot_id < buffer_.size(),
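
The grad_node_info.cc and grad_tensor_holder.cc hunks above apply the same keep-or-skip rule: an uninitialized tensor is normally dropped from grad bookkeeping, unless it is a DistTensor whose computation is merely clipped on this rank under pipeline parallelism. A self-contained Python sketch of that rule follows; the TensorState fields are illustrative stand-ins for the C++ queries (defined(), initialized(), is_dist_tensor(), NeedComputationClipForPP), assuming NeedComputationClipForPP is true when the current rank is outside the tensor's process mesh.

from dataclasses import dataclass

@dataclass
class TensorState:
    defined: bool           # has an impl
    initialized: bool       # holds real data on this rank
    is_dist_tensor: bool
    cur_rank_in_mesh: bool  # current rank belongs to the tensor's process mesh

def keep_in_autograd_graph(t: TensorState) -> bool:
    # Mirrors SetGradInMeta / GradTensorHolder::add: uninitialized tensors are
    # skipped, except DistTensors that only live on another pipeline stage;
    # for those, the dense computation is clipped but GradSlotMeta is still
    # recorded and accumulation proceeds, keeping the GradNode graph connected.
    if t.initialized:
        return True
    needs_clip_for_pp = t.is_dist_tensor and not t.cur_rank_in_mesh
    return t.defined and needs_clip_for_pp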
12 changes: 10 additions & 2 deletions paddle/fluid/eager/utils.cc
@@ -16,6 +16,7 @@
#include "paddle/fluid/eager/accumulation/accumulation_node.h"
#include "paddle/fluid/eager/api/utils/global_utils.h"
#include "paddle/fluid/eager/api/utils/hook_utils.h"
#include "paddle/fluid/eager/grad_node_info.h"
#include "paddle/fluid/eager/tensor_wrapper.h"

#include "paddle/phi/api/all.h"
@@ -654,6 +655,29 @@ std::string EagerUtils::TensorStr(const paddle::Tensor& t) {
std::string tensor_info_str = "";
if (t.defined()) {
if (t.is_dist_tensor()) {
const char* DIST_TENSOR_INFO_TEMPLATE =
"Type: %s, Dtype: %s, Place: %s, Is_defined: %s, Is_initialized: %s, "
"Shape: %s, DistAttr: %s";
auto dist_t =
std::static_pointer_cast<phi::distributed::DistTensor>(t.impl());
if (t.initialized()) {
tensor_info_str += paddle::string::Sprintf(
TENSOR_INFO_TEMPLATE,
DIST_TENSOR_INFO_TEMPLATE,
t.impl()->type_info().name(),
t.dtype(),
t.place().DebugString(),
dist_t->defined(),
dist_t->initialized(),
paddle::string::Sprintf(
"%s, Local Shape: %s", t.dims(), dist_t->local_dims()),
dist_t->dist_attr());
} else {
tensor_info_str += paddle::string::Sprintf(TENSOR_INFO_TEMPLATE,
tensor_info_str += paddle::string::Sprintf(DIST_TENSOR_INFO_TEMPLATE,
t.impl()->type_info().name(),
"Unknown",
"Unknown",
dist_t->defined(),
dist_t->initialized(),
t.dims(),
dist_t->dist_attr());
}
25 changes: 22 additions & 3 deletions paddle/phi/api/lib/api_gen_utils.cc
@@ -569,10 +569,29 @@ phi::distributed::DistTensor* SetKernelDistOutput(
}

std::shared_ptr<phi::distributed::DistTensor> CreateKernelDistOutput(
Tensor* out, const phi::distributed::TensorDistAttr& dist_attr) {
Tensor* out,
bool set_dist_output_as_tensor_impl,
const phi::distributed::ArgDistAttr& dist_attr) {
if (out) {
return std::make_shared<phi::distributed::DistTensor>(phi::DDim(),
dist_attr);
PADDLE_ENFORCE_EQ(
paddle::holds_alternative<phi::distributed::TensorDistAttr>(dist_attr),
true,
phi::errors::PreconditionNotMet("Arg must be a single TensorDistAttr"));
auto dist_output = std::make_shared<phi::distributed::DistTensor>(
phi::DDim(), paddle::get<0>(dist_attr));
if (set_dist_output_as_tensor_impl) {
VLOG(3) << "CreateKernelDistOutput function set generated output "
"dist_tensor as Tensor's impl";
if (out->is_dist_tensor()) {
VLOG(3)
<< "out is DistTensor, set its DistAttr to generated DistOutput.";
dist_output->unsafe_set_dist_attr(
std::static_pointer_cast<phi::distributed::DistTensor>(out->impl())
->dist_attr());
}
out->set_impl(dist_output);
}
return dist_output;
}
return nullptr;
}
3 changes: 2 additions & 1 deletion paddle/phi/api/lib/api_gen_utils.h
@@ -153,7 +153,8 @@ phi::distributed::DistTensor* SetKernelDistOutput(

std::shared_ptr<phi::distributed::DistTensor> CreateKernelDistOutput(
Tensor* out,
const phi::distributed::TensorDistAttr& dist_attr =
bool set_dist_output_as_tensor_impl,
const phi::distributed::ArgDistAttr& dist_attr =
phi::distributed::TensorDistAttr());

std::shared_ptr<phi::distributed::DistTensor> CreateKernelDistOutput(
6 changes: 6 additions & 0 deletions paddle/phi/api/lib/data_transform.cc
@@ -814,6 +814,12 @@ void ReshardOutputPartialAxisToReplicated(
if (out_tensor->dist_attr().is_partial()) {
auto dist_attr = out_tensor->dist_attr();
dist_attr.clean_partial_status();
if (!IsCurRankInMesh(out_tensor->dist_attr().process_mesh())) {
VLOG(6) << "DistTensor is not in mesh, just clear its partial status and "
"skip reshard it to replicated.";
out_tensor->unsafe_set_dist_attr(dist_attr);
return;
}
VLOG(6) << "FwdAPI Output P2R - "
<< ReshardDebugInfo(*out_tensor, dist_attr);
auto* func =
2 changes: 1 addition & 1 deletion paddle/phi/api/lib/tensor_utils.cc
@@ -120,7 +120,7 @@ PADDLE_API std::shared_ptr<phi::distributed::DistTensor> reshard(
"However it's %s",
typeid(input.impl().get()).name()));
auto dev_ctx = phi::distributed::GetDistTensorDeviceContext(
std::static_pointer_cast<phi::distributed::DistTensor>(input.impl()));
static_cast<phi::distributed::DistTensor*>(input.impl().get()));
auto input_tensor_impl = input.impl();
std::shared_ptr<phi::distributed::DistTensor> dist_out_ptr = nullptr;
if (input_tensor_impl) {
17 changes: 11 additions & 6 deletions paddle/phi/api/yaml/generator/dist_api_gen.py
@@ -48,14 +48,14 @@
// 1. InferSpmd (Infer DistAttr of Inputs&Outputs){}
// 2. Create API Output & Prepare Dist and Dense Output{}
// 3. Infer DistTensor's Global Shape{}\n
if (rank_is_in_current_mesh){{
if (rank_is_in_current_mesh) {{
// 4. Select Kernel{}
// 5. Reshard Input{}\n
// 6. PrepareData (DataTransform & Prepare Dense Input){}
// 7. Infer Local DenseTensor Meta{}
// 8. DenseTensor Kernel Call{}
// 9. Reshard Partial Output to Replicated (Temporary){}\n
}}\n
// 9. Reshard Partial Output to Replicated (Temporary){}\n
// 10. Set Output Dist Attr For Default Impl{}\n
// 11. Return
{}
@@ -233,7 +233,7 @@
"{}", {{kernel_backend, kernel_layout, kernel_data_type}});
const auto& kernel = kernel_result.kernel;
VLOG(6) << "{} kernel: " << kernel;
auto* dev_ctx = GetDeviceContextByBackend(kernel_result.has_fallback_cpu ? Backend::CPU : kernel_backend);
dev_ctx = GetDeviceContextByBackend(kernel_result.has_fallback_cpu ? Backend::CPU : kernel_backend);
"""

# 5. Reshard Input
@@ -360,9 +360,11 @@

# 9. Reshard Partial Output to Replicated
RESHARD_P2R_SINGLE_OUTPUT_TEMPLATE = """
ReshardOutputPartialAxisToReplicated(dev_ctx, dist_out);"""
dev_ctx = phi::distributed::GetDistTensorDeviceContext(dist_out);
ReshardOutputPartialAxisToReplicated(dev_ctx, dist_out);"""
RESHARD_P2R_MULTI_SINGLE_OUTPUT_TEMPLATE = """
ReshardOutputPartialAxisToReplicated(dev_ctx, dist_out_{});"""
dev_ctx = phi::distributed::GetDistTensorDeviceContext(dist_out_{idx});
ReshardOutputPartialAxisToReplicated(dev_ctx, dist_out_{idx});"""
UNSUPPORTED_RESHARD_OUTPUT_COMMENT_TEMPLATE = """
// API `{}` does not need to support ReshardOutput now."""

@@ -813,6 +815,7 @@ def generate_output_creation_code(self) -> str:
output_num = len(self.outputs['types'])
return_type = self.get_return_type_with_intermediate(self.inplace_flag)
output_creation_code = ""
output_creation_code += "\n phi::DeviceContext* dev_ctx = nullptr;"
if output_num == 1:
# api output generate
if self.need_to_generate_code_for_inplace_impl(0):
@@ -1399,7 +1402,9 @@ def generate_reshard_partial_out_to_replicated_code(self) -> str:
for i, out_type in enumerate(self.outputs['types']):
if out_type == 'Tensor':
reshard_p2r_code += (
RESHARD_P2R_MULTI_SINGLE_OUTPUT_TEMPLATE.format(i)
RESHARD_P2R_MULTI_SINGLE_OUTPUT_TEMPLATE.format(
idx=i
)
)
else:
self.vector_output_size_assertion_check()
59 changes: 44 additions & 15 deletions paddle/phi/api/yaml/generator/dist_bw_api_gen.py
@@ -35,8 +35,8 @@
// 7. PrepareData (DataTransform & Prepare Dense Input){}
// 8. Infer Local DenseTensor Meta{}
// 9. DenseTensor Kernel Call{}
// 10. Reshard Partial Output to Replicated (Temporary){}\n
}}
// 10. Reshard Partial Output to Replicated (Temporary){}\n
// 11. Return
{}
}}
@@ -49,54 +49,82 @@
"""
SINGLE_OUT_CREATION_TEMPLATE_WITH_SPMD = """
std::shared_ptr<phi::distributed::DistTensor> shared_dist_out =
CreateKernelDistOutput({}, spmd_info.second[0]);
CreateKernelDistOutput({}, !rank_is_in_current_mesh, spmd_info.second[0]);
phi::distributed::DistTensor* dist_out = shared_dist_out.get();
phi::DenseTensor* dense_out = dist_out->unsafe_mutable_value();
if (dense_out && !rank_is_in_current_mesh && !dist_out->defined()) {{
*dense_out = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
"""
SINGLE_OUT_CREATION_TEMPLATE = """
std::shared_ptr<phi::distributed::DistTensor> shared_dist_out =
CreateKernelDistOutput({});
CreateKernelDistOutput({}, !rank_is_in_current_mesh);
phi::distributed::DistTensor* dist_out = shared_dist_out.get();
phi::DenseTensor* dense_out = dist_out->unsafe_mutable_value();
"""
VECTOR_OUT_CREATION_TEMPLATE = """
auto dist_out = SetKernelDistOutput({name});
std::vector<phi::DenseTensor*> dense_out(dist_out.size());
for (size_t i=0; i<dist_out.size(); i++) {{
dense_out[i] = const_cast<phi::DenseTensor*>(&dist_out[i]->value());
if (dense_out && !rank_is_in_current_mesh && !dist_out->defined()) {{
*dense_out = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
"""
VECTOR_OUT_CREATION_TEMPLATE = """
auto dist_out = SetKernelDistOutput({name});
std::vector<phi::DenseTensor*> dense_out(dist_out.size());
for (size_t i = 0; i < dist_out.size(); i++) {{
dense_out[i] = const_cast<phi::DenseTensor*>(&dist_out[i]->value());
for (size_t i=0; i<dist_out.size(); i++) {{
dense_out[i] = dist_out[i]->unsafe_mutable_value();
if (dense_out[i] && !rank_is_in_current_mesh && !dist_out[i]->defined()) {{
*dense_out[i] = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
}}
"""
INPLACE_OUT_CREATION_TEMPLATE = """
*{} = {};
"""
MULTI_SINGLE_OUT_CREATION_TEMPLATE_NO_SPMD = """
auto dist_out_{idx} = SetKernelDistOutput({name});
auto dense_out_{idx} = dist_out_{idx}->unsafe_mutable_value();
auto dense_out_{idx} = dist_out_{idx} ? dist_out_{idx}->unsafe_mutable_value() : nullptr;
if (dense_out_{idx} && !rank_is_in_current_mesh && dist_out_{idx}->defined()) {{
*dense_out_{idx} = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
"""
MULTI_SINGLE_OUT_CREATION_TEMPLATE_WITH_SPMD = """
std::shared_ptr<phi::distributed::DistTensor> shared_dist_out_{idx} =
CreateKernelDistOutput({name}, spmd_info.second[{idx}]);
CreateKernelDistOutput({name}, !rank_is_in_current_mesh, spmd_info.second[{idx}]);
phi::distributed::DistTensor* dist_out_{idx} = shared_dist_out_{idx}.get();
phi::DenseTensor* dense_out_{idx} = dist_out_{idx} ? dist_out_{idx}->unsafe_mutable_value() : nullptr;
if (dense_out_{idx} && !rank_is_in_current_mesh && dist_out_{idx}->defined()) {{
*dense_out_{idx} = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
"""
MULTI_SINGLE_OUT_CREATION_TEMPLATE = """
std::shared_ptr<phi::distributed::DistTensor> shared_dist_out_{idx} =
CreateKernelDistOutput({name});
CreateKernelDistOutput({name}, !rank_is_in_current_mesh);
phi::distributed::DistTensor* dist_out_{idx} = shared_dist_out_{idx}.get();
phi::DenseTensor* dense_out_{idx} = dist_out_{idx} ? dist_out_{idx}->unsafe_mutable_value() : nullptr;
if (dense_out_{idx} && !rank_is_in_current_mesh && !dist_out_{idx}->defined()) {{
*dense_out_{idx} = phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
"""
MULTI_VECTOR_OUT_CREATION_TEMPLATE = """
auto dist_out_{i} = SetKernelDistOutput({name});
std::vector<phi::DenseTensor*> dense_out_{i}(dist_out_{i}.size());
for (size_t i = 0; i < dist_out_{i}.size(); i++) {{
dense_out_{i}[i] = const_cast<phi::DenseTensor*>(&dist_out_{i}[i]->value());
dense_out_{i}[i] = const_cast<phi::DenseTensor*>(&dist_out_{i}[i]->value());
if (dense_out_{i}[i] && !rank_is_in_current_mesh && !dist_out_{i}[i]->defined()) {{
*dense_out_{i}[i]= phi::DenseTensor(
std::make_shared<phi::Allocation>(nullptr, 0, phi::distributed::GetDefaultPlace()),
phi::DenseTensorMeta());
}}
}}
"""

@@ -117,6 +145,7 @@ def generate_output_creation_code(self) -> str:
# backward api only need to generate kernel outputs
output_num = len(self.outputs['types'])
output_creation_code = ""
output_creation_code += "\n phi::DeviceContext* dev_ctx = nullptr;"
if output_num == 1:
self.dist_output_args.append('dist_out')
self.dense_output_args.append('dense_out')