forked from vesoft-inc/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathClusterIdMan.h
127 lines (115 loc) · 4.44 KB
/
ClusterIdMan.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef META_CLUSTERIDMAN_H_
#define META_CLUSTERIDMAN_H_
#include "base/Base.h"
#include <cstring>
#include "fs/FileUtils.h"
#include "kvstore/Common.h"
#include "kvstore/KVStore.h"
#include "meta/processors/Common.h"
#include <folly/synchronization/Baton.h>
namespace nebula {
namespace meta {
/**
* This class manages clusterId used for meta server and storage server.
* */
class ClusterIdMan {
public:
static ClusterID create(const std::string& metaAddrs) {
std::hash<std::string> hash_fn;
auto clusterId = hash_fn(metaAddrs);
uint64_t mask = 0x7FFFFFFFFFFFFFFF;
clusterId &= mask;
LOG(INFO) << "Create ClusterId " << clusterId;
return clusterId;
}
static bool persistInFile(ClusterID clusterId, const std::string& filename) {
auto dirname = fs::FileUtils::dirname(filename.c_str());
if (!fs::FileUtils::makeDir(dirname)) {
LOG(ERROR) << "Failed mkdir " << dirname;
return false;
}
if (fs::FileUtils::remove(filename.c_str())) {
LOG(INFO) << "Remove the existed file " << filename;
}
int fd = ::open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (fd < 0) {
LOG(ERROR) << "Open file error, file " << filename << ", error " << strerror(errno);
return false;
}
int bytes = ::write(fd, reinterpret_cast<const char*>(&clusterId), sizeof(ClusterID));
if (bytes != sizeof(clusterId)) {
LOG(ERROR) << "Write clusterId failed!";
::close(fd);
return false;
}
LOG(INFO) << "Persiste clusterId " << clusterId << " succeeded!";
::close(fd);
return true;
}
static ClusterID getClusterIdFromFile(const std::string& filename) {
LOG(INFO) << "Try to open " << filename;
int fd = ::open(filename.c_str(), O_RDONLY);
if (fd < 0) {
LOG(WARNING) << "Open file failed, error " << strerror(errno);
return 0;
}
ClusterID clusterId = 0;
int len = ::read(fd, reinterpret_cast<char*>(&clusterId), sizeof(ClusterID));
if (len != sizeof(ClusterID)) {
LOG(ERROR) << "Get clusterId failed!";
::close(fd);
return 0;
}
LOG(INFO) << "Get clusterId: " << clusterId;
::close(fd);
return clusterId;
}
static ClusterID getClusterIdFromKV(kvstore::KVStore* kv, const std::string& key) {
CHECK_NOTNULL(kv);
std::string value;
auto code = kv->get(0, 0, key, &value);
if (code == kvstore::ResultCode::ERR_KEY_NOT_FOUND) {
LOG(INFO) << "There is no clusterId existed in kvstore!";
return 0;
} else if (code == kvstore::ResultCode::SUCCEEDED) {
if (value.size() != sizeof(ClusterID)) {
LOG(ERROR) << "Bad clusterId " << value;
return 0;
}
return *reinterpret_cast<const ClusterID*>(value.data());
} else {
LOG(ERROR) << "Error in kvstore, err " << static_cast<int32_t>(code);
return 0;
}
}
static bool persistInKV(kvstore::KVStore* kv,
const std::string& key,
ClusterID clusterId) {
CHECK_NOTNULL(kv);
std::vector<kvstore::KV> data;
data.emplace_back(key,
std::string(reinterpret_cast<char*>(&clusterId), sizeof(ClusterID)));
bool ret = true;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(data), [&](kvstore::ResultCode code) {
if (code != kvstore::ResultCode::SUCCEEDED) {
LOG(ERROR) << "Put failed, error "
<< static_cast<int32_t>(code);
ret = false;
} else {
LOG(INFO) << "Put key " << key
<< ", val " << clusterId;
}
baton.post();
});
baton.wait();
return ret;
}
};
} // namespace meta
} // namespace nebula
#endif // META_CLUSTERIDMAN_H_