Skip to content

Commit

Permalink
Dev negotiate process (OpenAtomFoundation#851)
Browse files Browse the repository at this point in the history
  • Loading branch information
whoiami committed Mar 20, 2020
1 parent 883cbf7 commit f08bf12
Show file tree
Hide file tree
Showing 13 changed files with 510 additions and 17 deletions.
5 changes: 2 additions & 3 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Binlog {
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);

Status Truncate(uint32_t pro_num, uint64_t pro_offset);

uint64_t file_size() {
return file_size_;
}
Expand Down Expand Up @@ -103,9 +105,6 @@ class Binlog {

std::atomic<bool> opened_;

uint32_t consumer_num_;
uint64_t item_num_;

Version* version_;
slash::WritableFile *queue_;
slash::RWFile *versionfile_;
Expand Down
21 changes: 21 additions & 0 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ class MemLog {
slash::MutexLock l_logs(&logs_mu_);
last_offset_ = offset;
}
bool FindLogItem(const LogOffset& offset, LogOffset* found_offset);

private:
int InternalFindLogIndex(const LogOffset& offset);
Expand All @@ -126,10 +127,16 @@ class ConsensusCoordinator {
void UpdateTerm(uint32_t term);
Status CheckEnoughFollower();

// invoked by follower
Status ProcessLeaderLog(std::shared_ptr<Cmd> cmd_ptr,
const BinlogItem& attribute);
Status ProcessLocalUpdate(const LogOffset& leader_commit);

// Negotiate
Status LeaderNegotiate(
const LogOffset& f_last_offset, bool* reject, std::vector<LogOffset>* hints);
Status FollowerNegotiate(const std::vector<LogOffset>& hints, LogOffset* reply_offset);

SyncProgress& SyncPros() {
return sync_pros_;
}
Expand Down Expand Up @@ -190,6 +197,20 @@ class ConsensusCoordinator {
bool InternalUpdateCommittedIndex(const LogOffset& slaves_committed_index,
LogOffset* updated_committed_index);

Status TryGetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
Status GetBinlogOffset(
const BinlogOffset& start_offset,
const BinlogOffset& end_offset, std::vector<LogOffset>* log_offset);
Status FindBinlogFileNum(
const std::map<uint32_t, std::string> binlogs,
uint64_t target_index, uint32_t start_filenum,
uint32_t* founded_filenum);
Status FindLogicOffsetBySearchingBinlog(
const BinlogOffset& hint_offset, uint64_t target_index, LogOffset* found_offset);
Status FindLogicOffset(
const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

slash::Mutex index_mu_;
LogOffset committed_index_;

Expand Down
14 changes: 12 additions & 2 deletions include/pika_repl_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "include/pika_conf.h"
#include "src/pika_inner_message.pb.h"

class SyncMasterPartition;
class SyncSlavePartition;

class PikaReplClientConn: public pink::PbConn {
public:
PikaReplClientConn(int fd, const std::string& ip_port, pink::Thread *thread, void* worker_specific_data, pink::PinkEpoll* epoll);
Expand All @@ -22,9 +25,16 @@ class PikaReplClientConn: public pink::PbConn {
static void HandleDBSyncResponse(void* arg);
static void HandleTrySyncResponse(void* arg);
static void HandleRemoveSlaveNodeResponse(void* arg);
static bool IsTableStructConsistent(const std::vector<TableStruct>& current_tables,
const std::vector<TableStruct>& expect_tables);

static bool TrySyncConsensusCheck(
const InnerMessage::ConsensusMeta& consensus_meta,
const std::shared_ptr<SyncMasterPartition>& partition,
const std::shared_ptr<SyncSlavePartition>& slave_partition);
static bool IsTableStructConsistent(
const std::vector<TableStruct>& current_tables,
const std::vector<TableStruct>& expect_tables);
int DealMessage() override;

private:
// dispatch binlog by its table_name + partition
void DispatchBinlogRes(const std::shared_ptr<InnerMessage::InnerResponse> response);
Expand Down
9 changes: 8 additions & 1 deletion include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
#include "pink/include/pb_conn.h"
#include "pink/include/pink_thread.h"

#include "include/pika_define.h"
#include "src/pika_inner_message.pb.h"

class SyncMasterPartition;

class PikaReplServerConn: public pink::PbConn {
public:
PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, pink::PinkEpoll* epoll);
Expand All @@ -28,13 +30,18 @@ class PikaReplServerConn: public pink::PbConn {
InnerMessage::InnerResponse::TrySync* try_sync_response);
static bool TrySyncConsensusOffsetCheck(
const std::shared_ptr<SyncMasterPartition>& partition,
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const InnerMessage::ConsensusMeta& meta,
InnerMessage::InnerResponse* response,
InnerMessage::InnerResponse::TrySync* try_sync_response);
static bool TrySyncUpdateSlaveNode(
const std::shared_ptr<SyncMasterPartition>& partition,
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<pink::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response);
static void BuildConsensusMeta(
const bool& reject,
const std::vector<LogOffset>& hints,
InnerMessage::InnerResponse* response);

static void HandleDBSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
Expand Down
5 changes: 5 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ class SyncMasterPartition : public SyncPartition {
Status ConsensusProcessLeaderLog(std::shared_ptr<Cmd> cmd_ptr, const BinlogItem& attribute);
Status ConsensusProcessLocalUpdate(const LogOffset& leader_commit);
LogOffset ConsensusCommittedIndex();
LogOffset ConsensusLastIndex();
Status ConsensusUpdateAppliedIndex(const LogOffset& offset);
Status ConsensusLeaderNegotiate(const LogOffset& f_last_offset,
bool* reject, std::vector<LogOffset>* hints);
Status ConsensusFollowerNegotiate(
const std::vector<LogOffset>& hints, LogOffset* reply_offset);

std::shared_ptr<StableLog> StableLogger() {
return coordinator_.StableLogger();
Expand Down
10 changes: 9 additions & 1 deletion include/pika_stable_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,34 @@ class StableLog : public std::enable_shared_from_this<StableLog> {
return stable_logger_;
}
void Leave();
LogOffset first_offset() {
slash::RWLock l(&offset_rwlock_, false);
return first_offset_;
}

// Purgelogs use
bool PurgeStableLogs(uint32_t to = 0, bool manual = false);
void ClearPurge();
bool GetBinlogFiles(std::map<uint32_t, std::string>* binlogs);

private:
void Close();
void RemoveStableLogDir();
void UpdateFirstOffset(uint32_t filenum);
/*
* Purgelogs use
*/
static void DoPurgeStableLogs(void* arg);
bool PurgeFiles(uint32_t to, bool manual);
bool GetBinlogFiles(std::map<uint32_t, std::string>* binlogs);
std::atomic<bool> purging_;

std::string table_name_;
uint32_t partition_id_;
std::string log_path_;
std::shared_ptr<Binlog> stable_logger_;

pthread_rwlock_t offset_rwlock_;
LogOffset first_offset_;
};

struct PurgeStableLogArg {
Expand Down
34 changes: 33 additions & 1 deletion src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

#include <sys/time.h>
#include <glog/logging.h>
#include <fcntl.h>


#include "include/pika_binlog_transverter.h"

Expand Down Expand Up @@ -66,7 +68,6 @@ Status Version::Init() {
*/
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
opened_(false),
consumer_num_(0),
version_(NULL),
queue_(NULL),
versionfile_(NULL),
Expand Down Expand Up @@ -389,3 +390,34 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset) {
InitLogFile();
return Status::OK();
}

Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset) {
slash::MutexLock l(&mutex_);

delete queue_;
std::string profile = NewFileName(filename_, pro_num);
const int fd = open(profile.c_str(), O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
return Status::IOError("fd open failed");
}
if (ftruncate(fd, pro_offset)) {
return Status::IOError("ftruncate failed");
}

Status s = slash::NewWritableFile(profile, &queue_);
if (!s.ok()) {
return s;
}

pro_num_ = pro_num;
{
slash::RWLock(&(version_->rwlock_), true);
version_->pro_num_ = pro_num;
version_->pro_offset_ = pro_offset;
version_->StableSave();
}

InitLogFile();

return Status::OK();
}
Loading

0 comments on commit f08bf12

Please sign in to comment.