Skip to content

Commit

Permalink
fix gtid check
Browse files Browse the repository at this point in the history
  • Loading branch information
wodesuck committed Jan 3, 2017
1 parent b70e29c commit da97840
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 37 deletions.
5 changes: 2 additions & 3 deletions phxbinlogsvr/core/handler/event_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ EventManager *EventManager::GetGlobalEventManager(const Option *option) {

EventManager::EventManager(const Option *option) {
StorageManager *storage_manager = StorageManager::GetGlobalStorageManager(option);

event_storage_ = storage_manager->GetEventStorage();
event_monitor_ = new EventMonitor(option, storage_manager);
event_monitor_ = new EventMonitor(option);
option_ = option;
}

Expand Down Expand Up @@ -129,7 +128,7 @@ int EventManager::GetEvents(string *data, uint32_t want_num) {
}

void EventManager::Notify() {
event_storage_->Notify();
event_storage_->Notify();
}

}
17 changes: 13 additions & 4 deletions phxbinlogsvr/core/handler/event_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "master_monitor.h"
#include "event_storage.h"
#include "storage_manager.h"
#include "mysql_manager.h"

#include "phxcomm/phx_log.h"
#include "phxbinlogsvr/statistics/phxbinlog_stat.h"
Expand All @@ -28,9 +29,10 @@ using phxsql::ColorLogInfo;

