Skip to content

Commit

Permalink
add uilt file
Browse files Browse the repository at this point in the history
  • Loading branch information
laofan13 committed Jul 15, 2023
1 parent 2995f49 commit aa08bf2
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 115 deletions.
Binary file modified code/alimama/alimama_search_service
Binary file not shown.
154 changes: 39 additions & 115 deletions code/alimama/alimama_search_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
#include <string>
#include <chrono>

#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include "utils.h"
#include "config.h"

#include <etcd/Client.hpp>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
Expand All @@ -38,103 +36,26 @@ using alimama::proto::Request;
using alimama::proto::Response;
using alimama::proto::SearchService;

std::string getLocalIP() {
struct ifaddrs *ifAddrStruct = NULL;
void *tmpAddrPtr = NULL;
std::string localIP;
getifaddrs(&ifAddrStruct);
while (ifAddrStruct != NULL) {
if (ifAddrStruct->ifa_addr->sa_family == AF_INET) {
tmpAddrPtr = &((struct sockaddr_in *)ifAddrStruct->ifa_addr)->sin_addr;
char addressBuffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
std::string interfaceName(ifAddrStruct->ifa_name);
if (interfaceName == "en0" || interfaceName == "eth0") {
return addressBuffer;
}
}
ifAddrStruct = ifAddrStruct->ifa_next;
}
return "";
}

struct Options {
int node_id;
int node_num;
int cpu;
uint64_t mempry;
};

std::ostream& operator<<(std::ostream& os, Options& options) {
os << "node information: {" << std::endl;
os << '\t' << "node_id: " << options.node_id << std::endl
<< '\t' << "node_num: " << options.node_num << std::endl
<< '\t' << "mempry: " << options.mempry << std::endl
<< '\t' << "cpu: " << options.cpu << std::endl;
os << "}"<< std::endl;
return os;
}

Options loadENV() {
Options options;
const char* env_p = std::getenv("NODE_ID");
if(env_p)
options.node_id = std::stoi(env_p) - 1;
else{
options.node_id = 0;
}
// std::cout<< "NODE_ID: " << options.node_id << std::endl;
env_p = std::getenv("NODE_NUM");
if(env_p)
options.node_num = std::stoi(env_p);
else{
options.node_num = 1;
}
// std::cout<< "NODE_NUM: " << options.node_num << std::endl;
env_p = std::getenv("MEMORY");
if(env_p)
options.mempry = std::stoull(env_p) << 30; //4G
else{
options.mempry = 2 << 30; //2G
}
// std::cout<< "MEMORY: " << options.mempry << std::endl;
env_p = std::getenv("CPU");
if(env_p != nullptr)
options.cpu = std::stoi(env_p);
else{
options.cpu = 2;
}
// std::cout<< "CPU: " << options.cpu << std::endl;
return options;
}


// ETCD
const std::string ECTD_URL = "http://etcd:2379";
const std::string modelservice_key = "/services/searchservice";
const std::string filename = "/data/raw_data.csv";
const size_t initialSize = 1e5;
constexpr size_t buf_size = 1 << 30;

