Skip to content

Commit

Permalink
some pika tools
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed Mar 14, 2023
1 parent 0fc3f5a commit d9b0955
Show file tree
Hide file tree
Showing 123 changed files with 13,555 additions and 0 deletions.
38 changes: 38 additions & 0 deletions pika-tools/aof_to_pika/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
CXX = g++
CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ -fPIC -Wno-unused-function -std=c++11
OBJECT = aof_to_pika
SRC_DIR = ./src
OUTPUT = ./output

INCLUDE_PATH = -I./include -I../..

LIBS = -lpthread

.PHONY: all clean


BASE_OBJS := $(wildcard $(SRC_DIR)/*.cc)
BASE_OBJS += $(wildcard $(SRC_DIR)/*.c)
BASE_OBJS += $(wildcard $(SRC_DIR)/*.cpp)
OBJS = $(patsubst %.cc,%.o,$(BASE_OBJS))


all: $(OBJECT)
rm -rf $(OUTPUT)
mkdir $(OUTPUT)
mkdir $(OUTPUT)/bin
cp $(OBJECT) $(OUTPUT)/bin/
rm -rf $(OBJECT)
@echo "Success, go, go, go..."


$(OBJECT): $(OBJS)
$(CXX) $(CXXFLAGS) -o $@ $(OBJS) $(INCLUDE_PATH) $(LIBS)

$(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)

clean:
rm -rf $(SRC_DIR)/*.o
rm -rf $(OUTPUT)/*
rm -rf $(OUTPUT)
17 changes: 17 additions & 0 deletions pika-tools/aof_to_pika/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
## README

#### Introduction
A tool to transfer data for redis from one to another, it also support any redis-like nosql like [pika](https://github.com/baotiao/pika). The transfor progress is based on redis aof file, which simply read the aof and batch send to the destination peer.

#### Feature

- Continuously read new content of aof as 'tail -f'
- Read response and keep statistics
- More efficiency

#### Usage

``` shell
make
cd output/bin && ./aof_to_pika -h # for more information
```
23 changes: 23 additions & 0 deletions pika-tools/aof_to_pika/include/aof_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef AOF_INFO
#define AOF_INFO

#include <string>

#define AOF_LOG_DEBUG 1
#define AOF_LOG_TRACE 2
#define AOF_LOG_INFO 3
#define AOF_LOG_WARN 4
#define AOF_LOG_ERR 5
#define AOF_LOG_FATAL 6

#define LOG_FATAL(content) info_print(AOF_LOG_FATAL, content)
#define LOG_ERR(content) info_print(AOF_LOG_ERR, content)
#define LOG_WARN(content) info_print(AOF_LOG_WARN, content)
#define LOG_INFO(content) info_print(AOF_LOG_INFO, content)
#define LOG_TRACE(content) info_print(AOF_LOG_TRACE, content)
#define LOG_DEBUG(content) info_print(AOF_LOG_DEBUG, content)

void set_info_level(int);
void info_print(int, const std::string&);
#endif

36 changes: 36 additions & 0 deletions pika-tools/aof_to_pika/include/aof_lock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef AOF_LOCK_H
#define AOF_LOCK_H

class CondVar;

class Mutex {
public:
Mutex();
~Mutex();

void Lock();
void Unlock();
void AssertHeld() { }

private:
friend class CondVar;
pthread_mutex_t mu_;

// No copying
Mutex(const Mutex&);
void operator=(const Mutex&);
};

class CondVar {
public:
explicit CondVar(Mutex* mu);
~CondVar();
void Wait();
void Signal();
void SignalAll();
private:
pthread_cond_t cv_;
Mutex* mu_;
};

#endif // AOF_LOCK_H
52 changes: 52 additions & 0 deletions pika-tools/aof_to_pika/include/aof_sender.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#ifndef AOF_SENDER_H
#define AOF_SENDER_H

#include <string>
#include <deque>
#include "aof_lock.h"
#include "aof_info.h"

#define RM_NONE 0
#define RM_READBLE 1
#define RM_WRITABLE 2
#define RM_RECONN 4

#define READ_BUF_MAX 100
#define MSG_BLOCK_MAX 512 * 1024

typedef struct ConnInfo{
ConnInfo(){}
ConnInfo(const std::string &h, const std::string &p, const std::string &a):host_(h),port_(p),auth_(a){}
std::string host_;
std::string port_;
std::string auth_;
} ConnInfo;


class AOFSender
{
public:
AOFSender():buf_wcond_(&buf_mutex_), buf_rcond_(&buf_mutex_){ sockfd_ = -1; nsucc_ = nfail_ = 0; }
~AOFSender();
bool rconnect(const std::string&, const std::string&, const std::string&);
bool message_add(const std::string&);
bool process();
void print_result();

private:
int sockfd_;
int nsucc_, nfail_;
ConnInfo *conn_info_;
Mutex buf_mutex_;
CondVar buf_wcond_;
CondVar buf_rcond_;
std::deque<std::string> read_buffer_;
std::string to_send_;
std::string current_bulk_;
bool message_get_();
bool check_succ_(const std::string&, long&, long&);
int mask_wait_(int fd, int mask, long long milliseconds);
bool set_nonblock_(int fd);
};

#endif
19 changes: 19 additions & 0 deletions pika-tools/aof_to_pika/src/aof_info.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include <iostream>
#include "aof_info.h"

short aof_info_level_ = AOF_LOG_INFO;

void set_info_level(int l){
aof_info_level_ = l;
}

void info_print(int l, const std::string &content) {
if (l > AOF_LOG_FATAL || l < AOF_LOG_DEBUG || content.empty()) return;

if (l < aof_info_level_) return;
if (l >= AOF_LOG_ERR)
std::cerr << content << std::endl;
else
std::cout << content << std::endl;
}

41 changes: 41 additions & 0 deletions pika-tools/aof_to_pika/src/aof_lock.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include <pthread.h>
#include <cstdlib>
#include <string>
#include <iostream>

#include "aof_lock.h"


static void PthreadCall(const std::string &label, int result) {
if (result != 0) {
std::cout << "pthread " << label << " : " << result << std::endl;
abort();
}
}

Mutex::Mutex() { PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL)); }

Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); }

void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); }

void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }

CondVar::CondVar(Mutex* mu)
: mu_(mu) {
PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
}

CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }

void CondVar::Wait() {
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
}

void CondVar::Signal() {
PthreadCall("signal", pthread_cond_signal(&cv_));
}

void CondVar::SignalAll() {
PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
}
Loading

0 comments on commit d9b0955

Please sign in to comment.