forked from oneapi-src/oneCCL
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ccl_cpp_kvs.cpp
138 lines (105 loc) · 4.53 KB
/
ccl_cpp_kvs.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
Copyright 2016-2020 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneapi/ccl/types.hpp"
#include "oneapi/ccl/aliases.hpp"
#include "oneapi/ccl/type_traits.hpp"
#include "oneapi/ccl/types_policy.hpp"
#include "oneapi/ccl/kvs_attr_ids.hpp"
#include "oneapi/ccl/kvs_attr_ids_traits.hpp"
#include "oneapi/ccl/kvs_attr.hpp"
#include "common/global/global.hpp"
#include "kvs_impl.hpp"
#ifdef CCL_ENABLE_STUB_BACKEND
#include "stub_kvs_impl.hpp"
#endif // CCL_ENABLE_STUB_BACKEND
namespace ccl {
base_kvs_impl::base_kvs_impl(const kvs::address_type& addr) : addr(addr) {}
native_kvs_impl::native_kvs_impl(const kvs_attr& attr) : base_kvs_impl() {
CCL_THROW_IF_NOT(ccl::global_data::env().backend == backend_mode::native,
"incorrect non-native backend is used");
inter_kvs.reset(new internal_kvs());
if (attr.is_valid<kvs_attr_id::ip_port>()) {
std::string server_address = attr.get<kvs_attr_id::ip_port>();
inter_kvs->set_server_address(server_address);
}
inter_kvs->kvs_main_server_address_reserve(get_addr().data());
inter_kvs->kvs_init(get_addr().data());
}
native_kvs_impl::native_kvs_impl(const kvs::address_type& addr, const kvs_attr& attr)
: base_kvs_impl(addr) {
CCL_THROW_IF_NOT(ccl::global_data::env().backend == backend_mode::native,
"incorrect non-native backend is used");
inter_kvs.reset(new internal_kvs());
if (attr.is_valid<kvs_attr_id::ip_port>()) {
std::string server_address = attr.get<kvs_attr_id::ip_port>();
inter_kvs->set_server_address(server_address);
}
inter_kvs->kvs_init(get_addr().data());
}
vector_class<char> native_kvs_impl::get(const string_class& key) {
CCL_THROW_IF_NOT(ccl::global_data::env().backend == backend_mode::native,
"incorrect non-native backend is used");
std::string ret;
CCL_THROW_IF_NOT(inter_kvs->kvs_get_value_by_name_key(prefix.c_str(), key.c_str(), ret) ==
KVS_STATUS_SUCCESS,
"kvs get failed");
return { ret.begin(), ret.end() };
}
void native_kvs_impl::set(const string_class& key, const vector_class<char>& data) {
CCL_THROW_IF_NOT(ccl::global_data::env().backend == backend_mode::native,
"incorrect non-native backend is used");
CCL_THROW_IF_NOT(!data.empty(), "data should have at least one element");
CCL_THROW_IF_NOT(data.back() == '\0', "data should have terminating symbol");
CCL_THROW_IF_NOT(data.data(), "data pointer should be non-null");
inter_kvs->kvs_set_value(prefix.c_str(), key.c_str(), data.data());
}
std::shared_ptr<internal_kvs> native_kvs_impl::get() const {
return inter_kvs;
}
namespace v1 {
kvs::address_type CCL_API kvs::get_address() const {
return pimpl->get_addr();
}
vector_class<char> CCL_API kvs::get(const string_class& key) {
return pimpl->get(key);
}
void CCL_API kvs::set(const string_class& key, const vector_class<char>& data) {
pimpl->set(key, data);
}
static base_kvs_impl* get_kvs_impl(const kvs::address_type& addr, const kvs_attr& attr) {
#ifdef CCL_ENABLE_STUB_BACKEND
if (ccl::global_data::env().backend == backend_mode::stub) {
// in case of stub kvs we use a limited functionality and ignore attr parameter
return new stub_kvs_impl(addr);
}
#endif // CCL_ENABLE_STUB_BACKEND
return new native_kvs_impl(addr, attr);
}
static base_kvs_impl* get_kvs_impl(const kvs_attr& attr) {
#ifdef CCL_ENABLE_STUB_BACKEND
if (ccl::global_data::env().backend == backend_mode::stub) {
// in case of stub kvs we use a limited functionality and ignore attr parameter
return new stub_kvs_impl();
}
#endif // CCL_ENABLE_STUB_BACKEND
return new native_kvs_impl(attr);
}
CCL_API kvs::kvs(const kvs::address_type& addr, const kvs_attr& attr)
: pimpl(get_kvs_impl(addr, attr)) {}
CCL_API const base_kvs_impl& kvs::get_impl() {
return *pimpl;
}
CCL_API kvs::kvs(const kvs_attr& attr) : pimpl(get_kvs_impl(attr)) {}
CCL_API kvs::~kvs() {}
} // namespace v1
} // namespace ccl