Skip to content

Commit

Permalink
Bugfix of hub senders's close; Bugfix of pexpireat's tobinlog
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Nov 29, 2017
1 parent c30999c commit b172afa
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 15 deletions.
16 changes: 4 additions & 12 deletions include/pika_hub_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,12 @@ class PikaHubSenderThread : public pink::Thread {

int TryStartThread(const std::string& hub_ip, const int hub_port);

int GetTid() {
return tid_;
}
int GetTid() { return tid_; }

enum STATUS { UNSTARTED, WORKING, WAITING, ENDOFFILE };
STATUS SenderStatus() {
return status_;
}
STATUS SenderStatus() { return status_; }

void CloseClient() {
cli_->Close();
// Close socket connnection
cli_.reset(pink::NewRedisCli());
should_reset_ = true;
}
void Reset() { should_reset_ = true; should_reconnect_ = true; }

private:
bool ResetStatus();
Expand All @@ -66,6 +57,7 @@ class PikaHubSenderThread : public pink::Thread {

STATUS status_;
std::atomic<bool> should_reset_;
std::atomic<bool> should_reconnect_;

uint32_t filenum_;
uint64_t con_offset_;
Expand Down
7 changes: 5 additions & 2 deletions src/pika_hub_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern PikaServer* g_pika_server;
PikaHubManager::PikaHubManager(const std::set<std::string> &ips, int port,
int cron_interval)
: hub_stage_(STOPED),
hub_port_(0),
hub_filenum_(0),
hub_con_offset_(0),
sending_window_({0, -1}),
Expand Down Expand Up @@ -245,7 +246,7 @@ void PikaHubManager::StopHub(int connnection_num) {
} else {
hub_stage_ = STOPED;
for (int i = 0; i < kMaxHubSender; i++) {
sender_threads_[i]->CloseClient();
sender_threads_[i]->Reset();
}
slash::MutexLock l(&sending_window_protector_);
sending_window_.left = 0;
Expand Down Expand Up @@ -524,6 +525,7 @@ void* PikaHubSenderThread::ThreadMain() {
bool should_wait = true;
while (!should_stop()) {
if (should_reset_) {
cli_.reset(pink::NewRedisCli());
should_wait = ResetStatus();
if (should_wait) {
status_ = UNSTARTED;
Expand All @@ -539,6 +541,7 @@ void* PikaHubSenderThread::ThreadMain() {
// 1. Connect to hub
result = cli_->Connect(hub_ip_, hub_port_, g_pika_server->host());
if (result.ok()) {
should_reconnect_ = false;
LOG(INFO) << "Hub Sender Connect hub(" << hub_ip_ << ":" <<
hub_port_ << ") " << result.ToString() << ", filenum: " << filenum_;

Expand All @@ -559,7 +562,7 @@ void* PikaHubSenderThread::ThreadMain() {
}

// 3. After successful parse, we send msg;
if (!cli_->Available()) {
if (should_reconnect_) {
last_send_flag = false;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ std::string PexpireatCmd::ToBinlog(
RedisAppendContent(res, key_);
// sec
char buf[100];
int64_t expireat = time(nullptr) + time_stamp_ms_ / 1000;
int64_t expireat = time_stamp_ms_ / 1000;
slash::ll2string(buf, 100, expireat);
std::string at(buf);
RedisAppendLen(res, at.size(), "$");
Expand Down

0 comments on commit b172afa

Please sign in to comment.