diff --git a/examples/sudoku/CMakeLists.txt b/examples/sudoku/CMakeLists.txt index 3967257cd..234d4dd65 100644 --- a/examples/sudoku/CMakeLists.txt +++ b/examples/sudoku/CMakeLists.txt @@ -13,6 +13,9 @@ target_link_libraries(sudoku_solver_hybrid muduo_inspect) add_executable(sudoku_client_batch batch.cc sudoku.cc) target_link_libraries(sudoku_client_batch muduo_net) +add_executable(sudoku_client_pipeline pipeline.cc sudoku.cc) +target_link_libraries(sudoku_client_pipeline muduo_net) + add_executable(sudoku_loadtest loadtest.cc sudoku.cc) target_link_libraries(sudoku_loadtest muduo_net) diff --git a/examples/sudoku/loadtest.cc b/examples/sudoku/loadtest.cc index f0576ebe0..118c96a30 100644 --- a/examples/sudoku/loadtest.cc +++ b/examples/sudoku/loadtest.cc @@ -33,20 +33,19 @@ InputPtr readInput(std::istream& in) return input; } -typedef boost::function DoneCallback; - class SudokuClient : boost::noncopyable { public: SudokuClient(EventLoop* loop, const InetAddress& serverAddr, const InputPtr& input, - const string& name) + const string& name, + bool nodelay) : name_(name), + tcpNoDelay_(nodelay), client_(loop, serverAddr, name_), input_(input), - count_(0), - ticks_(0) + count_(0) { client_.setConnectionCallback( boost::bind(&SudokuClient::onConnection, this, _1)); @@ -93,6 +92,8 @@ class SudokuClient : boost::noncopyable if (conn->connected()) { LOG_INFO << name_ << " connected"; + if (tcpNoDelay_) + conn->setTcpNoDelay(true); conn_ = conn; } else @@ -161,12 +162,12 @@ class SudokuClient : boost::noncopyable } const string name_; + const bool tcpNoDelay_; TcpClient client_; TcpConnectionPtr conn_; Buffer requests_; const InputPtr input_; int count_; - int ticks_; boost::unordered_map sendTime_; std::vector latencies_; }; @@ -180,7 +181,7 @@ class SudokuLoadtest : boost::noncopyable { } - void runClient(const InputPtr& input, const InetAddress& serverAddr, int rps, int conn) + void runClient(const InputPtr& input, const InetAddress& serverAddr, int rps, int conn, bool nodelay) { EventLoop loop; @@ -188,7 +189,7 @@ class SudokuLoadtest : boost::noncopyable { Fmt f("c%04d", i+1); string name(f.data(), f.length()); - clients_.push_back(new SudokuClient(&loop, serverAddr, input, name)); + clients_.push_back(new SudokuClient(&loop, serverAddr, input, name, nodelay)); clients_.back().connect(); } @@ -271,9 +272,13 @@ int main(int argc, char* argv[]) { int conn = 1; int rps = 100; + bool nodelay = false; InetAddress serverAddr("127.0.0.1", 9981); switch (argc) { + case 6: + nodelay = string(argv[5]) == "-n"; + // FALL THROUGH case 5: conn = atoi(argv[4]); // FALL THROUGH @@ -286,7 +291,7 @@ int main(int argc, char* argv[]) case 2: break; default: - printf("Usage: %s input server_ip [requests_per_second] [connections]\n", argv[0]); + printf("Usage: %s input server_ip [requests_per_second] [connections] [-n]\n", argv[0]); return 0; } @@ -296,7 +301,7 @@ int main(int argc, char* argv[]) InputPtr input(readInput(in)); printf("%zd requests from %s\n", input->size(), argv[1]); SudokuLoadtest test; - test.runClient(input, serverAddr, rps, conn); + test.runClient(input, serverAddr, rps, conn, nodelay); } else { diff --git a/examples/sudoku/pipeline.cc b/examples/sudoku/pipeline.cc new file mode 100644 index 000000000..b2ced9eda --- /dev/null +++ b/examples/sudoku/pipeline.cc @@ -0,0 +1,284 @@ +#include "sudoku.h" + +#include +#include +#include + +#include +#include +#include + +#include +#include + +#include + +using namespace muduo; +using namespace muduo::net; + +typedef std::vector Input; +typedef boost::shared_ptr InputPtr; + +class SudokuClient : boost::noncopyable +{ + public: + SudokuClient(EventLoop* loop, + const InetAddress& serverAddr, + const InputPtr& input, + const string& name, + int pipelines, + bool nodelay) + : name_(name), + pipelines_(pipelines), + tcpNoDelay_(nodelay), + client_(loop, serverAddr, name_), + input_(input), + count_(0) + { + client_.setConnectionCallback( + boost::bind(&SudokuClient::onConnection, this, _1)); + client_.setMessageCallback( + boost::bind(&SudokuClient::onMessage, this, _1, _2, _3)); + } + + void connect() + { + client_.connect(); + } + + void report(std::vector* latency, int* infly) + { + latency->insert(latency->end(), latencies_.begin(), latencies_.end()); + latencies_.clear(); + *infly += static_cast(sendTime_.size()); + } + + private: + void onConnection(const TcpConnectionPtr& conn) + { + if (conn->connected()) + { + LOG_INFO << name_ << " connected"; + if (tcpNoDelay_) + conn->setTcpNoDelay(true); + conn_ = conn; + send(pipelines_); + } + else + { + LOG_INFO << name_ << " disconnected"; + conn_.reset(); + // FIXME: exit + } + } + + void send(int n) + { + Timestamp now(Timestamp::now()); + Buffer requests; + for (int i = 0; i < n; ++i) + { + char buf[256]; + const string& req = (*input_)[count_ % input_->size()]; + int len = snprintf(buf, sizeof buf, "%s-%08d:%s\r\n", + name_.c_str(), count_, req.c_str()); + requests.append(buf, len); + sendTime_[count_] = now; + ++count_; + } + + conn_->send(&requests); + } + + void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp recvTime) + { + size_t len = buf->readableBytes(); + while (len >= kCells + 2) + { + const char* crlf = buf->findCRLF(); + if (crlf) + { + string response(buf->peek(), crlf); + buf->retrieveUntil(crlf + 2); + len = buf->readableBytes(); + if (verify(response, recvTime)) + { + send(1); + } + else + { + LOG_ERROR << "Bad response:" << response; + conn->shutdown(); + break; + } + } + else if (len > 100) // id + ":" + kCells + "\r\n" + { + LOG_ERROR << "Line is too long!"; + conn->shutdown(); + break; + } + else + { + break; + } + } + } + + bool verify(const string& response, Timestamp recvTime) + { + size_t colon = response.find(':'); + if (colon != string::npos) + { + size_t dash = response.find('-'); + if (dash != string::npos && dash < colon) + { + int id = atoi(response.c_str()+dash+1); + boost::unordered_map::iterator sendTime = sendTime_.find(id); + if (sendTime != sendTime_.end()) + { + int64_t latency_us = recvTime.microSecondsSinceEpoch() - sendTime->second.microSecondsSinceEpoch(); + latencies_.push_back(static_cast(latency_us)); + sendTime_.erase(sendTime); + } + else + { + LOG_ERROR << "Unknown id " << id << " of " << name_; + } + } + } + // FIXME + return true; + } + + const string name_; + const int pipelines_; + const bool tcpNoDelay_; + TcpClient client_; + TcpConnectionPtr conn_; + const InputPtr input_; + int count_; + boost::unordered_map sendTime_; + std::vector latencies_; +}; + +int getPercentile(const std::vector& latencies, int percent) +{ + // The Nearest Rank method + assert(latencies.size() > 0); + size_t idx = 0; + if (percent > 0) + { + idx = (latencies.size() * percent + 99) / 100 - 1; + assert(idx < latencies.size()); + } + return latencies[idx]; +} + +void report(boost::ptr_vector* clients) +{ + std::vector latencies; + int infly = 0; + for (boost::ptr_vector::iterator it = clients->begin(); + it != clients->end(); ++it) + { + it->report(&latencies, &infly); + } + + LogStream stat; + stat << "recv " << Fmt("%6zd", latencies.size()) << " in-fly " << infly; + + if (!latencies.empty()) + { + std::sort(latencies.begin(), latencies.end()); + int min = latencies.front(); + int max = latencies.back(); + int sum = std::accumulate(latencies.begin(), latencies.end(), 0); + int mean = sum / static_cast(latencies.size()); + int median = getPercentile(latencies, 50); + int p90 = getPercentile(latencies, 90); + int p99 = getPercentile(latencies, 99); + stat << " min " << min + << " max " << max + << " avg " << mean + << " median " << median + << " p90 " << p90 + << " p99 " << p99; + } + + LOG_INFO << stat.buffer(); +} + +InputPtr readInput(std::istream& in) +{ + boost::shared_ptr input(new Input); + std::string line; + while (getline(in, line)) + { + if (line.size() == implicit_cast(kCells)) + { + input->push_back(line.c_str()); + } + } + return input; +} + +void runClient(const InputPtr& input, + const InetAddress& serverAddr, + int conn, + int pipelines, + bool nodelay) +{ + EventLoop loop; + boost::ptr_vector clients; + for (int i = 0; i < conn; ++i) + { + Fmt f("c%04d", i+1); + string name(f.data(), f.length()); + clients.push_back(new SudokuClient(&loop, serverAddr, input, name, pipelines, nodelay)); + clients.back().connect(); + } + + loop.runEvery(1.0, boost::bind(report, &clients)); + loop.loop(); +} + +int main(int argc, char* argv[]) +{ + int conn = 1; + int pipelines = 1; + bool nodelay = false; + InetAddress serverAddr("127.0.0.1", 9981); + switch (argc) + { + case 6: + nodelay = string(argv[5]) == "-n"; + // FALL THROUGH + case 5: + pipelines = atoi(argv[4]); + // FALL THROUGH + case 4: + conn = atoi(argv[3]); + // FALL THROUGH + case 3: + serverAddr = InetAddress(argv[2], 9981); + // FALL THROUGH + case 2: + break; + default: + printf("Usage: %s input server_ip [connections] [pipelines] [-n]\n", argv[0]); + return 0; + } + + std::ifstream in(argv[1]); + if (in) + { + InputPtr input(readInput(in)); + printf("%zd requests from %s\n", input->size(), argv[1]); + runClient(input, serverAddr, conn, pipelines, nodelay); + } + else + { + printf("Cannot open %s\n", argv[1]); + } +}