// faiss_common.cpp
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <grpc++/grpc++.h>
#include "faiss_logic.h"
/// Liveness probe: replies to every request with the fixed payload "pong".
///
/// @param context  Server context supplied by the RPC layer (not read here).
/// @param request  Incoming ping request (not read here).
/// @param response Reply message; its payload is set to "pong".
/// @return Status::OK unconditionally.
Status FaissServiceImpl::Ping(ServerContext* context,
                              const ::faiss_server::PingRequest* request,
                              ::faiss_server::PingResponse* response) {
  // Neither argument influences the reply; mark them as intentionally unused.
  (void)context;
  (void)request;

  const char* const reply = "pong";
  response->set_payload(reply);
  return Status::OK;
}
/// Background loop that persists every registered database at a fixed period.
///
/// Each round sleeps for `duration` seconds, then (when `handle` is non-null)
/// takes a read guard on `handle->m_lock` and calls persistIndex() on every
/// entry of `handle->dbs`. The loop has no exit condition, so this function
/// never returns.
///
/// @param handle   Service instance whose databases are persisted; a null
///                 handle makes every round a no-op after the sleep.
/// @param duration Pause between two persistence rounds, in seconds.
void FaissServiceImpl::PersistIndexPeriod(FaissServiceImpl *handle,const unsigned int duration) {
  for (;;) {
    std::this_thread::sleep_for(std::chrono::seconds(duration));

    if (handle != NULL) {
      // The guard lives until the end of this scope, i.e. for the whole sweep.
      unique_readguard<WfirstRWLock> guard(*(handle->m_lock));
      for (auto &entry : handle->dbs) {
        entry.second->persistIndex();
      }
    }
  }
}
/// Prepares the service for use: selects CUDA device 0, allocates the GPU
/// resources object and the two read/write locks, then loads the databases
/// already recorded locally.
///
/// @return 0 on success; -1 if CUDA device 0 cannot be selected; otherwise the
///         non-zero code returned by LoadLocalDBs().
int FaissServiceImpl::InitServer() {
  // cudaSetDevice reports failure through its return value (0 is cudaSuccess).
  // Stop here instead of building GPU resources on a device that could not be
  // selected.
  if (cudaSetDevice(0) != 0) {
    return -1;
  }

  // Plain `new` signals allocation failure by throwing std::bad_alloc; it never
  // yields NULL, so no NULL checks are needed after these allocations.
  m_resources = new StandardGpuResources;
  m_lock = new WfirstRWLock;
  gpu_lock = new WfirstRWLock;

  // Load the databases that already exist locally.
  return LoadLocalDBs();
}
int FaissServiceImpl::LoadLocalDBs() {
LOG(INFO) << "load local dbs";
MDB_txn *txn = NULL;
MDB_cursor *cursor;
int rc = mdb_txn_begin(m_env, NULL, MDB_RDONLY, &txn);
MDB_val key, data;
size_t len = 128;
char _key[len] = {'\0'};
key.mv_size = len;
key.mv_data = _key;
rc = mdb_cursor_open(txn, *m_dbi, &cursor);
int prefixLen = SPrefix.length();
while ((rc = mdb_cursor_get(cursor, &key, &data, MDB_NEXT)) == 0) {
std::ostringstream oss;
std::string keyStr, valStr;
keyStr.append((char*)key.mv_data, key.mv_size);
oss << "record:" << keyStr;
size_t pos = keyStr.find(SPrefix.c_str());
if (keyStr.length() < SPrefix.length() + 1 ||
pos == std::string::npos) {
oss << " msg:invalid key";
LOG(WARNING) << oss.str();
continue;
}
std::string dbName = keyStr.substr(pos + SPrefix.length());
oss << " db_name:" << dbName;
valStr.append((char*)data.mv_data, data.mv_size);
std::string modelPath, sizeStr;
pos = valStr.find(SDivide.c_str());
size_t maxSize = DefaultDBSize;
if (pos == std::string::npos) {
modelPath = valStr;
} else if (pos > 0) {
sizeStr = valStr.substr(pos + SDivide.length());
maxSize = atoi(sizeStr.c_str());
modelPath = valStr.substr(0, pos);
}
oss << " modelPath:" << modelPath
<< " maxSize:" << maxSize;
//插入新的db
FaissDB *db = new FaissDB(dbName, modelPath, maxSize, this->gpu_lock);
int rc = db->reload(m_resources);
oss << " res:" << rc;
if (rc == ErrorCode::OK) {
dbs[dbName.c_str()] = db;
LOG(INFO) << oss.str();
continue;
} else if (rc == ErrorCode::NOT_FOUND) {
//TODO 都不存在,则删除该条记录
oss << " error_msg:" << "db not exist";
LOG(WARNING) << oss.str();
continue;
} else {
oss << " error_msg:" << "internal error!";
LOG(ERROR) << oss.str();
return rc;
}
}
mdb_cursor_close(cursor);
mdb_txn_abort(txn);
return 0;
}
/// Constructs the service on top of the LmDB base opened with
/// (SGlobalDBName, 0) and runs InitServer(). If initialisation reports a
/// non-zero code, the failure is logged at FATAL level and the process exits
/// with status -1.
FaissServiceImpl::FaissServiceImpl()
    : LmDB(SGlobalDBName, 0),
      m_resources(NULL),
      m_lock(NULL) {
  const int initResult = InitServer();
  if (initResult == 0) {
    return;
  }

  LOG(FATAL) << "initialize FaissServiceImpl failed:" << initResult;
  exit(-1);
}
/// Releases everything this service allocated with `new`: the FaissDB objects
/// registered in `dbs`, both read/write locks, and the GPU resources object.
FaissServiceImpl::~FaissServiceImpl() {
  // Databases go first: each one was constructed with `gpu_lock` and reloaded
  // with `m_resources`, so those must still be alive while the databases are
  // destroyed.
  for (auto &entry : dbs) {
    delete entry.second;
  }

  delete gpu_lock;
  delete m_lock;
  delete m_resources;
}