Skip to content

Commit

Permalink
add support for get operator
Browse files Browse the repository at this point in the history
  • Loading branch information
baotiao committed Feb 7, 2015
1 parent 76e9955 commit 312334f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 26 deletions.
2 changes: 1 addition & 1 deletion include/tick_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TickConn
bool SetNonblock();
Status TickReadBuf();
void DriveMachine();
void TickGetRequest();
int TickGetRequest();
int TickSendReply();

private:
Expand Down
8 changes: 6 additions & 2 deletions include/tick_packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
#include "bada_sdk.pb.h"


Status SetBufferParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value);
Status SetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value);

void SetBufferBuild(bool status, SdkSetRet *sdkSetRet);
void SetRetBuild(bool status, SdkSetRet *sdkSetRet);


Status GetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key);

void GetRetBuild(std::string &val, SdkGetRet *sdkGetRet);
#endif
57 changes: 37 additions & 20 deletions src/tick_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,26 +67,27 @@ void TickConn::DriveMachine()
*/
}

void TickConn::TickGetRequest()
int TickConn::TickGetRequest()
{
ssize_t nread = 0;
nread = read(fd_, rbuf_ + rbuf_len_, TICK_MAX_MESSAGE);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
return ;
return -1;
}
} else if (nread == 0) {
return ;
return -1;
}

int32_t integer = 0;
bool flag = true;
std::string *key;
std::string *value;
SdkSetRet sdkSetRet;
int packet_len;
SdkGetRet sdkGetRet;
int packet_len = TICK_MAX_MESSAGE;
if (nread) {
rbuf_len_ += nread;
while (flag) {
Expand Down Expand Up @@ -121,23 +122,38 @@ void TickConn::TickGetRequest()
}
break;
case kComplete:
key = new std::string();
value = new std::string();
SetBufferParse(r_opcode_, rbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, rbuf_len_ - COMMAND_HEADER_LENGTH - COMMAND_CODE_LENGTH, key, value);

// printf("%s %s\n", key->c_str(), value->c_str());
g_tickServer->db_->Put(leveldb::WriteOptions(), (*key), (*value));
SetBufferBuild(true, &sdkSetRet);
packet_len = TICK_MAX_MESSAGE;
sdkSetRet.SerializeToArray(wbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, packet_len);
delete(key);
delete(value);
BuildObuf(kSdkSetRet, sdkSetRet.ByteSize());
connStatus_ = kHeader;
if (cur_pos_ == rbuf_len_) {
cur_pos_ = 0;
rbuf_len_ = 0;
if (r_opcode_ == kSdkSet) {
key = new std::string();
value = new std::string();
SetParse(r_opcode_, rbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, rbuf_len_ - COMMAND_HEADER_LENGTH - COMMAND_CODE_LENGTH, key, value);
// printf("%s %s\n", key->c_str(), value->c_str());
g_tickServer->db_->Put(leveldb::WriteOptions(), (*key), (*value));
SetRetBuild(true, &sdkSetRet);
sdkSetRet.SerializeToArray(wbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, packet_len);
delete(key);
delete(value);
BuildObuf(kSdkSetRet, sdkSetRet.ByteSize());
connStatus_ = kHeader;
if (cur_pos_ == rbuf_len_) {
cur_pos_ = 0;
rbuf_len_ = 0;
}
} else if (r_opcode_ == kSdkGet) {
key = new std::string();
GetParse(r_opcode_, rbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, rbuf_len_ - COMMAND_HEADER_LENGTH - COMMAND_CODE_LENGTH, key);
std::string getRes;
g_tickServer->db_->Get(leveldb::ReadOptions(), (*key), &getRes);
GetRetBuild(getRes, &sdkGetRet);
sdkGetRet.SerializeToArray(wbuf_ + COMMAND_HEADER_LENGTH + COMMAND_CODE_LENGTH, packet_len);
delete(key);
BuildObuf(kSdkGetRet, sdkGetRet.ByteSize());
connStatus_ = kHeader;
if (cur_pos_ == rbuf_len_) {
cur_pos_ = 0;
rbuf_len_ = 0;
}
}

return 0;
break;

Expand All @@ -152,6 +168,7 @@ void TickConn::TickGetRequest()
}
}
}
return -1;
}

int TickConn::TickSendReply()
Expand Down
38 changes: 36 additions & 2 deletions src/tick_packet.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "tick_packet.h"
#include "tick_define.h"

Status SetBufferParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value)
Status SetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key, std::string *value)
{
Status s;
if (opcode == kSdkInvalidOperation) {
Expand Down Expand Up @@ -30,8 +30,42 @@ Status SetBufferParse(const int32_t opcode, const char *rbuf, const int32_t rbuf
return s;
}

void SetBufferBuild(bool status, SdkSetRet *sdkSetRet)
void SetRetBuild(bool status, SdkSetRet *sdkSetRet)
{
sdkSetRet->set_opcode(kSdkSetRet);
sdkSetRet->set_status(status);
}

Status GetParse(const int32_t opcode, const char *rbuf, const int32_t rbuf_len, std::string *key)
{
Status s;
if (opcode == kSdkInvalidOperation) {
SdkInvalidOperation sdkInvalidOperation;
if (sdkInvalidOperation.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse invalid operation error");
return s;
}
if (sdkInvalidOperation.what() == 1003) {
return Status::NotFound("Can't not found the key");
}
return Status::InvalidArgument("Invalid operation");
} else {
SdkGet sdkGet;
if (!sdkGet.ParseFromArray(rbuf, rbuf_len)) {
s = Status::Corruption("Parse error");
return s;
}
if (sdkGet.opcode() == kSdkGet) {
key->assign(sdkGet.key().data(), sdkGet.key().size());
} else {
s = Status::IOError("Get error");
}
}
return s;
}

void GetRetBuild(std::string &value, SdkGetRet *sdkGetRet)
{
sdkGetRet->set_opcode(kSdkGetRet);
sdkGetRet->set_value(value);
}
2 changes: 1 addition & 1 deletion src/tick_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void TickThread::RunProcess()
if (inConn == NULL) {
continue;
}
if (inConn->TickGetRequest().ok()) {
if (inConn->TickGetRequest() == 0) {
// log_info("GetRequest ok fd %d", tfe->fd_);
/*
* tickEpoll_->TickDelEvent(tfe->fd_);
Expand Down

0 comments on commit 312334f

Please sign in to comment.