Skip to content

Commit

Permalink
AsyncLogging
Browse files Browse the repository at this point in the history
  • Loading branch information
834810071 committed Nov 18, 2019
1 parent cc1385b commit 914c8c3
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 1 deletion.
99 changes: 99 additions & 0 deletions base/AsyncLogging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,104 @@ AsyncLogging::AsyncLogging(const string &basename,

void AsyncLogging::append(const char *logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len)
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(std::move(currentBuffer_));

if (nextBuffer_) // 存在
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens
}

currentBuffer_->append(logline, len);
cond_.notify();
}
}

void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false);
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
muduo::MutexLockGuard lock(mutex_);
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);
}
buffers_.push_back(std::move(currentBuffer_)); // 存储
currentBuffer_ = std::move(newBuffer1); // 清空
buffersToWrite.swap(buffers_); // 交换存储
if (!nextBuffer_) // 不存在
{
nextBuffer_ = std::move(newBuffer2);
}
}

assert(!buffersToWrite.empty());

if (buffersToWrite.size() > 25) // ???
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log message at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}

for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
output.append(buffer->data(), buffer->length());
}

if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);
}

if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}

if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}

output.flush();
}
2 changes: 1 addition & 1 deletion base/BoundedBlockingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace muduo
notFull_.wait();
}
assert(!queue_.full());
queue_.push_back(std::move(x));
queue_.push_back(std::move(X));
notEmpty_.notify();
}

Expand Down
35 changes: 35 additions & 0 deletions base/tests/AsyncLogging_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Created by jxq on 19-11-18.
//

#include "../AsyncLogging.h"
#include "../Logging.h"

using namespace muduo;

AsyncLogging* logptr = NULL;

void asyncoutput(const char* msg, int len)
{
logptr->append(msg, len); // 前台进程传递日志
}

int main(int argc, char** argv)
{
LOG_INFO << "hello";
char name[256];
strcpy(name, argv[0]);
AsyncLogging asynclog("asynclog", 500*1000*1000);
asynclog.start();

logptr = &asynclog;
Logger::setOutput(asyncoutput);
for (int i = 0; i < 10; ++i)
{
LOG_INFO << "ASYNC LOG";
struct timespec ts = {0, 500*1000*1000};
nanosleep(&ts, NULL);
};

return 0;
}
2 changes: 2 additions & 0 deletions base/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_executable(Date_unittest Date_unittest.cpp)
add_executable(Logging_test Logging_test.cpp)
add_executable(LogStream_bench LogStream_bench.cpp)
add_executable(LogFile_test LogFile_test.cpp)
add_executable(AsyncLogging_test AsyncLogging_test.cpp)

target_link_libraries(LogFile_test base)
target_link_libraries(LogStream_bench base)
Expand All @@ -43,6 +44,7 @@ target_link_libraries(Mutex_test base)
target_link_libraries(BlockingQueue_test base)
target_link_libraries(BlockingQueue_bench base)
target_link_libraries(ThreadPool_test base)
target_link_libraries(AsyncLogging_test base)

#if(BOOSTTEST_LIBRARY)
add_executable(LogStream_test LogStream_test.cpp)
Expand Down

0 comments on commit 914c8c3

Please sign in to comment.