Skip to content

Commit

Permalink
feature: handle special case when mario consumer reads invalid record
Browse files Browse the repository at this point in the history
  • Loading branch information
flabby committed Dec 22, 2015
1 parent 85037e0 commit c7024a7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ void cleanup() {

static void sig_handler(const int sig)
{
// LOG(INFO) << "Caught signal " << sig;
//LOG(INFO) << "Caught signal " << sig;
::google::ShutdownGoogleLogging();
g_pikaServer->shutdown = true;

sleep(2);
sleep(1);

cleanup();
exit(1);
Expand Down
3 changes: 1 addition & 2 deletions third/mario/src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ Status Consumer::Consume(std::string &scratch)
case kOldRecord:
return Status::EndFile("Eof");
default:
s = Status::Corruption("Unknow reason");
break;
return Status::Corruption("Unknow reason");
}
// TODO:do handler here
if (s.ok()) {
Expand Down
18 changes: 18 additions & 0 deletions third/mario/src/mario.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include <stdint.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <signal.h>
#include <unistd.h>

namespace mario {

Expand Down Expand Up @@ -357,6 +359,10 @@ void Mario::BackgroundCall(ConsumerItem* consumer_item)
//std::cout<<"1 --> con_offset: "<<consumer_item->consumer_->con_offset()<<" pro_offset: "<<version_->pro_offset()<<std::endl;
scratch = "";
s = consumer_item->consumer_->Consume(scratch);
if (s.IsCorruption()) {
mutex_.Unlock();
break;
}
while (!consumer_item->IsExit() && !s.ok()) {
std::string confile = NewFileName(filename_, consumer_item->consumer_->filenum() + 1);
if (s.IsEndFile() && env_->FileExists(confile)) {
Expand All @@ -374,8 +380,16 @@ void Mario::BackgroundCall(ConsumerItem* consumer_item)
mutex_.Lock();
}
s = consumer_item->consumer_->Consume(scratch);
if (s.IsCorruption()) {
mutex_.Unlock();
break;
}
}
mutex_.Unlock();
if (s.IsCorruption()) {
break;
}

if (retry_ == -1) {
while (!consumer_item->IsExit() && consumer_item->h_->processMsg(scratch)) {
}
Expand All @@ -390,6 +404,10 @@ void Mario::BackgroundCall(ConsumerItem* consumer_item)
}
}

if (s.IsCorruption()) {
printf ("Consumer corruption with %s, exit\n", s.ToString().c_str());
//raise(SIGUSR1);
}
pthread_exit(NULL);
}

Expand Down

0 comments on commit c7024a7

Please sign in to comment.