Skip to content

Commit

Permalink
fix gtid check
Browse files Browse the repository at this point in the history
  • Loading branch information
cjcchen committed Jan 5, 2017
1 parent aa11a00 commit 180aef1
Showing 9 changed files with 102 additions and 89 deletions.
51 changes: 33 additions & 18 deletions phxbinlogsvr/core/handler/event_monitor.cpp
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/


#include "event_monitor.h"
#include "master_manager.h"
#include "master_monitor.h"
@@ -32,7 +33,7 @@ namespace phxbinlog {
EventMonitor::EventMonitor(const Option *option) {
option_ = option;
storage_manager_ = StorageManager::GetGlobalStorageManager(option);
master_manager_ = MasterManager::GetGlobalMasterManager(option);
master_manager_ = MasterManager::GetGlobalMasterManager(option);
stop_ = false;

Run();
@@ -47,35 +48,49 @@ int EventMonitor::CheckRunningStatus() {
int ret = OK;
vector < string > gtid_list;
if (option_->GetBinLogSvrConfig()->IsForceMakingCheckPoint()) {
ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list,master_manager_->IsMaster());
ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list);
} else {
vector < string > member_iplist;
master_manager_->GetMemberIPList(&member_iplist);

ret = MasterMonitor::GetGlobalMySQLMaxGTIDList(option_, member_iplist, &gtid_list);
}


LogVerbose("%s get gtid list ret %d", __func__, ret);
if (ret == OK) {
ret = storage_manager_->MakeCheckPoint(gtid_list);

uint64_t max_instanceid = 0;
EventStorage *event_storage = storage_manager_->GetEventStorage();
for (const auto &gtid : gtid_list) {
EventDataInfo data_info;
int ret = event_storage->GetGTIDInfo(gtid, &data_info, true);
if (ret) {
continue;

//use last gtid set instead of current gtid set if it is master
//some gtid in current set may not be storaged in level db and cost much while searching
bool is_master = master_manager_->IsMaster();
if(is_master) {
if(last_check_gtid_.empty()) {
last_check_gtid_ = gtid_list;
return OK;
}
ret = storage_manager_->MakeCheckPoint(last_check_gtid_);
last_check_gtid_=gtid_list;
}
else {
ret = storage_manager_->MakeCheckPoint(gtid_list);
uint64_t max_instanceid = 0;
EventStorage *event_storage = storage_manager_->GetEventStorage();
for (const auto &gtid : gtid_list) {
EventDataInfo data_info;
int ret = event_storage->GetGTIDInfo(gtid, &data_info, true);
if (ret) {
continue;
}
max_instanceid = std::max(max_instanceid, (uint64_t) data_info.instance_id());
}
max_instanceid = std::max(max_instanceid, (uint64_t) data_info.instance_id());
}

uint64_t storage_instanceid = event_storage->GetLastInstanceID();
ColorLogInfo("%s current mysql instanceid %lu, binlog svr instanceid %lu", __func__, max_instanceid,
storage_instanceid);
uint64_t storage_instanceid = event_storage->GetLastInstanceID();
ColorLogInfo("%s current mysql instanceid %lu, binlog svr instanceid %lu", __func__, max_instanceid,
storage_instanceid);

if (max_instanceid && max_instanceid <= storage_instanceid) {
if (max_instanceid && max_instanceid <= storage_instanceid) {
STATISTICS(MySqlGtidNumDiff(storage_instanceid - max_instanceid));
}
}
}

@@ -91,4 +106,4 @@ int EventMonitor::Process() {
return OK;
}

}
}
3 changes: 3 additions & 0 deletions phxbinlogsvr/core/handler/event_monitor.h
Original file line number Diff line number Diff line change
@@ -11,6 +11,8 @@
#pragma once

#include "phxcomm/thread_base.h"
#include <vector>
#include <string>

namespace phxbinlog {

@@ -26,6 +28,7 @@ class EventMonitor : public phxsql::ThreadBase {
int CheckRunningStatus();

private:
std::vector<std::string> last_check_gtid_;
StorageManager *storage_manager_;
MasterManager *master_manager_;
const Option *option_;
51 changes: 26 additions & 25 deletions phxbinlogsvr/core/monitor/agent_monitor.cpp
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/


#include "agent_monitor.h"

#include "event_manager.h"
@@ -263,7 +264,7 @@ int AgentMonitor::CheckMasterInit() {
if (is_master) {
// get the current gtid in master binlog
vector < string > gtid_list;
int ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list, false);
int ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list);
if (ret)
return false;

@@ -306,34 +307,34 @@ int AgentMonitor::CheckMasterInit() {
}

int AgentMonitor::IsGTIDCompleted(const Option *option, EventManager *event_manager) {
vector < string > gtid_list;
int ret = MasterMonitor::GetMySQLMaxGTIDList(option, &gtid_list, false);
if (ret) {
ColorLogWarning("%s get sql info fail, skip", __func__);
return SVR_FAIL;
}

string last_gtid = event_manager->GetNewestGTID();
if (MasterMonitor::IsGTIDCompleted(gtid_list, last_gtid)) {
} else {
return DATA_EMPTY;
}

return OK;
vector < string > gtid_list;
int ret = MasterMonitor::GetMySQLMaxGTIDList(option, &gtid_list);
if (ret) {
ColorLogWarning("%s get sql info fail, skip", __func__);
return SVR_FAIL;
}

string last_gtid = event_manager->GetNewestGTID();
if (MasterMonitor::IsGTIDCompleted(gtid_list, last_gtid)) {
} else {
return DATA_EMPTY;
}

return OK;
}

int AgentMonitor::CheckMasterTimeOut() {
bool is_master = master_manager_->CheckMasterBySvrID(option_->GetBinLogSvrConfig()->GetEngineSvrID());
bool is_master = master_manager_->CheckMasterBySvrID(option_->GetBinLogSvrConfig()->GetEngineSvrID());
int ret = OK;
if(!is_master) {
ret=IsGTIDCompleted(option_, event_manager_);
if (ret < 0) {
return ret;
}
}
if(!is_master) {
ret=IsGTIDCompleted(option_, event_manager_);
if (ret < 0) {
return ret;
}
}

if (!AgentExternalMonitor::IsHealthy()) {
ColorLogWarning("%s external monitor not healthy", __func__);
if (!AgentExternalMonitor::IsHealthy()) {
ColorLogWarning("%s external monitor not healthy", __func__);
return SVR_FAIL;
}

@@ -362,7 +363,7 @@ int AgentMonitor::CheckSlaveRunningStatus() {
bool AgentMonitor::IsSlaveReady() {

vector < string > gtid_list;
int ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list, false);
int ret = MasterMonitor::GetMySQLMaxGTIDList(option_, &gtid_list);
if (ret) {
ColorLogError("%s get sql info fail", __func__);
return false;
13 changes: 3 additions & 10 deletions phxbinlogsvr/core/monitor/base/master_monitor.cpp
Original file line number Diff line number Diff line change
@@ -53,15 +53,8 @@ int MasterMonitor::MasterStart(const Option *option) {
return OK;
}

int MasterMonitor::GetMySQLMaxGTIDList(const Option *option, vector<string> *gtid_list, bool is_master) {
int ret = MonitorComm::GetMySQLMaxGTIDList(option, "gtid_executed", gtid_list);
if(ret==0&&is_master) {
for (auto &gtid : *gtid_list) {
gtid = MySqlManager::ReduceGtidByOne(gtid);
LogVerbose("%s master get new gtid %s",__func__, gtid.c_str());
}
}
return ret;
int MasterMonitor::GetMySQLMaxGTIDList(const Option *option, vector<string> *gtid_list) {
return MonitorComm::GetMySQLMaxGTIDList(option, "gtid_executed", gtid_list);
}

int MasterMonitor::GetGlobalMySQLMaxGTIDList(const Option *option, const vector<string> &iplist,
@@ -181,4 +174,4 @@ int MasterMonitor::CheckMasterPermisstion(const Option *option, const std::vecto
return MySqlManager::GetGlobalMySqlManager(option)->CheckAdminPermission(member_list);
}

}
}
3 changes: 2 additions & 1 deletion phxbinlogsvr/core/monitor/base/master_monitor.h
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/

#pragma once

#include <string>
@@ -20,7 +21,7 @@ class MasterMonitor {
public:
static int GetGlobalMySQLMaxGTIDList(const Option *option, const std::vector<std::string> &iplist,
std::vector<std::string> *gtid_list);
static int GetMySQLMaxGTIDList(const Option *option, std::vector<std::string> *gtid_list, bool is_master);
static int GetMySQLMaxGTIDList(const Option *option, std::vector<std::string> *gtid_list);

static int MasterStart(const Option *option);

9 changes: 5 additions & 4 deletions phxbinlogsvr/core/storage/event_index.cpp
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/


#include "event_index.h"

#include "phxcomm/phx_log.h"
@@ -76,15 +77,15 @@ event_index_status EventIndex::GetGTID(const string &gtid, ::google::protobuf::M
}

event_index_status EventIndex::GetLowerBoundGTID(const string &gtid,
::google::protobuf::Message *data_info) {
::google::protobuf::Message *data_info) {
return GetGTIDIndex(gtid, data_info, true);
}

event_index_status EventIndex::GetGTIDIndex(const string &gtid, ::google::protobuf::Message *data_info,
bool lower_bound) {
string buffer;

leveldb::ReadOptions rop = leveldb::ReadOptions();
leveldb::ReadOptions rop = leveldb::ReadOptions();
leveldb::Iterator* it = level_db_->NewIterator(rop);
if (it == NULL) {
return event_index_status::DB_ERROR;
@@ -93,8 +94,8 @@ event_index_status EventIndex::GetGTIDIndex(const string &gtid, ::google::protob
event_index_status ret = event_index_status::DATA_NOT_FOUND;
it->Seek(gtid);
if (it->Valid()) {
LogVerbose("%s find key %s, want key %s",
__func__, it->key().ToString().c_str(), gtid.c_str());
// LogVerbose("%s find key %s, want key %s",
// __func__, it->key().ToString().c_str(), gtid.c_str());

if (it->key().ToString() == gtid
|| (lower_bound && GtidHandler::GetUUIDByGTID(it->key().ToString()) == GtidHandler::GetUUIDByGTID(gtid))) {
Loading

0 comments on commit 180aef1

Please sign in to comment.