Skip to content

Commit

Permalink
slave itself control whether full sync is required (OpenAtomFoundatio…
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 6c11f05 commit 7f6601f
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 80 deletions.
10 changes: 6 additions & 4 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@ struct BinlogOffset {
enum ReplState {
kNoConnect = 0,
kTryConnect = 1,
kWaitReply = 2,
kTryDBSync = 2,
kWaitDBSync = 3,
kConnected = 4,
kError = 5
kWaitReply = 4,
kConnected = 5,
kError = 6
};

// debug only
const std::string ReplStateMsg[] = {
"kNoConnect",
"kTryConnect",
"kTryDBSync",
"kWaitDBSync",
"kWaitReply",
"kConnected",
"kWaitDBSync",
"kError"
};

Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class PikaReplBgWorker {

static void HandleMetaSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
static void HandleDBSyncRequest(void* arg);
static void HandleTrySyncRequest(void* arg);
static void HandleWriteDb(void* arg);

Expand Down
6 changes: 4 additions & 2 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ class PikaReplClient {
Status GetBinlogSyncCtlStatus(const RmNode& slave, BinlogOffset* const sent_boffset, BinlogOffset* const acked_boffset);

Status SendMetaSync();
Status SendPartitionDBSync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& boffset);
Status SendPartitionTrySync(const std::string& table_name,
uint32_t partition_id,
const BinlogOffset& boffset,
bool force);
const BinlogOffset& boffset);
Status SendBinlogSync(const RmNode& slave);

Status TriggerSendBinlogSync();
Expand Down
1 change: 1 addition & 0 deletions include/pika_repl_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PikaReplClientConn: public pink::PbConn {
virtual ~PikaReplClientConn() = default;
static void HandleBinlogSyncResponse(void* arg);
static void HandleMetaSyncResponse(void* arg);
static void HandleDBSyncResponse(void* arg);
static void HandleTrySyncResponse(void* arg);
static bool IsTableStructConsistent(const std::vector<TableStruct>& current_tables,
const std::vector<TableStruct>& expect_tables);
Expand Down
4 changes: 4 additions & 0 deletions include/pika_repl_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class PikaReplServer {
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);

void ScheduleDBSyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);

void ScheduleTrySyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);
Expand Down
4 changes: 4 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class PikaServer {
void SignalAuxiliary();
Status TriggerSendBinlogSync();
Status SendMetaSyncRequest();
Status SendPartitionDBSyncRequest(std::shared_ptr<Partition> partition);
Status SendPartitionTrySyncRequest(std::shared_ptr<Partition> partition);
Status SendBinlogSyncRequest(const std::string& table, uint32_t partition,
const std::string& ip, int port);
Expand All @@ -291,6 +292,9 @@ class PikaServer {
void ScheduleReplDbTask(const std::string &key,
PikaCmdArgsType* argv, BinlogItem* binlog_item,
const std::string& table_name, uint32_t partition_id);
void ScheduleReplDBSyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);
void ScheduleReplTrySyncTask(const std::shared_ptr<InnerMessage::InnerRequest> req,
std::shared_ptr<pink::PbConn> conn,
void* req_private_data);
Expand Down
10 changes: 9 additions & 1 deletion src/pika_auxiliary_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,15 @@ void PikaAuxiliaryThread::RunEveryPartitionStateMachine() {
continue;
}
if (partition->State() == ReplState::kTryConnect) {
g_pika_server->SendPartitionTrySyncRequest(partition);
// If partition need to FullSync, we Send DBSync Request
// directly, instead of TrySync first
if (partition->FullSync()) {
g_pika_server->SendPartitionDBSyncRequest(partition);
} else {
g_pika_server->SendPartitionTrySyncRequest(partition);
}
} else if (partition->State() == ReplState::kTryDBSync) {
g_pika_server->SendPartitionDBSyncRequest(partition);
} else if (partition->State() == ReplState::kWaitReply) {
continue;
} else if (partition->State() == ReplState::kWaitDBSync) {
Expand Down
54 changes: 31 additions & 23 deletions src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package InnerMessage;
enum Type {
kMetaSync = 1;
kTrySync = 2;
kBinlogSync = 3;
kHeatBeat = 4;
kDBSync = 3;
kBinlogSync = 4;
kHeatBeat = 5;
}

enum StatusCode {
Expand Down Expand Up @@ -40,7 +41,13 @@ message InnerRequest {
required Node node = 1;
required Partition partition = 2;
required BinlogOffset binlog_offset = 3;
required bool force = 4;
}

// slave to master
message DBSync {
required Node node = 1;
required Partition partition = 2;
required BinlogOffset binlog_offset = 3;
}

// master to slave
Expand All @@ -59,17 +66,12 @@ message InnerRequest {
optional int64 sid = 3;
}

// master to slave
message DbSync {
required string table_name = 1;
required uint32 partition_id = 2;
}

required Type type = 1;
optional MetaSync meta_sync = 2;
optional TrySync try_sync = 3;
repeated BinlogSync binlog_sync = 4;
optional HeatBeat heat_beat = 5;
optional DBSync db_sync = 4;
repeated BinlogSync binlog_sync = 5;
optional HeatBeat heat_beat = 6;
}

message PartitionInfo {
Expand All @@ -94,14 +96,23 @@ message InnerResponse {
// master to slave
message TrySync {
enum ReplyCode {
kOk = 1;
kInvalidOffset = 2;
kWait = 3;
kError = 4;
kOk = 1;
kSyncPointBePurged = 2;
kSyncPointLarger = 3;
kError = 4;
}
required ReplyCode reply_code = 1;
required Partition partition = 2;
optional BinlogOffset binlog_offset = 3;
optional int32 sid = 4;
}

message DBSync {
enum ReplyCode {
kWait = 1;
}
required ReplyCode reply_code = 1;
required Partition partition = 2;
optional int32 sid = 3;
}

// slave to master
Expand All @@ -117,15 +128,12 @@ message InnerResponse {
required string pong = 1;
}

// slave to master
message DbSync {
}
required Type type = 1;
required StatusCode code = 2;
optional string reply = 3;
optional MetaSync meta_sync = 4;
optional TrySync try_sync = 5;
optional BinlogSync binlog_sync = 6;
optional HeatBeat heat_beat = 7;
optional DbSync db_sync = 8;
optional DBSync db_sync = 5;
optional TrySync try_sync = 6;
optional BinlogSync binlog_sync = 7;
optional HeatBeat heat_beat = 8;
}
2 changes: 1 addition & 1 deletion src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ bool Partition::ChangeDb(const std::string& new_path) {
assert(db_);
assert(s.ok());
slash::DeleteDirIfExist(tmp_path);
LOG(INFO) << "Partition:" << partition_name_ << ", Change db success";
LOG(INFO) << "Partition: " << partition_name_ << ", Change db success";
return true;
}

Expand Down
106 changes: 69 additions & 37 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ void PikaReplBgWorker::ScheduleRequest(const std::shared_ptr<InnerMessage::Inner
case InnerMessage::kMetaSync:
bg_thread_.Schedule(&PikaReplBgWorker::HandleMetaSyncRequest, static_cast<void*>(arg));
break;
case InnerMessage::kDBSync:
bg_thread_.Schedule(&PikaReplBgWorker::HandleDBSyncRequest, static_cast<void*>(arg));
break;
case InnerMessage::kTrySync:
bg_thread_.Schedule(&PikaReplBgWorker::HandleTrySyncRequest, static_cast<void*>(arg));
break;
Expand Down Expand Up @@ -274,6 +277,44 @@ void PikaReplBgWorker::HandleWriteDb(void* arg) {
delete bg_worker_arg;
}

void PikaReplBgWorker::HandleDBSyncRequest(void* arg) {
ReplBgWorkerArg* bg_worker_arg = static_cast<ReplBgWorkerArg*>(arg);
const std::shared_ptr<InnerMessage::InnerRequest> req = bg_worker_arg->req;
std::shared_ptr<pink::PbConn> conn = bg_worker_arg->conn;

InnerMessage::InnerRequest::DBSync db_sync_request = req->db_sync();
InnerMessage::Partition partition_request = db_sync_request.partition();
InnerMessage::Node node = db_sync_request.node();
InnerMessage::BinlogOffset slave_boffset = db_sync_request.binlog_offset();
std::string table_name = partition_request.table_name();
uint32_t partition_id = partition_request.partition_id();

InnerMessage::InnerResponse response;
response.set_code(InnerMessage::kOk);
response.set_type(InnerMessage::Type::kDBSync);
InnerMessage::InnerResponse::DBSync* db_sync_response = response.mutable_db_sync();
InnerMessage::Partition* partition_response = db_sync_response->mutable_partition();
partition_response->set_table_name(table_name);
partition_response->set_partition_id(partition_id);

LOG(INFO) << "Handle partition DBSync Request";
g_pika_server->TryDBSync(node.ip(), node.port() + kPortShiftRSync,
table_name, partition_id, slave_boffset.filenum());
db_sync_response->set_reply_code(InnerMessage::InnerResponse::DBSync::kWait);

std::string reply_str;
if (!response.SerializeToString(&reply_str)
|| conn->WriteResp(reply_str)) {
LOG(WARNING) << "Handle DBSync Failed";
conn->NotifyClose();
delete bg_worker_arg;
return;
}
conn->NotifyWrite();
delete bg_worker_arg;

}

void PikaReplBgWorker::HandleTrySyncRequest(void* arg) {
ReplBgWorkerArg* bg_worker_arg = static_cast<ReplBgWorkerArg*>(arg);
const std::shared_ptr<InnerMessage::InnerRequest> req = bg_worker_arg->req;
Expand All @@ -293,58 +334,49 @@ void PikaReplBgWorker::HandleTrySyncRequest(void* arg) {
LOG(WARNING) << "Table Name: " << table_name << " Partition ID: "
<< partition_id << " Not Found, TrySync Error";
} else {
bool force = try_sync_request.force();
std::string partition_name = partition->GetPartitionName();
InnerMessage::BinlogOffset slave_boffset = try_sync_request.binlog_offset();
InnerMessage::Node node = try_sync_request.node();
LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:"
<< node.port() << ", Partition: " << partition_name << ", filenum: "
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset()
<< ", force: " << (force ? "yes" : "no");
<< slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset();

response.set_code(InnerMessage::kOk);
InnerMessage::InnerResponse::TrySync* try_sync_response = response.mutable_try_sync();
InnerMessage::Partition* partition_response = try_sync_response->mutable_partition();
InnerMessage::BinlogOffset* master_partition_boffset = try_sync_response->mutable_binlog_offset();
partition_response->set_table_name(table_name);
partition_response->set_partition_id(partition_id);
if (force) {
LOG(INFO) << "Partition: " << partition_name << " force full sync, BgSave and DbSync first";
g_pika_server->TryDBSync(node.ip(), node.port() + kPortShiftRSync,
table_name, partition_id, slave_boffset.filenum());
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait);
BinlogOffset boffset;
if (!partition->GetBinlogOffset(&boffset)) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Handle TrySync, Partition: "
<< partition_name << " Get binlog offset error, TrySync failed";
} else {
BinlogOffset boffset;
if (!partition->GetBinlogOffset(&boffset)) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError);
LOG(WARNING) << "Handle TrySync, Partition: "
<< partition_name << " Get binlog offset error, TrySync failed";
master_partition_boffset->set_filenum(boffset.filenum);
master_partition_boffset->set_offset(boffset.offset);
if (boffset.filenum < slave_boffset.filenum()
|| (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointLarger);
LOG(WARNING) << "Slave offset is larger than mine, Slave ip: "
<< node.ip() << ", Slave port: " << node.port() << ", Partition: "
<< partition_name << ", filenum: " << slave_boffset.filenum()
<< ", pro_offset_: " << slave_boffset.offset();
} else {
if (boffset.filenum < slave_boffset.filenum()
|| (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kInvalidOffset);
LOG(WARNING) << "Slave offset is larger than mine, Slave ip: "
<< node.ip() << ", Slave port: " << node.port() << ", Partition: "
<< partition_name << ", filenum: " << slave_boffset.filenum()
<< ", pro_offset_: " << slave_boffset.offset() << ", force: "
<< (force ? "yes" : "no");
std::string confile = NewFileName(partition->logger()->filename, slave_boffset.filenum());
if (!slash::FileExists(confile)) {
LOG(INFO) << "Partition: " << partition_name << " binlog has been purged, may need full sync";
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kSyncPointBePurged);
} else {
std::string confile = NewFileName(partition->logger()->filename, slave_boffset.filenum());
if (!slash::FileExists(confile)) {
LOG(INFO) << "Partition: " << partition_name << " binlog has been purged, try full sync";
g_pika_server->TryDBSync(node.ip(), node.port() + kPortShiftRSync,
table_name, partition_id, slave_boffset.filenum());
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait);
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
try_sync_response->set_sid(0);
// incremental sync
Status s = g_pika_server->AddBinlogSender(table_name, partition_id,
node.ip(), node.port(), 0, slave_boffset.filenum(), slave_boffset.offset());
if (s.ok()) {
LOG(INFO) << "Partition: " << partition_name << " TrySync Success";
} else {
try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk);
try_sync_response->set_sid(0);
// incremental sync
Status s = g_pika_server->AddBinlogSender(table_name, partition_id,
node.ip(), node.port(), 0, slave_boffset.filenum(), slave_boffset.offset());
if (s.ok()) {
LOG(INFO) << "Partition: " << partition_name << " TrySync Success";
} else {
LOG(WARNING) << "Partition: " << partition_name << " TrySync Failed, " << s.ToString();
}
LOG(WARNING) << "Partition: " << partition_name << " TrySync Failed, " << s.ToString();
}
}
}
Expand Down
Loading

0 comments on commit 7f6601f

Please sign in to comment.