Skip to content

Commit

Permalink
add tool:binlog parser
Browse files Browse the repository at this point in the history
  • Loading branch information
wenduo committed Jun 29, 2016
1 parent 5260510 commit 9f3cb39
Show file tree
Hide file tree
Showing 12 changed files with 1,325 additions and 6 deletions.
4 changes: 2 additions & 2 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ enum RecordType {
static const size_t kBlockSize = 64 * 1024;

/*
* Header is Type(1 byte), length (2 bytes)
* Header is Type(1 byte), length (3 bytes), time (4 bytes)
*/
static const size_t kHeaderSize = 1 + 3;
static const size_t kHeaderSize = 1 + 3 + 4;

/*
* the size of memory when we use memory mode
Expand Down
10 changes: 8 additions & 2 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *

char buf[kHeaderSize];

uint64_t now = slash::NowMicros();
uint64_t now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = tv.tv_sec;
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
Expand Down Expand Up @@ -335,7 +338,10 @@ Status Binlog::AppendBlank(slash::WritableFile *file, uint64_t len) {
}

char buf[kBlockSize];
uint64_t now = slash::NowMicros();
uint64_t now;
struct timeval tv;
gettimeofday(&tv, NULL);
now = tv.tv_sec;
buf[0] = static_cast<char>(n & 0xff);
buf[1] = static_cast<char>((n & 0xff00) >> 8);
buf[2] = static_cast<char>(n >> 16);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog_sender_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ uint64_t PikaBinlogSenderThread::get_next(bool &is_error) {
const uint32_t a = static_cast<uint32_t>(header[0]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[1]) & 0xff;
const uint32_t c = static_cast<uint32_t>(header[2]) & 0xff;
const unsigned int type = header[3];
const unsigned int type = header[7];
const uint32_t length = a | (b << 8) | (c << 16);

if (type == kFullType) {
Expand Down
2 changes: 1 addition & 1 deletion third/slash
Submodule slash updated from 8cccf5 to b75f13
100 changes: 100 additions & 0 deletions tools/binlog_tools/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@

RPATH = /usr/local/pika20/lib/
LFLAGS = -Wl,-rpath=$(RPATH)


UNAME := $(shell if [ -f "/etc/redhat-release" ]; then echo "CentOS"; else echo "Ubuntu"; fi)

OSVERSION := $(shell cat /etc/redhat-release | cut -d "." -f 1 | awk '{print $$NF}')

ifeq ($(UNAME), Ubuntu)
SO_DIR = $(CURDIR)/lib/ubuntu
TOOLS_DIR = $(CURDIR)/tools/ubuntu
else ifeq ($(OSVERSION), 5)
SO_DIR = $(CURDIR)/lib/5.4
TOOLS_DIR = $(CURDIR)/tools/5.4
else
SO_DIR = $(CURDIR)/lib/6.2
TOOLS_DIR = $(CURDIR)/tools/6.2
endif

CXX = g++

ifeq ($(__REL), 1)
CXXFLAGS = -O2 -g -pipe -fPIC -W -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
else
CXXFLAGS = -O0 -g -pg -pipe -fPIC -W -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -Wno-redundant-decls
endif

OBJECT = binlog_parser binlog_sender
SRC_DIR = .
THIRD_PATH = ../../third

INCLUDE_PATH = -I../../include/ \
-I../../src/ \
-I$(THIRD_PATH)/glog/src/ \
-I$(THIRD_PATH)/slash/output/include/ \
-I$(THIRD_PATH)/pink/output/include/

LIB_PATH = -L./ \
-L$(THIRD_PATH)/slash/output/lib/ \
-L$(THIRD_PATH)/pink/output/lib/ \
-L$(THIRD_PATH)/glog/.libs/


LIBS = -lpthread \
-lglog \
-lslash \
-lpink

GLOG = $(THIRD_PATH)/glog/.libs/libglog.so.0
PINK = $(THIRD_PATH)/pink/output/lib/libpink.a
SLASH = $(THIRD_PATH)/slash/output/lib/libslash.a

.PHONY: all clean


BASE_SRCS := $(wildcard $(SRC_DIR)/*.cc)
BASE_SRCS += $(wildcard $(SRC_DIR)/*.c)
BASE_SRCS += $(wildcard $(SRC_DIR)/*.cpp)

MAIN_SRCS = $(patsubst %,./%.cc,$(OBJECT))
FUNC_SRCS = $(filter-out $(MAIN_SRCS),$(BASE_SRCS))
FUNC_OBJS = $(patsubst %.cc,%.o,$(FUNC_SRCS))
MAIN_OBJS = $(patsubst %.cc,%.o,$(MAIN_SRCS))


all: $(OBJECT)
@echo "UNAME : $(UNAME)"
@echo "SO_DIR : $(SO_DIR)"
@echo "TOOLS_DIR: $(TOOLS_DIR)"
@echo "Success, go, go, go..."


$(OBJECT): $(GLOG) $(PINK) $(SLASH) $(FUNC_OBJS) $(MAIN_OBJS)
$(CXX) $(CXXFLAGS) -o $@ $@.o $(FUNC_OBJS) $(INCLUDE_PATH) $(LIB_PATH) $(LFLAGS) $(LIBS)


$(SLASH):
make -C $(THIRD_PATH)/slash/

$(PINK):
make -C $(THIRD_PATH)/pink/

$(FUNC_OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)
$(MAIN_OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)
$(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)

$(TOBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)

$(GLOG):
cd $(THIRD_PATH)/glog; if [ ! -f ./Makefile ]; then ./configure; fi; make; echo '*' > $(CURDIR)/third/glog/.gitignore; cp $(CURDIR)/third/glog/.libs/libglog.so.0 $(SO_DIR);

clean:
rm -rf $(SRC_DIR)/*.o
rm -rf $(OBJECT)

158 changes: 158 additions & 0 deletions tools/binlog_tools/binlog.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include "binlog.h"

#include <iostream>
#include <string>
#include <stdint.h>
#include <signal.h>
#include <unistd.h>

#include <glog/logging.h>

#include "slash_mutex.h"


/*
* Binlog
*/
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
consumer_num_(0),
item_num_(0),
version_(NULL),
queue_(NULL),
versionfile_(NULL),
pro_num_(0),
pool_(NULL),
exit_all_consume_(false),
binlog_path_(binlog_path),
file_size_(file_size) {

// To intergrate with old version, we don't set mmap file size to 100M;
//slash::SetMmapBoundSize(file_size);
//slash::kMmapBoundSize = 1024 * 1024 * 100;

Status s;

slash::CreateDir(binlog_path_);

filename = binlog_path_ + kBinlogPrefix;
const std::string manifest = binlog_path_ + kManifest;
std::string profile;

if (!slash::FileExists(manifest)) {
DLOG(INFO) << "Binlog: Manifest file not exist";

profile = NewFileName(filename, pro_num_);
s = slash::NewWritableFile(profile, &queue_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new " << filename << " " << s.ToString();
}

s = slash::NewRWFile(manifest, &versionfile_);
if (!s.ok()) {
LOG(WARNING) << "Binlog: new versionfile error " << s.ToString();
}

version_ = new Version(versionfile_);
version_->StableSave();
} else {

s = slash::NewRWFile(manifest, &versionfile_);
if (s.ok()) {
version_ = new Version(versionfile_);
version_->Init();
pro_num_ = version_->pro_num_;

// Debug
//version_->debug();
} else {
LOG(WARNING) << "Binlog: open versionfile error";
}

profile = NewFileName(filename, pro_num_);
slash::AppendWritableFile(profile, &queue_, version_->pro_offset_);
}

InitLogFile();
}

Binlog::~Binlog() {
delete version_;
delete versionfile_;

delete queue_;
}

void Binlog::InitLogFile() {
assert(queue_ != NULL);

uint64_t filesize = queue_->Filesize();
block_offset_ = filesize % kBlockSize;
}

Status Binlog::GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset) {
slash::RWLock(&(version_->rwlock_), false);

*filenum = version_->pro_num_;
*pro_offset = version_->pro_offset_;
//*filenum = version_->pro_num();
//*pro_offset = version_->pro_offset();

return Status::OK();
}