namespace phxbinlog {

EventMonitor::EventMonitor(const Option *option, StorageManager *storage_manager) {
storage_manager_ = storage_manager;
EventMonitor::EventMonitor(const Option *option) {
option_ = option;
storage_manager_ = StorageManager::GetGlobalStorageManager(option);
master_manager_ = MasterManager::GetGlobalMasterManager(option);
stop_ = false;

Run();
Expand All @@ -47,12 +49,19 @@ int EventMonitor::CheckRunningStatus() {
if (option_->GetBinLogSvrConfig()->IsForceMakingCheckPoint()) {
ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list);
} else {
MasterManager *mastermanager = MasterManager::GetGlobalMasterManager(option_);
vector < string > member_iplist;
mastermanager->GetMemberIPList(&member_iplist);
master_manager_->GetMemberIPList(&member_iplist);

ret = MasterMonitor::GetGlobalMySQLMaxGTIDList(option_, member_iplist, &gtid_list);
}

if(master_manager_->IsMaster()) {
for (auto &gtid : gtid_list) {
gtid = MySqlManager::ReduceGtidByOne(gtid);
LogVerbose("%s master get new gtid %s",__func__, gtid.c_str());
}
}

LogVerbose("%s get gtid list ret %d", __func__, ret);
if (ret == OK) {
ret = storage_manager_->MakeCheckPoint(gtid_list);
Expand Down
3 changes: 2 additions & 1 deletion phxbinlogsvr/core/handler/event_monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ class StorageManager;
class MasterManager;
class EventMonitor : public phxsql::ThreadBase {
public:
EventMonitor(const Option *option, StorageManager *event_storage);
EventMonitor(const Option *option);
virtual ~EventMonitor();
virtual int Process();
private:
int CheckRunningStatus();

private:
StorageManager *storage_manager_;
MasterManager *master_manager_;
const Option *option_;
bool stop_;
};
Expand Down
18 changes: 13 additions & 5 deletions phxbinlogsvr/core/mysql/mysql_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
#include "phxcomm/phx_log.h"
#include "phxbinlogsvr/config/phxbinlog_config.h"
#include "phxbinlogsvr/define/errordef.h"
#include "phxbinlogsvr/core/gtid_handler.h"

#include <string.h>
#include <assert.h>

using std::string;
using std::vector;
using std::pair;
using phxsql::ColorLogWarning;
using phxsql::ColorLogError;
using phxsql::LogVerbose;
Expand Down Expand Up @@ -97,7 +99,7 @@ int MySqlManager::Query(const string& query_string, const string &ip) {
mysql_option.pwd = GetAdminPwd();
if (mysql_option.username == "") {
mysql_option.username = "root";
mysql_option.pwd = "";
mysql_option.pwd = "";
}
LogVerbose("%s username %s pwd %s query %s", __func__, mysql_option.username.c_str(), mysql_option.pwd.c_str(),
query_string.c_str());
Expand All @@ -114,7 +116,7 @@ int MySqlManager::Query(const string &query_string, vector<vector<string> > *res
mysql_option.pwd = GetAdminPwd();
if (mysql_option.username == "") {
mysql_option.username = "root";
mysql_option.pwd = "";
mysql_option.pwd = "";
}
LogVerbose("%s username %s pwd %s query %s", __func__, mysql_option.username.c_str(), mysql_option.pwd.c_str(),
query_string.c_str());
Expand All @@ -131,7 +133,7 @@ int MySqlManager::GetValue(const std::string &value_type, const std::string &val
mysql_option.pwd = GetAdminPwd();
if (mysql_option.username == "") {
mysql_option.username = "root";
mysql_option.pwd = "";
mysql_option.pwd = "";
}
LogVerbose("%s username %s pwd %s", __func__, mysql_option.username.c_str(), mysql_option.pwd.c_str());

Expand Down Expand Up @@ -200,7 +202,7 @@ bool MySqlManager::CheckAdminAccount(const string &admin_username, const string
}

int MySqlManager::ChangePwd(const string &username, const string &pwd) {
return Query(MySqlStringHelper::GetChangePwdStr(username,pwd));
return Query(MySqlStringHelper::GetChangePwdStr(username,pwd));
}

int MySqlManager::CreateUser(const string &username,const string &pwd) {
Expand Down Expand Up @@ -331,7 +333,7 @@ int MySqlManager::CreateReplica(const string &admin_username, const string &repl
}
LogVerbose("%s grant %s user %s done", __func__, grant_string.c_str(), replica_username.c_str());
}
return OK;
return OK;
}

int MySqlManager::CreateMySqlAdminInfo(const string &now_admin_username, const string &now_admin_pwd,
Expand Down Expand Up @@ -411,4 +413,10 @@ int MySqlManager::RemoveMemberAdminPermission(const std::string &member_ip) {
return RemoveMemberAdminPermission(GetAdminUserName(), GetAdminPwd(), member_ip);
}

string MySqlManager::ReduceGtidByOne(const string &gtid) {
pair<string, size_t> gtid_item = GtidHandler::ParseGTID(gtid);
gtid_item.second--;
return GtidHandler::GenGTID(gtid_item.first, gtid_item.second);
}

}
3 changes: 2 additions & 1 deletion phxbinlogsvr/core/mysql/mysql_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class MySqlManager {
int AddMemberAdminPermission(const std::string &member_ip);
int RemoveMemberAdminPermission(const std::string &member_ip);
int CheckAdminPermission(const std::vector<std::string> &member_list);
static std::string ReduceGtidByOne(const std::string &gtid);
protected:
bool CheckAdminAccount(const std::string &admin_username, const std::string &admin_pwd);

Expand All @@ -67,7 +68,7 @@ class MySqlManager {
int CheckReplicaUser(const std::string &username, const std::string &pwd);

int CreateUser(const std::string &username, const std::string &pwd);
int ChangePwd(const std::string &usename, const std::string &pwd);
int ChangePwd(const std::string &usename, const std::string &pwd);
int CreateAdmin(const std::string &admin_username, const std::string &admin_pwd,
const std::vector<std::string> &ip_list);
int CreateReplica(const std::string &admin_username, const std::string &replica_username,
Expand Down
6 changes: 3 additions & 3 deletions phxbinlogsvr/core/repl/replication_transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ int ReplicationTransfer::Process() {
ctx_->SetSeq(seq);
if (ret) {
ColorLogError("%s send buffer fail len %zu ret %d seq %d", "slave", recv_buff[i].size(), ret, seq);
STATISTICS(ReplSendDataFail());
STATISTICS(ReplSendDataFail());
break;
}
}
Expand Down Expand Up @@ -113,7 +113,7 @@ int ReplicationTransfer::GetEvents(vector<string> *event_list) {
if (ret) {
return ret;
}
STATISTICS(GtidEventTransferNum(event_list->size()));
STATISTICS(GtidEventTransferNum(event_list->size()));

std::string max_gtid;
ret = GtidHandler::ParseEventList(event_data, event_list, true, &max_gtid);
Expand Down Expand Up @@ -158,7 +158,7 @@ void ReplicationTransfer::CountCheckSum(const vector<string> &event_list, const
for (size_t i = 0; i < checksum_for_log.size(); ++i)
ColorLogError("%s get check sum %llu", __func__, checksum_for_log[i]);
checksum_for_log.push_back(checksum_);
STATISTICS(ReplCheckSumFail());
STATISTICS(ReplCheckSumFail());
assert(checksum_ == info.checksum());
}
}
Expand Down
7 changes: 5 additions & 2 deletions phxbinlogsvr/core/storage/event_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,17 @@ event_index_status EventIndex::GetGTIDIndex(const string &gtid, ::google::protob
bool lower_bound) {
string buffer;

leveldb::Iterator* it = level_db_->NewIterator(leveldb::ReadOptions());
leveldb::ReadOptions rop = leveldb::ReadOptions();
leveldb::Iterator* it = level_db_->NewIterator(rop);
if (it == NULL) {
return event_index_status::DB_ERROR;
}

event_index_status ret = event_index_status::DATA_NOT_FOUND;
it->Seek(gtid);
if (it->Valid()) {
LogVerbose("%s find key %s, want key %s", __func__, it->key().ToString().c_str(), gtid.c_str());
LogVerbose("%s find key %s, want key %s",
__func__, it->key().ToString().c_str(), gtid.c_str());

if (it->key().ToString() == gtid
|| (lower_bound && GtidHandler::GetUUIDByGTID(it->key().ToString()) == GtidHandler::GetUUIDByGTID(gtid))) {
Expand Down Expand Up @@ -128,6 +130,7 @@ event_index_status EventIndex::IsExist(const string &gtid) {
}

event_index_status EventIndex::DeleteGTIDIndex(const string &gtid) {
LogVerbose("%s delete gtid %s",__func__, gtid.c_str());
leveldb::Status status = level_db_->Delete(leveldb::WriteOptions(), gtid);

if (status.ok() || status.IsNotFound())
Expand Down
3 changes: 2 additions & 1 deletion phxbinlogsvr/core/storage/event_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class EventIndex {
void CloseDB();
void OpenDB(const std::string &event_path);

event_index_status GetGTIDIndex(const std::string &gtid, ::google::protobuf::Message *data_info, bool lower_bound);
event_index_status GetGTIDIndex(const std::string &gtid, ::google::protobuf::Message *data_info,
bool lower_bound);

private:
leveldb::DB *level_db_;
Expand Down
32 changes: 15 additions & 17 deletions phxbinlogsvr/core/storage/event_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ uint64_t EventStorage::GetCurrentInstanceInterval() {
LockManager lock(&mutex_);
uint64_t last_instanceid = uuid_handler_->GetLastInstanceID();
uint64_t oldest_instanceid = event_file_manager_->GetOldestInstanceIDofFile();
ColorLogInfo("%s %p get newest instance id %llu oldest instanceid %llu",
__func__, this, last_instanceid, oldest_instanceid);
if(last_instanceid < oldest_instanceid) return 0;
return last_instanceid - oldest_instanceid;
ColorLogInfo("%s %p get newest instance id %llu oldest instanceid %llu",
__func__, this, last_instanceid, oldest_instanceid);
if(last_instanceid < oldest_instanceid) return 0;
return last_instanceid - oldest_instanceid;
}

int EventStorage::AddEvent(const string &gtid, const EventData &event_data, bool switch_file) {
Expand All @@ -185,7 +185,7 @@ int EventStorage::AddEvent(const string &gtid, const EventData &event_data, bool
LogVerbose("%s add event gtid %s instance id %lu, laststanceid %lu", __func__, gtid.c_str(),
event_data.instance_id(), last_instanceid);
if (event_data.instance_id() <= last_instanceid) {
STATISTICS(GtidEventExit());
STATISTICS(GtidEventExit());
ColorLogWarning("%s gtid %s exist, last instance id %llu, current instance id %llu", __func__,
gtid.c_str(), last_instanceid, event_data.instance_id());
return EVENT_EXIST;
Expand Down Expand Up @@ -219,11 +219,11 @@ int EventStorage::AddEvent(const string &gtid, const EventData &event_data, bool
CheckAndSwitchFile();
}
} else {
STATISTICS(GtidEventAddEventFail());
}
STATISTICS(GtidEventAddEventFail());
}
ColorLogInfo("%s write data gtid %s instance id %llu checksum %llu ret %d file %s run %u ms",
__func__, event_data.gtid().c_str(), event_data.instance_id(), event_data.checksum(), ret,
data_info.file_name().c_str(), timer.GetTime()/1000 );
__func__, event_data.gtid().c_str(), event_data.instance_id(), event_data.checksum(), ret,
data_info.file_name().c_str(), timer.GetTime()/1000 );
return ret;
}

Expand All @@ -236,7 +236,7 @@ int EventStorage::ReSet(const string &gtid) {
EventDataInfo data_info;
int ret = RealGetLowerBoundGTIDInfo(gtid, &data_info);
if (ret == DATA_EMPTY) {
STATISTICS(GtidEventResetGtidFilePosFail());
STATISTICS(GtidEventResetGtidFilePosFail());
ColorLogWarning("%s gtid not found", __func__, gtid.c_str());
return GTID_FAIL;
} else if (ret == OK) {
Expand All @@ -260,7 +260,7 @@ int EventStorage::GetEvent(EventData *data, bool wait) {
Wait();
}
if (ret != OK && ret != DATA_EMPTY)
STATISTICS(GtidEventGetEventFail());
STATISTICS(GtidEventGetEventFail());
if (ret == OK && data->gtid().empty()) {
ColorLogInfo("%s get gtid header skip, ret %d", __func__, ret);
continue;
Expand All @@ -284,13 +284,11 @@ int EventStorage::GetGTIDInfo(const string &gtid, EventDataInfo *data_info, bool
}

string EventStorage::GetLastGTIDByUUID(const string &uuid) {
LogVerbose("%s get uuid %s",__func__, uuid.c_str());
LockManager lock(&mutex_);
return uuid_handler_->GetLastestGTIDByUUID(uuid);
}

string EventStorage::GetLastGTIDByGTID(const string &gtid) {
LogVerbose("%s get gtid %s",__func__, gtid.c_str());
LockManager lock(&mutex_);
return uuid_handler_->GetLastestGTIDByGTID(gtid);
}
Expand Down Expand Up @@ -345,7 +343,7 @@ int EventStorage::DelCheckPointFile(const string &maxfilename, const uint32_t &m
}
}
}
RemoveFile(*file_name);
RemoveFile(*file_name);
}
}
return OK;
Expand Down Expand Up @@ -552,7 +550,7 @@ int EventStorage::CheckAndSwitchFile(bool force) {
EventDataInfo data_info;
event_file_manager_->GetWriteFileInfo(&data_info);
if (!force || data_info.offset() > option_->GetBinLogSvrConfig()->GetMaxEventFileSize()) {
STATISTICS(GtidEventSwitchDataFile());
STATISTICS(GtidEventSwitchDataFile());
ColorLogInfo("%s file %s size %zu has full, max size %zu, switch", __func__,
data_info.file_name().c_str(), data_info.offset(),
option_->GetBinLogSvrConfig()->GetMaxEventFileSize());
Expand All @@ -573,8 +571,8 @@ void EventStorage::Notify() {
}

void EventStorage::Wait() {
pthread_cond_wait(&cond_, &mutex_);
return;
pthread_cond_wait(&cond_, &mutex_);
return;
}

uint64_t EventStorage::GetLastestCheckPointInstanceID() {
Expand Down

0 comments on commit da97840

Please sign in to comment.