Skip to content

Commit

Permalink
Support pika context tool (OpenAtomFoundation#186)
Browse files Browse the repository at this point in the history
* Support keys for select by type

* limit the number of parameter

* rename parameters

* fix code

* remove redundant code

* add pika_to_txt

* delete binary file

* close file

* add txt_to_pika

* add txt_to_pika

* add run time statistics

* bugfix

* Delete insert.h

* Delete insert.cc

* fix condition variable error

* fix variable

* add ttl option

* use uint64_t instead of int
  • Loading branch information
Leviathan1995 authored and zhangtianshuo committed Oct 13, 2017
1 parent 5c2842c commit 0cc4f0a
Show file tree
Hide file tree
Showing 12 changed files with 732 additions and 0 deletions.
42 changes: 42 additions & 0 deletions tools/pika_to_txt/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
GCC = g++
CPPFLAGS = -Wall -W -Wno-unused-parameter -DDEBUG -D__XDEBUG__ -g -O3 -std=c++11
OBJECT = pika_to_txt

include ../../make_config.mk
LIB_PATH = -L ../../third/nemo/lib/ \
-L ../../third/pink/pink/lib/ \
-L ../../third/slash/slash/lib/ \
-L ../../third/nemo-rocksdb/lib \
-L ../../third/rocksdb/

LIBS = -Wl,-Bstatic -lnemo -lrocksdb\
-Wl,-Bdynamic -lpthread\
-lrt \
-lpink \
-lslash \
-lnemodb \
-lrocksdb

LIBS += $(ROCKSDB_LDFLAGS)


INCLUDE_PATH = -I../../third/pink/ \
-I../../third/slash/ \
-I../../third/nemo/include/ \
-I../../third/nemo-rocksdb/include/ \
-I../../third/rocksdb/include/


.PHONY: all clean

all: $(OBJECT)
rm *.o

pika_to_txt : pika_to_txt.o scan.o write.o
$(GCC) $(CPPFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

%.o : %.cc
$(GCC) $(CPPFLAGS) -c $< -o $@ $(INCLUDE_PATH)

clean:
rm -rf $(OBJECT) $(OBJECT).o
70 changes: 70 additions & 0 deletions tools/pika_to_txt/pika_to_txt.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#include <iostream>
#include <chrono>

#include "scan.h"
#include "write.h"

using std::chrono::high_resolution_clock;

void Usage() {
std::cout << "Usage: " << std::endl;
std::cout << " ./pika_to_txt db_path [filename]" << std::endl;
std::cout << " example: ./pika_to_txt ~/db data.txt" << std::endl;
}

int main(int argc, char **argv) {
if (argc < 2 || argc > 3) {
Usage();
return 0;
}

high_resolution_clock::time_point start = high_resolution_clock::now();

std::string db_path = std::string(argv[1]);
std::string filename = "data.txt";
if (argc == 3) {
filename = argv[2];
}

if (db_path[db_path.length() - 1] != '/') {
db_path.append("/");
}
std::cout << db_path << std::endl;

// Init db
nemo::Options option;
option.write_buffer_size = 256 * 1024 * 1024; // 256M
option.target_file_size_base = 20 * 1024 * 1024; // 20M
nemo::Nemo *db = new nemo::Nemo(db_path, option);

// Init scan thread
WriteThread* write_thread = new WriteThread(filename);
ScanThread* scan_thread = new ScanThread(db, write_thread);

scan_thread->StartThread();
write_thread->StartThread();

scan_thread->JoinThread();

write_thread->Stop();
write_thread->JoinThread();

std::cout <<"Total " << scan_thread->Num() << " records has been scaned"<< std::endl;
std::cout <<"Total " << write_thread->Num() << " records hash been writed to file" << std::endl;

delete db;
delete write_thread;
delete scan_thread;

high_resolution_clock::time_point end = high_resolution_clock::now();
std::chrono::hours h = std::chrono::duration_cast<std::chrono::hours>(end - start);
std::chrono::minutes m = std::chrono::duration_cast<std::chrono::minutes>(end - start);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(end - start);

std::cout << "====================================" << std::endl;
std::cout << "Running time :";
std::cout << h.count() << " hour " << m.count() - h.count() * 60 << " min " << s.count() - h.count() * 60 * 60 << " s\n" << std::endl;

return 0;
}

32 changes: 32 additions & 0 deletions tools/pika_to_txt/scan.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "scan.h"

ScanThread::~ScanThread() {
}

void ScanThread::Scandb() {
nemo::KIterator *it = db_->KScan("", "", -1, false);
std::string key, value;

while (it->Valid()) {
key = it->key();
value = it->value();
uint32_t key_len, value_len;
std::string data_info;
key_len = key.length();
slash::PutFixed32(&data_info, key_len);
data_info += key;
value_len = value.length();
slash::PutFixed32(&data_info, value_len);
data_info += value;

write_thread_->Load(data_info);
num_++;
it->Next();
}
delete it;
}

void *ScanThread::ThreadMain() {
Scandb();
return NULL;
}
32 changes: 32 additions & 0 deletions tools/pika_to_txt/scan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#ifndef SCAN_H_
#define SCAN_H_

#include <iostream>
#include "nemo.h"
#include "write.h"
#include "slash/include/slash_coding.h"

class ScanThread : public pink::Thread {
public:
ScanThread(nemo::Nemo *db, WriteThread * write_thread) :
db_(db),
write_thread_(write_thread),
num_(0)
{
}

virtual ~ScanThread();
int Num() {
return num_;
}
private:
void Scandb();
nemo::Nemo* db_;
WriteThread* write_thread_;
int64_t num_;

void* ThreadMain();
};

#endif

49 changes: 49 additions & 0 deletions tools/pika_to_txt/write.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include <iostream>
#include <fstream>

#include "write.h"

WriteThread::~WriteThread() {
}

void WriteThread::Load(std::string data) {
data_mutex_.Lock();
if (data_.size() < 100000) {
data_.push(data);
rsignal_.Signal();
data_mutex_.Unlock();
} else {
while (data_.size() > 100000) {
wsignal_.Wait();
}
data_.push(data);
rsignal_.Signal();
data_mutex_.Unlock();
}
}

void *WriteThread::ThreadMain() {
std::cout << "Start write to file" << std::endl;
std::fstream fout(filename_, std::ios::out|std::ios::binary);
while(!should_exit_ || QueueSize() != 0) {
data_mutex_.Lock();
while (data_.size() == 0 && !should_exit_) {
rsignal_.Wait();
}
data_mutex_.Unlock();

if (QueueSize() != 0) {
data_mutex_.Lock();
std::string data = data_.front();
data_.pop();
wsignal_.Signal();
data_mutex_.Unlock();
fout.write(data.c_str(), data.size());
num_++;
}
}
std::cout <<"Write to " << filename_ <<" complete";
fout.close();
return NULL;
}

50 changes: 50 additions & 0 deletions tools/pika_to_txt/write.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#ifndef WRITE_H_
#define WRITE_H_

#include <queue>
#include "pink/include/bg_thread.h"

class WriteThread : public pink::Thread {
public:
WriteThread(std::string& filename) :
should_exit_(false),
num_(0),
filename_(filename),
rsignal_(&data_mutex_),
wsignal_(&data_mutex_)
{
}
void Stop() {
should_exit_ = true;
data_mutex_.Lock();
rsignal_.Signal();
data_mutex_.Unlock();
}
~WriteThread();

void Load(std::string data);

int QueueSize() {
slash::MutexLock l(&data_mutex_);
int len = data_.size();
return len;
}

int Num() {
return num_;
}

bool should_exit_;
private:
int num_;
std::string filename_;
std::queue<std::string> data_;
slash::Mutex data_mutex_;
slash::CondVar rsignal_;
slash::CondVar wsignal_;

virtual void *ThreadMain();
};

#endif

42 changes: 42 additions & 0 deletions tools/txt_to_pika/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
GCC = g++
CPPFLAGS = -Wall -W -Wno-unused-parameter -DDEBUG -D__XDEBUG__ -g -O3 -std=c++11
OBJECT = txt_to_pika

include ../../make_config.mk
LIB_PATH = -L ../../third/nemo/lib/ \
-L ../../third/pink/pink/lib/ \
-L ../../third/slash/slash/lib/ \
-L ../../third/nemo-rocksdb/lib \
-L ../../third/rocksdb/

LIBS = -Wl,-Bstatic -lnemo -lrocksdb\
-Wl,-Bdynamic -lpthread\
-lrt \
-lpink \
-lslash \
-lnemodb \
-lrocksdb

LIBS += $(ROCKSDB_LDFLAGS)


INCLUDE_PATH = -I../../third/pink/ \
-I../../third/slash/ \
-I../../third/nemo/include/ \
-I../../third/nemo-rocksdb/include/ \
-I../../third/rocksdb/include/


.PHONY: all clean

all: $(OBJECT)
rm *.o

txt_to_pika : txt_to_pika.o scan.o sender.o
$(GCC) $(CPPFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

%.o : %.cc
$(GCC) $(CPPFLAGS) -c $< -o $@ $(INCLUDE_PATH)

clean:
rm -rf $(OBJECT) $(OBJECT).o
65 changes: 65 additions & 0 deletions tools/txt_to_pika/scan.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <iostream>
#include <fstream>
#include "scan.h"

ScanThread::~ScanThread() {
}

void ScanThread::ScanFile() {
log_info("Start to scan");

std::ifstream fout(filename_, std::ios::binary);
fout.seekg(0, std::ios::end);
uint64_t filesize = fout.tellg();
fout.seekg(0, std::ios::beg);

uint64_t index = 0;
while (index < filesize) {
std::string str_key, str_value, cmd;
uint32_t key_len;
fout.read(reinterpret_cast<char *>(&key_len), sizeof(uint32_t));
char * key = new char[key_len];
fout.read(key, key_len);
str_key.append(key, key_len);

uint32_t value_len;
fout.read(reinterpret_cast<char *>(&value_len), sizeof(uint32_t));
char * value = new char[value_len];
fout.read(value, value_len);
str_value.append(value, value_len);

pink::RedisCmdArgsType argv;
if (ttl_ > 0) {
argv.push_back("SETEX");
argv.push_back(str_key);
argv.push_back(std::to_string(ttl_));
argv.push_back(str_value);
} else {
argv.push_back("SET");
argv.push_back(str_key);
argv.push_back(str_value);
}

pink::SerializeRedisCommand(argv, &cmd);

DispatchCmd(cmd);
num_++;

delete []key;
delete []value;

index += key_len + value_len + sizeof(uint32_t) * 2;
}
fout.close();
}

void ScanThread::DispatchCmd(const std::string &cmd) {
senders_[thread_index_]->LoadCmd(cmd);
thread_index_ = (thread_index_ + 1) % senders_.size();
}

void *ScanThread::ThreadMain() {
ScanFile();
log_info("Scan file complete");
return NULL;
}
Loading

0 comments on commit 0cc4f0a

Please sign in to comment.