From 014537bfe431d3acc3ec5626fa048df38fe038f3 Mon Sep 17 00:00:00 2001 From: danleifeng Date: Wed, 18 Dec 2024 14:46:19 +0800 Subject: [PATCH 1/2] add env for gpups; fix cache table; test=develop --- .../distributed/ps/service/brpc_ps_client.cc | 4 ++- .../distributed/ps/service/brpc_ps_client.h | 4 ++- .../fluid/distributed/ps/service/ps_client.h | 4 ++- .../distributed/ps/service/ps_local_client.cc | 9 ++++- .../distributed/ps/service/ps_local_client.h | 4 ++- .../distributed/ps/table/ctr_dymf_accessor.cc | 6 ++-- .../distributed/ps/table/ctr_dymf_accessor.h | 4 +-- .../distributed/ps/table/ssd_sparse_table.cc | 12 +++---- paddle/fluid/distributed/ps/wrapper/fleet.cc | 6 ++-- paddle/fluid/distributed/ps/wrapper/fleet.h | 4 ++- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 35 ++++++++++++++++++- paddle/fluid/pybind/fleet_py.cc | 1 + python/paddle/distributed/fleet/__init__.py | 1 + python/paddle/distributed/fleet/fleet.py | 21 +++++++++++ python/paddle/distributed/ps/the_one_ps.py | 6 ++++ 15 files changed, 101 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/distributed/ps/service/brpc_ps_client.cc b/paddle/fluid/distributed/ps/service/brpc_ps_client.cc index a724e55be391b6..cb38f07dc68ea7 100644 --- a/paddle/fluid/distributed/ps/service/brpc_ps_client.cc +++ b/paddle/fluid/distributed/ps/service/brpc_ps_client.cc @@ -439,7 +439,9 @@ int FlClientBrpcClosure::check_response(size_t request_idx, int cmd_id) { return 0; } -std::future BrpcPsClient::PrintTableStat(uint32_t table_id) { +std::future BrpcPsClient::PrintTableStat(uint32_t table_id, + uint16_t pass_id, + size_t threshold) { size_t request_call_num = _server_channels.size(); DownpourBrpcClosure *closure = new DownpourBrpcClosure( request_call_num, [request_call_num, table_id](void *done) { diff --git a/paddle/fluid/distributed/ps/service/brpc_ps_client.h b/paddle/fluid/distributed/ps/service/brpc_ps_client.h index b7f4f894cb656d..3ce8ffbadfe604 100644 --- a/paddle/fluid/distributed/ps/service/brpc_ps_client.h +++ b/paddle/fluid/distributed/ps/service/brpc_ps_client.h @@ -273,7 +273,9 @@ class BrpcPsClient : public PSClient { size_t num, bool is_training); - virtual std::future PrintTableStat(uint32_t table_id); + virtual std::future PrintTableStat(uint32_t table_id, + uint16_t pass_id, + size_t threshold); virtual std::future Barrier(size_t table_id, uint32_t barrier_type); diff --git a/paddle/fluid/distributed/ps/service/ps_client.h b/paddle/fluid/distributed/ps/service/ps_client.h index 54f8ee89965245..634e3a42ea468c 100644 --- a/paddle/fluid/distributed/ps/service/ps_client.h +++ b/paddle/fluid/distributed/ps/service/ps_client.h @@ -181,7 +181,9 @@ class PSClient { return fut; } - virtual std::future PrintTableStat(uint32_t table_id) = 0; + virtual std::future PrintTableStat(uint32_t table_id, + uint16_t pass_id, + size_t threshold) = 0; virtual std::future SaveCacheTable(uint32_t table_id UNUSED, uint16_t pass_id UNUSED, size_t threshold UNUSED) { diff --git a/paddle/fluid/distributed/ps/service/ps_local_client.cc b/paddle/fluid/distributed/ps/service/ps_local_client.cc index ade250260146f1..461a262c2130ff 100644 --- a/paddle/fluid/distributed/ps/service/ps_local_client.cc +++ b/paddle/fluid/distributed/ps/service/ps_local_client.cc @@ -256,11 +256,18 @@ ::std::future PsLocalClient::PullSparsePtr( return done(); } -::std::future PsLocalClient::PrintTableStat(uint32_t table_id) { +::std::future PsLocalClient::PrintTableStat(uint32_t table_id, + uint16_t pass_id, + size_t threshold) { auto* table_ptr = GetTable(table_id); std::pair ret = table_ptr->PrintTableStat(); VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first << ", mf size: " << ret.second; + // > 50亿,40%内存 + if (static_cast(ret.first) > threshold) { + VLOG(0) << "run cache table"; + table_ptr->CacheTable(pass_id); + } return done(); } diff --git a/paddle/fluid/distributed/ps/service/ps_local_client.h b/paddle/fluid/distributed/ps/service/ps_local_client.h index 36694e22fed271..c341e96188d299 100644 --- a/paddle/fluid/distributed/ps/service/ps_local_client.h +++ b/paddle/fluid/distributed/ps/service/ps_local_client.h @@ -87,7 +87,9 @@ class PsLocalClient : public PSClient { const std::vector>& keys2rank_vec, const uint16_t& dim_id = 0); - virtual ::std::future PrintTableStat(uint32_t table_id); + virtual ::std::future PrintTableStat(uint32_t table_id, + uint16_t pass_id, + size_t threshold); virtual ::std::future SaveCacheTable(uint32_t table_id, uint16_t pass_id, diff --git a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc index a0f495cf8cb68b..0c5ea90895f4c5 100644 --- a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc +++ b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc @@ -232,7 +232,7 @@ void CtrDymfAccessor::UpdateStatAfterSave(float* value, int param) { int32_t CtrDymfAccessor::Create(float** values, size_t num) { for (size_t value_item = 0; value_item < num; ++value_item) { float* value = values[value_item]; -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) common_feature_value.UnseenDays(value) = 0; common_feature_value.PassId(value) = 0; #else @@ -385,7 +385,7 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) { int CtrDymfAccessor::ParseFromString(const std::string& str, float* value) { auto ret = paddle::string::str_to_float(str.data(), value); -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) float unseen_day = value[common_feature_value.UnseenDaysIndex()]; common_feature_value.UnseenDays(value) = (uint16_t)(unseen_day); common_feature_value.PassId(value) = 0; @@ -437,7 +437,7 @@ void CtrDymfAccessor::UpdateTimeDecay(float* value, bool is_update_seen_day) { } } -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) bool CtrDymfAccessor::SaveMemCache(float* value, int param, double global_cache_threshold, diff --git a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h index ff97f6e487cd58..70060e42a6dcba 100644 --- a/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h +++ b/paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h @@ -88,7 +88,7 @@ class CtrDymfAccessor : public ValueAccessor { // 根据mf_dim计算的总byte数 int Size(int mf_dim) { return (Dim(mf_dim)) * sizeof(float); } -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) uint16_t& PassId(float* val) { uint16_t* int16_val = reinterpret_cast(val + UnseenDaysIndex()); @@ -258,7 +258,7 @@ class CtrDymfAccessor : public ValueAccessor { void SetDayId(int day_id) override; -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) // 根据pass_id和show_threshold阈值来判断cache到ssd bool SaveMemCache(float* value, int param, diff --git a/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc b/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc index ab64acf14798d7..4ad06f838aa727 100644 --- a/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc +++ b/paddle/fluid/distributed/ps/table/ssd_sparse_table.cc @@ -265,7 +265,7 @@ int32_t SSDSparseTable::PullSparsePtr(int shard_id, } _value_accessor->UpdateTimeDecay(ret->data(), true); -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) _value_accessor->UpdatePassId(ret->data(), pass_id); #endif int pull_data_idx = cur_ctx->batch_index[idx]; @@ -280,7 +280,7 @@ int32_t SSDSparseTable::PullSparsePtr(int shard_id, ret = itr.value_ptr(); // int pull_data_idx = keys[i].second; _value_accessor->UpdateTimeDecay(ret->data(), true); -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) _value_accessor->UpdatePassId(ret->data(), pass_id); #endif pull_values[i] = reinterpret_cast(ret); @@ -332,7 +332,7 @@ int32_t SSDSparseTable::PullSparsePtr(int shard_id, ret = &feature_value; } _value_accessor->UpdateTimeDecay(ret->data(), true); -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) _value_accessor->UpdatePassId(ret->data(), pass_id); #endif int pull_data_idx = cur_ctx->batch_index[idx]; @@ -2945,7 +2945,7 @@ int32_t SSDSparseTable::LoadWithBinary(const std::string& path, int param) { abort(); } last_k = k; -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) _value_accessor->UpdatePassId(convert_value, 0); #endif rocksdb::Status status = sst_writer.Put( @@ -2963,7 +2963,7 @@ int32_t SSDSparseTable::LoadWithBinary(const std::string& path, int param) { } } else { auto& feature_value = shard[k]; -#ifdef PADDLE_WITH_PSLIB +#if defined(PADDLE_WITH_PSLIB) || defined(PADDLE_WITH_HETERPS) _value_accessor->UpdatePassId(convert_value, 0); #endif feature_value.resize(dim); @@ -3051,7 +3051,7 @@ std::pair SSDSparseTable::PrintTableStat() { int32_t SSDSparseTable::CacheTable(uint16_t pass_id) { std::lock_guard guard(_table_mutex); - VLOG(0) << "cache_table"; + VLOG(0) << "cache_table, pass_id:" << pass_id; std::atomic count{0}; std::vector> tasks; diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.cc b/paddle/fluid/distributed/ps/wrapper/fleet.cc index 089b538e75ed6a..93787e06504510 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.cc +++ b/paddle/fluid/distributed/ps/wrapper/fleet.cc @@ -815,8 +815,10 @@ void FleetWrapper::RecvAndSaveTable(const uint64_t table_id, } } -void FleetWrapper::PrintTableStat(const uint64_t table_id) { - auto ret = worker_ptr_->PrintTableStat(table_id); +void FleetWrapper::PrintTableStat(const uint64_t table_id, + uint32_t pass_id, + size_t threshold) { + auto ret = worker_ptr_->PrintTableStat(table_id, pass_id, threshold); ret.wait(); int32_t err_code = ret.get(); if (err_code == -1) { diff --git a/paddle/fluid/distributed/ps/wrapper/fleet.h b/paddle/fluid/distributed/ps/wrapper/fleet.h index 116b8cdf4c177c..a116d641999532 100644 --- a/paddle/fluid/distributed/ps/wrapper/fleet.h +++ b/paddle/fluid/distributed/ps/wrapper/fleet.h @@ -241,7 +241,9 @@ class FleetWrapper { // barrier with barrier table void BarrierWithTable(uint32_t barrier_type); - void PrintTableStat(const uint64_t table_id); + void PrintTableStat(const uint64_t table_id, + uint32_t pass_id, + size_t threshold); void SaveCacheTable(const uint64_t table_id, uint16_t pass_id, size_t threshold); diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index b6db486fbfa781..d663b1f5d9a600 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -14,8 +14,8 @@ limitations under the License. */ #pragma once #ifdef PADDLE_WITH_HETERPS - #include +#include #include #include #include @@ -130,6 +130,22 @@ class AfsWrapper { VLOG(1) << "AfsWrapper Init" << handle_ << " ret: " << ret << " fs_name :" << fs_name << " fs_user :" << fs_user << " pass_wd :" << pass_wd << " conf :" << conf; +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_HETERPS) && \ + defined(PADDLE_WITH_NCCL) + const char* launch_mode = std::getenv("NCCL_LAUNCH_MODE"); + if (launch_mode != nullptr) { + if (std::string(launch_mode) == "PARALLEL") { + VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE=GROUP for no " + "hang, will change from PARALLEL to GROUP"; + int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); + PADDLE_ENFORCE_EQ(res, 0); + } + } else { + VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE=GROUP for no hang"; + int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); + PADDLE_ENFORCE_EQ(res, 0); + } +#endif return ret; } @@ -390,6 +406,23 @@ class PSGPUWrapper { if (s_instance_ != NULL && is_initialized_ == false) { VLOG(3) << "PSGPUWrapper Begin InitializeGPU"; is_initialized_ = true; +#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_HETERPS) && \ + defined(PADDLE_WITH_NCCL) + const char* launch_mode = std::getenv("NCCL_LAUNCH_MODE"); + if (launch_mode != nullptr) { + if (std::string(launch_mode) == "PARALLEL") { + VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE GROUP for no " + "hang, will change from PARALLEL to GROUP"; + int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); + PADDLE_ENFORCE_EQ(res, 0); + } + } else { + VLOG(0) + << "heterps-mode can only set NCCL_LAUNCH_MODE GROUP for no hang"; + int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); + PADDLE_ENFORCE_EQ(res, 0); + } +#endif resource_ = std::make_shared(dev_ids); resource_->enable_p2p(); keys_tensor.resize(resource_->total_device()); diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc index b4fd77840d7fde..dca698bd5fd125 100644 --- a/paddle/fluid/pybind/fleet_py.cc +++ b/paddle/fluid/pybind/fleet_py.cc @@ -81,6 +81,7 @@ void BindDistFleetWrapper(py::module* m) { .def("pull_fl_strategy", &FleetWrapper::PullFlStrategy) .def("revert", &FleetWrapper::Revert) .def("set_date", &FleetWrapper::SetDate) + .def("print_table_stat", &FleetWrapper::PrintTableStat) .def("check_save_pre_patch_done", &FleetWrapper::CheckSavePrePatchDone); } diff --git a/python/paddle/distributed/fleet/__init__.py b/python/paddle/distributed/fleet/__init__.py index 7c83bd89189da6..51e00204c6ea52 100755 --- a/python/paddle/distributed/fleet/__init__.py +++ b/python/paddle/distributed/fleet/__init__.py @@ -99,6 +99,7 @@ load_inference_model = fleet.load_inference_model load_one_table = fleet.load_one_table set_date = fleet.set_date +print_table_stat = fleet.print_table_stat minimize = fleet.minimize distributed_model = distributed_model shrink = fleet.shrink diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py index 5991d4da9ddfb3..1c3f2a86d72aad 100755 --- a/python/paddle/distributed/fleet/fleet.py +++ b/python/paddle/distributed/fleet/fleet.py @@ -1419,6 +1419,27 @@ def set_date(self, table_id: int, day_id: str) -> None: """ self._runtime_handle._set_date(table_id, str(day_id)) + @is_non_distributed_check + @inited_runtime_handler + def print_table_stat(self, table_id: int, pass_id: int, threshold: float): + """ + Print stat info of table_id for gpups table, format: tableid, feasign size, mf size. + + Args: + + table_id (int): The id of table. + pass_id (int): The id of pass. + threshold (float): The threshold of print. + + Examples: + + .. code-block:: text + + fleet.print_table_stat(0,6,600000) + + """ + self._runtime_handle._print_table_stat(table_id, pass_id, threshold) + @is_non_distributed_check @inited_runtime_handler def shrink(self, threshold: int | None = None) -> None: diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index c0d845da32d4e6..c34aca1cc49215 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -1760,6 +1760,12 @@ def _set_date(self, table_id, day_id): self._worker.set_date(table_id, day_id) fleet.util.barrier() + def _print_table_stat(self, table_id, pass_id, threshold): + fleet.util.barrier() + if self.role_maker._is_first_worker(): + self._worker.print_table_stat(table_id, pass_id, threshold) + fleet.util.barrier() + def _shrink(self, threshold=None): if threshold is not None: warnings.warn( From e7c1dd8d5f1ed3ec10092eb97f2ec35f5387b1f0 Mon Sep 17 00:00:00 2001 From: danleifeng Date: Wed, 18 Dec 2024 19:23:02 +0800 Subject: [PATCH 2/2] add env for gpups; fix cache table; test=develop --- paddle/fluid/framework/fleet/ps_gpu_wrapper.h | 31 +++++-------------- 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h index d663b1f5d9a600..12989aae40ab3e 100644 --- a/paddle/fluid/framework/fleet/ps_gpu_wrapper.h +++ b/paddle/fluid/framework/fleet/ps_gpu_wrapper.h @@ -130,22 +130,6 @@ class AfsWrapper { VLOG(1) << "AfsWrapper Init" << handle_ << " ret: " << ret << " fs_name :" << fs_name << " fs_user :" << fs_user << " pass_wd :" << pass_wd << " conf :" << conf; -#if defined(PADDLE_WITH_PSCORE) && defined(PADDLE_WITH_HETERPS) && \ - defined(PADDLE_WITH_NCCL) - const char* launch_mode = std::getenv("NCCL_LAUNCH_MODE"); - if (launch_mode != nullptr) { - if (std::string(launch_mode) == "PARALLEL") { - VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE=GROUP for no " - "hang, will change from PARALLEL to GROUP"; - int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); - PADDLE_ENFORCE_EQ(res, 0); - } - } else { - VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE=GROUP for no hang"; - int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); - PADDLE_ENFORCE_EQ(res, 0); - } -#endif return ret; } @@ -411,16 +395,15 @@ class PSGPUWrapper { const char* launch_mode = std::getenv("NCCL_LAUNCH_MODE"); if (launch_mode != nullptr) { if (std::string(launch_mode) == "PARALLEL") { - VLOG(0) << "heterps-mode can only set NCCL_LAUNCH_MODE GROUP for no " - "hang, will change from PARALLEL to GROUP"; - int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); - PADDLE_ENFORCE_EQ(res, 0); + PADDLE_THROW(common::errors::Unavailable( + "on heterps-mode you must export NCCL_LAUNCH_MODE=GROUP for no " + "hang, but received [%s]", + launch_mode)); } } else { - VLOG(0) - << "heterps-mode can only set NCCL_LAUNCH_MODE GROUP for no hang"; - int res = setenv("NCCL_LAUNCH_MODE", "GROUP", 1); - PADDLE_ENFORCE_EQ(res, 0); + PADDLE_THROW( + common::errors::Unavailable("on heterps-mode you must export " + "NCCL_LAUNCH_MODE=GROUP for no hang")); } #endif resource_ = std::make_shared(dev_ids);