class SearchServiceImpl final : public SearchService::Service {
public:
SearchServiceImpl(Options options, std::string server_address):
options_(options),
server_address_(server_address),
etcd_(ECTD_URL),
hash_tables(initialSize)
{
etcd_(ECTD_URL)
{

}

~SearchServiceImpl() {
etcd_.rm(modelservice_key);
if(datas_)
delete [] datas_;
}

int init() {
// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();
if(load_data() != 0) {
if(load_csv_data() != 0) {
std::cout << "fiald to load data " << filename << std::endl;
return -1;
}
Expand All @@ -147,6 +68,15 @@ class SearchServiceImpl final : public SearchService::Service {
std::cout << "读取文件所花费的时间:" << duration << " 毫秒" << std::endl;
std::cout << "文件行数:" << data_num << std::endl;

// 排序
// start = std::chrono::high_resolution_clock::now();
std::sort(datas_, datas_ + data_num, [] (const RawData & l, const RawData & r) {
return l.adgroup_id < r.adgroup_id;
});
// end = std::chrono::high_resolution_clock::now();
// duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// std::cout << "排序数据所花费的时间:" << duration << " 毫秒" << std::endl;

// for(auto & pair: hash_tables) {
// std::cout << "keyword:" << pair.first << std::endl;
// for(auto & data: pair.second) {
Expand All @@ -159,9 +89,9 @@ class SearchServiceImpl final : public SearchService::Service {
// std::cout <<std::endl;
// }


// 将服务地址注册到etcd中
auto response = etcd_.set(modelservice_key, server_address_).get();
// std::string val = std::to_string(data_num) + "|" + std::to_string(datas.capacity());
auto response = etcd_.set(modelservice_key, server_address_ ).get();
if (response.is_ok()) {
std::cout << "Service registration successful.\n";
} else {
Expand All @@ -173,34 +103,34 @@ class SearchServiceImpl final : public SearchService::Service {
}

private:
int load_data() {
int load_csv_data() {// alloc memory
datas_ = new RawData[total_data_num];

std::ifstream csv_file(filename);
char *buffer = new char[buf_size];
csv_file.rdbuf()->pubsetbuf(buffer, buf_size);
char buf[65536];
csv_file.rdbuf()->pubsetbuf(buf, 65536);

if (!csv_file.is_open()) {
std::cout << "fiald to open " << filename << std::endl;
delete [] buffer;
return -1;
}


int64_t key_word = 0;
std::string line, cell;
while (std::getline(csv_file, line)) {
int n = line.size();
int i = 0;
key_word = 0;
RawData rawData;
RawData & rawData = datas_[data_num];
// keywrod
while(i < n && line[i] != '\t') {
key_word = key_word * 10 + (line[i] - '0');
i++;
}

// load
// if(key_word % options_.node_num != options_.node_id)
// continue;
// load controll
if(key_word % options_.node_num != options_.node_id)
continue;

while(i < n && line[i] == '\t')
i++;
Expand Down Expand Up @@ -274,15 +204,9 @@ class SearchServiceImpl final : public SearchService::Service {
++i;
}

if(hash_tables.find(key_word) == hash_tables.end()) {
hash_tables[key_word] = {};
}
hash_tables[key_word].push_back(std::move(rawData));
data_num++;
}

// free buf
delete [] buffer;
// 关闭文件
csv_file.close();
return 0;
Expand All @@ -301,17 +225,17 @@ class SearchServiceImpl final : public SearchService::Service {
std::vector<RawData> searchDatas;
std::vector<int> indexs;
int j = 0;
for(int i = 0; i < request->keywords_size(); i++) {
auto keyword = request->keywords(i);
if(hash_tables.find(keyword) != hash_tables.end()) {
for(auto & data: hash_tables[keyword]) {
if(data.timings_mask & (1 << hour)) {
searchDatas.push_back(data);
indexs.push_back(j++);
}
}
}
}
// for(int i = 0; i < request->keywords_size(); i++) {
// auto keyword = request->keywords(i);
// if(hash_tables.find(keyword) != hash_tables.end()) {
// for(auto & data: hash_tables[keyword]) {
// if(data.timings_mask & (1 << hour)) {
// searchDatas.push_back(data);
// indexs.push_back(j++);
// }
// }
// }
// }

// 2. 分数计算
// 预估点击率 = 商品向量 和 用户_关键词向量 的余弦距离
Expand Down Expand Up @@ -369,7 +293,7 @@ class SearchServiceImpl final : public SearchService::Service {
float vec1, vec2;
};

std::unordered_map<int64_t, std::vector<RawData>> hash_tables;
RawData* datas_;
uint64_t data_num = 0;
};

Expand Down
81 changes: 81 additions & 0 deletions code/alimama/utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include <ostream>
#include <string>
#include <arpa/inet.h>
#include <ifaddrs.h>
#include <netinet/in.h>
#include <sys/socket.h>


std::string getLocalIP() {
struct ifaddrs *ifAddrStruct = NULL;
void *tmpAddrPtr = NULL;
std::string localIP;
getifaddrs(&ifAddrStruct);
while (ifAddrStruct != NULL) {
if (ifAddrStruct->ifa_addr->sa_family == AF_INET) {
tmpAddrPtr = &((struct sockaddr_in *)ifAddrStruct->ifa_addr)->sin_addr;
char addressBuffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
std::string interfaceName(ifAddrStruct->ifa_name);
if (interfaceName == "en0" || interfaceName == "eth0") {
return addressBuffer;
}
}
ifAddrStruct = ifAddrStruct->ifa_next;
}
return "";
}

struct Options {
int node_id;
int node_num;
int cpu;
uint64_t mempry;
};

std::ostream& operator<<(std::ostream& os, Options& options) {
os << "node information: {" << std::endl;
os << '\t' << "node_id: " << options.node_id << std::endl
<< '\t' << "node_num: " << options.node_num << std::endl
<< '\t' << "mempry: " << options.mempry << std::endl
<< '\t' << "cpu: " << options.cpu << std::endl;
os << "}"<< std::endl;
return os;
}

Options loadENV() {
Options options;
const char* env_p = std::getenv("NODE_ID");
if(env_p)
options.node_id = std::stoi(env_p) - 1;
else{
options.node_id = 0;
}
// std::cout<< "NODE_ID: " << options.node_id << std::endl;
env_p = std::getenv("NODE_NUM");
if(env_p)
options.node_num = std::stoi(env_p);
else{
options.node_num = 1;
}
// std::cout<< "NODE_NUM: " << options.node_num << std::endl;
env_p = std::getenv("MEMORY");
if(env_p)
options.mempry = std::stoull(env_p) << 30; //4G
else{
options.mempry = 2 << 30; //2G
}
// std::cout<< "MEMORY: " << options.mempry << std::endl;
env_p = std::getenv("CPU");
if(env_p != nullptr)
options.cpu = std::stoi(env_p);
else{
options.cpu = 2;
}
// std::cout<< "CPU: " << options.cpu << std::endl;
return options;
}

// void parserRawData() {

// }

0 comments on commit aa08bf2

Please sign in to comment.