Skip to content

Commit

Permalink
add pika_to_redis (OpenAtomFoundation#137)
Browse files Browse the repository at this point in the history
* Support delbackup command

* bugfix

* fix logical errors

* set the default value of expiry time is 0

* Create pika_to_redis.cc

* Create migrator_thread.cc

* Create migrator_thread.h

* Create parse_thread.cc

* Create parse_thread.h

* Create sender_thread.cc

* Create sender_thread.h

* Create Makefile

* modify Makefile

* add AUTH and bugfix

* modify interface

* bugfix

* bugfix

* delete parse thread

* bugfix

* bugfix

* add auth

* bugfix

* bugfix

* update

* pika_to_redis

* delete sender_thread

* remove redundant code

* update Makefile

* decrease lock scope

* format code

* format code

* fix multikey

* optimize Makefile

* fix expire bug

* add reconnect

* optimize log

* update usage()

* set the level of log is WARN

* fix conflict
  • Loading branch information
Leviathan1995 authored and KernelMaker committed Jul 31, 2017
1 parent 5301dd9 commit 64f6b2b
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 0 deletions.
41 changes: 41 additions & 0 deletions tools/pika_to_redis/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
GCC = g++
CPPFLAGS = -Wall -W -Wno-unused-parameter -DDEBUG -D__XDEBUG__ -g -O3 -std=c++11
OBJECT = pika_to_redis

LIB_PATH = -L ../../third/nemo/output/lib/ \
-L ../../third/pink/pink/lib/ \
-L ../../third/slash/slash/lib/ \
-L ../../third/nemo/3rdparty/nemo-rocksdb/output/lib \

LIBS = -Wl,-Bstatic -lnemo -lrocksdb\
-Wl,-Bdynamic -lpthread\
-lsnappy \
-lrt \
-lz \
-lbz2 \
-lpink \
-lslash \
-lnemodb \
-lrocksdb \


INCLUDE_PATH = -I../../third/pink/ \
-I../../third/slash/ \
-I../../third/nemo/include/ \
-I../../third/nemo/3rdparty/nemo-rocksdb/include/ \
-I../../third/nemo/3rdparty/nemo-rocksdb/rocksdb/include/ \


.PHONY: all clean

all: $(OBJECT)
rm *.o

pika_to_redis : pika_to_redis.o migrator_thread.o sender.o
$(GCC) $(CPPFLAGS) -o $@ $^ $(INCLUDE_PATH) $(LIB_PATH) $(LIBS)

%.o : %.cc
$(GCC) $(CPPFLAGS) -c $< -o $@ $(INCLUDE_PATH)

clean:
rm -rf $(OBJECT) $(OBJECT).o
72 changes: 72 additions & 0 deletions tools/pika_to_redis/migrator_thread.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#include "migrator_thread.h"

MigratorThread::~MigratorThread() {
}

void MigratorThread::MigrateDB(const char type) {
if (type == nemo::DataType::kKv) {
nemo::KIterator *it = db_->KScan("", "", -1, false);
std::string key, value;

while (it->Valid()) {
key = it->key();
value = it->value();
pink::RedisCmdArgsType argv;
std::string cmd;

int64_t ttl;
db_->TTL(key, &ttl);

argv.push_back("SET");
argv.push_back(key);
argv.push_back(value);
if (ttl > 0) {
argv.push_back("EX");
argv.push_back(std::to_string(ttl));
}

it->Next();
pink::SerializeRedisCommand(argv, &cmd);
PlusNum();
cmd = 'k' + cmd;
DispatchKey(cmd);
}
delete it;
} else {
char c_type = 'a';
switch (type) {
case nemo::DataType::kHSize:
c_type = 'h';
break;
case nemo::DataType::kSSize:
c_type = 's';
break;
case nemo::DataType::kLMeta:
c_type = 'l';
break;
case nemo::DataType::kZSize:
c_type = 'z';
break;
}
rocksdb::Iterator *it = db_->Scanbytype(c_type);
std::string key_start = "a";
key_start[0] = type;
it->Seek(key_start);
for (; it->Valid(); it->Next()) {
PlusNum();
DispatchKey(it->key().ToString());
}
}
}

void MigratorThread::DispatchKey(const std::string &key) {
senders_[thread_index_]->LoadKey(key);
thread_index_ = (thread_index_ + 1) % thread_num_;
}

void *MigratorThread::ThreadMain() {
MigrateDB(type_);
should_exit_ = true;
log_info("%c keys have been dispatched completly", static_cast<char>(type_));
return NULL;
}
47 changes: 47 additions & 0 deletions tools/pika_to_redis/migrator_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef MIGRATOR_THREAD_H_
#define MIGRATOR_THREAD_H_

#include <iostream>
#include "nemo.h"
#include "pink/include/redis_cli.h"
#include "sender.h"

class MigratorThread : public pink::Thread {
public:
MigratorThread(nemo::Nemo *db, std::vector<Sender *> senders, char type, int thread_num) :
db_(db),
senders_(senders),
type_(type),
thread_num_(thread_num),
thread_index_(0),
num_(0)
{
}

int64_t num() {
slash::MutexLock l(&num_mutex_);
return num_;
}

virtual ~ MigratorThread();
bool should_exit_;
private:
nemo::Nemo *db_;
std::vector<Sender *> senders_;
char type_;
int thread_num_;
int thread_index_;

void MigrateDB(const char type);
void DispatchKey(const std::string &key);

int64_t num_;
slash::Mutex num_mutex_;

void PlusNum() {
slash::MutexLock l(&num_mutex_);
++num_;
}
virtual void *ThreadMain();
};
#endif
132 changes: 132 additions & 0 deletions tools/pika_to_redis/pika_to_redis.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#include <iostream>
#include <string>
#include <sstream>
#include "pink/include/redis_cli.h"
#include "nemo.h"
#include "sender.h"
#include "migrator_thread.h"

using std::chrono::high_resolution_clock;
using std::chrono::milliseconds;

const int64_t kTestPoint = 500000;
const int64_t kTestNum = LLONG_MAX;
const int64_t kDataSetNum = 5;

std::string db_path;
std::string ip;
int port;
size_t thread_num;
std::string password;

std::vector<Sender*> senders;
std::vector<MigratorThread*> migrators;
nemo::Nemo *db;

void PrintConf() {
std::cout << "db_path : " << db_path << std::endl;
std::cout << "redis ip : " << ip << std::endl;
std::cout << "redis port : " << port << std::endl;
std::cout << "thread_num : " << thread_num << std::endl;
std::cout << "====================================" << std::endl;

}

void Usage() {
std::cout << "Usage: " << std::endl;
std::cout << " ./pika_to_redis db_path ip port thread_number [password]\n";
std::cout << " example: ./pika_to_redis ~/db 127.0.0.1 6379 8 123456\n";
}

int main(int argc, char **argv)
{
if (argc < 5) {
Usage();
return -1;
}
high_resolution_clock::time_point start = high_resolution_clock::now();

db_path = std::string(argv[1]);
ip = std::string(argv[2]);
port = atoi(argv[3]);
thread_num = atoi(argv[4]);
if (argc == 6) {
password = std::string(argv[5]);
}

if (db_path[db_path.length() - 1] != '/') {
db_path.append("/");
}

PrintConf();

// Init db
nemo::Options option;
option.write_buffer_size = 256 * 1024 * 1024; // 256M
option.target_file_size_base = 20 * 1024 * 1024; // 20M
db = new nemo::Nemo(db_path ,option);

// Open Options
rocksdb::Options open_options_;
open_options_.create_if_missing = true;
open_options_.write_buffer_size = 256 * 1024 * 1024; // 256M
open_options_.max_manifest_file_size = 64*1024*1024;
open_options_.max_log_file_size = 512*1024*1024;
open_options_.keep_log_file_num = 10;


// Init SenderThread
for (size_t i = 0; i < thread_num; i++) {
Sender *sender = new Sender(db, ip, port, password);
senders.push_back(sender);
}

migrators.push_back(new MigratorThread(db, senders, nemo::DataType::kKv, thread_num));
migrators.push_back(new MigratorThread(db, senders, nemo::DataType::kHSize, thread_num));
migrators.push_back(new MigratorThread(db, senders, nemo::DataType::kSSize, thread_num));
migrators.push_back(new MigratorThread(db, senders, nemo::DataType::kLMeta, thread_num));
migrators.push_back(new MigratorThread(db, senders, nemo::DataType::kZSize, thread_num));

// start threads
for (size_t i = 0; i < kDataSetNum; i++) {
migrators[i]->StartThread();
}
for (size_t i = 0; i < thread_num; i++) {
senders[i]->StartThread();
}


for(size_t i = 0; i < kDataSetNum; i++) {
migrators[i]->JoinThread();
}
for(size_t i = 0; i < thread_num; i++) {
senders[i]->Stop();
}
for (size_t i = 0; i < thread_num; i++) {
senders[i]->JoinThread();
}


int64_t replies = 0, records = 0;
for (size_t i = 0; i < kDataSetNum; i++) {
records += migrators[i]->num();
delete migrators[i];
}
for (size_t i = 0; i < thread_num; i++) {
replies += senders[i]->elements();
delete senders[i];
}

high_resolution_clock::time_point end = high_resolution_clock::now();
std::chrono::hours h = std::chrono::duration_cast<std::chrono::hours>(end - start);
std::chrono::minutes m = std::chrono::duration_cast<std::chrono::minutes>(end - start);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(end - start);

std::cout << "====================================" << std::endl;
std::cout << "Running time :";
std::cout << h.count() << " hour " << m.count() - h.count() * 60 << " min " << s.count() - h.count() * 60 * 60 << " s\n";
std::cout << "Total records : " << records << " have been Scaned\n";
std::cout << "Total replies : " << replies << " received from redis server\n";
delete db;
return 0;
}
Loading

0 comments on commit 64f6b2b

Please sign in to comment.