Skip to content

Commit

Permalink
aof_to_pika no wait before send subsequence
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Jun 27, 2017
1 parent 33072be commit a505ef5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
4 changes: 2 additions & 2 deletions tools/aof_to_pika/include/aof_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#define RM_RECONN 4

#define READ_BUF_MAX 100
#define MSG_BLOCK_MAX 128 * 1024
#define MSG_BLOCK_MAX 512 * 1024

typedef struct ConnInfo{
ConnInfo(){}
Expand Down Expand Up @@ -42,7 +42,7 @@ class AOFSender
CondVar buf_rcond_;
std::deque<std::string> read_buffer_;
std::string to_send_;
std::string last_sent_;
std::string current_bulk_;
bool message_get_();
bool check_succ_(const std::string&, long&, long&);
int mask_wait_(int fd, int mask, long long milliseconds);
Expand Down
23 changes: 9 additions & 14 deletions tools/aof_to_pika/src/aof_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <netdb.h>
#include <poll.h>
#include <fcntl.h>
#include <iostream>

#include "aof_sender.h"
#include "aof_info.h"
Expand Down Expand Up @@ -106,13 +107,9 @@ bool AOFSender::process(){
char ibuf[1024 * 16];
memset(&ibuf, 0, sizeof(ibuf));
long nsucc = 0, nfail = 0, inter = 0;;
int empty_loop = 0;

while (true) {
int mask = RM_READBLE;
if (last_sent_.empty() || empty_loop > 3) {
mask |= RM_WRITABLE;
}
int mask = RM_READBLE | RM_WRITABLE;
mask = mask_wait_(sockfd_, mask, 1000);

if (mask & RM_RECONN) {
Expand All @@ -121,11 +118,8 @@ bool AOFSender::process(){
LOG_ERR("Failed to reconnect remote server! host: " + conn_info_->host_ + " port : " + conn_info_->port_ +
" try again 1 second later!");
usleep(1000000);
} else {
empty_loop += 4; // no need to wait read when reconnect
}
} else if (mask & RM_READBLE) {
empty_loop = 0;
// Read from socket
ssize_t count;
std::string reply;
Expand All @@ -146,13 +140,14 @@ bool AOFSender::process(){
if (!reply.empty()){
std::stringstream ss;
if (check_succ_(reply, nsucc, nfail)) {
last_sent_.clear();
ss << "Process OK!" << " SUCC : " << nsucc << ", FAILED: " << nfail;
if ((nsucc - inter) > 10000) {
LOG_INFO(ss.str()); inter = nsucc;
} else { LOG_TRACE(ss.str()); }
} else {
ss << "Process Failed for :[" << last_sent_ << "] with reply : [" << reply + "]";
ss << "Process Failed current bulk :[" << current_bulk_
<< "] size: " << current_bulk_.size() << ", remain send size: " << to_send_.size()
<< ", with reply : [" << reply + "]";
LOG_ERR(ss.str());
ss.str(std::string());
ss << "SUCC : " << nsucc << ", FAILED: " << nfail;
Expand All @@ -161,10 +156,10 @@ bool AOFSender::process(){
}
}
} else if (mask & RM_WRITABLE) {
empty_loop = 0;
//Read from queue
if (to_send_.empty()) {
message_get_();
current_bulk_ = to_send_;
}

// Send to socket
Expand All @@ -183,11 +178,8 @@ bool AOFSender::process(){

} while((unsigned)total_nwritten < to_send_.size());

last_sent_.assign(to_send_.substr(0, total_nwritten));
to_send_.assign(to_send_.substr(total_nwritten));
}
} else{
empty_loop++;
}

}
Expand Down Expand Up @@ -219,6 +211,9 @@ int AOFSender::mask_wait_(int fd, int mask, long long milliseconds) {
case -1:
retmask |= RM_RECONN;
break;
case 0:
std::cout << "poll timeout with mask:" << mask << std::endl;
break;
}

return retmask;
Expand Down

0 comments on commit a505ef5

Please sign in to comment.