std::string NewFileName(const std::string name, const uint32_t current) {
char buf[256];
snprintf(buf, sizeof(buf), "%s%u", name.c_str(), current);
return std::string(buf);
}

/*
* Version
*/
Version::Version(slash::RWFile *save)
: pro_offset_(0),
pro_num_(0),
save_(save) {
assert(save_ != NULL);

pthread_rwlock_init(&rwlock_, NULL);
}

Version::~Version() {
StableSave();
pthread_rwlock_destroy(&rwlock_);
}

Status Version::StableSave() {
char *p = save_->GetData();
memcpy(p, &pro_offset_, sizeof(uint64_t));
p += 20;
//memcpy(p, &con_offset_, sizeof(uint64_t));
//p += 8;
//memcpy(p, &item_num_, sizeof(uint32_t));
//p += 4;
memcpy(p, &pro_num_, sizeof(uint32_t));
//p += 4;
//memcpy(p, &con_num_, sizeof(uint32_t));
//p += 4;
return Status::OK();
}

Status Version::Init() {
Status s;
if (save_->GetData() != NULL) {
memcpy((char*)(&pro_offset_), save_->GetData(), sizeof(uint64_t));
//memcpy((char*)(&con_offset_), save_->GetData() + 8, sizeof(uint64_t));
memcpy((char*)(&item_num_), save_->GetData() + 16, sizeof(uint32_t));
memcpy((char*)(&pro_num_), save_->GetData() + 20, sizeof(uint32_t));
//memcpy((char*)(&con_num_), save_->GetData() + 24, sizeof(uint32_t));
// DLOG(INFO) << "Version Init pro_offset "<< pro_offset_ << " itemnum " << item_num << " pro_num " << pro_num_ << " con_num " << con_num_;
return Status::OK();
} else {
return Status::Corruption("version init error");
}
}



Loading

0 comments on commit 9f3cb39

Please sign in to comment.