Skip to content

Commit

Permalink
add sudoku/pipeline.cc
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshuo committed Apr 1, 2015
1 parent ecb318e commit 1661eb1
Show file tree
Hide file tree
Showing 3 changed files with 302 additions and 10 deletions.
3 changes: 3 additions & 0 deletions examples/sudoku/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ target_link_libraries(sudoku_solver_hybrid muduo_inspect)
add_executable(sudoku_client_batch batch.cc sudoku.cc)
target_link_libraries(sudoku_client_batch muduo_net)

add_executable(sudoku_client_pipeline pipeline.cc sudoku.cc)
target_link_libraries(sudoku_client_pipeline muduo_net)

add_executable(sudoku_loadtest loadtest.cc sudoku.cc)
target_link_libraries(sudoku_loadtest muduo_net)

Expand Down
25 changes: 15 additions & 10 deletions examples/sudoku/loadtest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,19 @@ InputPtr readInput(std::istream& in)
return input;
}

typedef boost::function<void(const string&, double)> DoneCallback;

class SudokuClient : boost::noncopyable
{
public:
SudokuClient(EventLoop* loop,
const InetAddress& serverAddr,
const InputPtr& input,
const string& name)
const string& name,
bool nodelay)
: name_(name),
tcpNoDelay_(nodelay),
client_(loop, serverAddr, name_),
input_(input),
count_(0),
ticks_(0)
count_(0)
{
client_.setConnectionCallback(
boost::bind(&SudokuClient::onConnection, this, _1));
Expand Down Expand Up @@ -93,6 +92,8 @@ class SudokuClient : boost::noncopyable
if (conn->connected())
{
LOG_INFO << name_ << " connected";
if (tcpNoDelay_)
conn->setTcpNoDelay(true);
conn_ = conn;
}
else
Expand Down Expand Up @@ -161,12 +162,12 @@ class SudokuClient : boost::noncopyable
}

const string name_;
const bool tcpNoDelay_;
TcpClient client_;
TcpConnectionPtr conn_;
Buffer requests_;
const InputPtr input_;
int count_;
int ticks_;
boost::unordered_map<int, Timestamp> sendTime_;
std::vector<int> latencies_;
};
Expand All @@ -180,15 +181,15 @@ class SudokuLoadtest : boost::noncopyable
{
}

void runClient(const InputPtr& input, const InetAddress& serverAddr, int rps, int conn)
void runClient(const InputPtr& input, const InetAddress& serverAddr, int rps, int conn, bool nodelay)
{
EventLoop loop;

for (int i = 0; i < conn; ++i)
{
Fmt f("c%04d", i+1);
string name(f.data(), f.length());
clients_.push_back(new SudokuClient(&loop, serverAddr, input, name));
clients_.push_back(new SudokuClient(&loop, serverAddr, input, name, nodelay));
clients_.back().connect();
}

Expand Down Expand Up @@ -271,9 +272,13 @@ int main(int argc, char* argv[])
{
int conn = 1;
int rps = 100;
bool nodelay = false;
InetAddress serverAddr("127.0.0.1", 9981);
switch (argc)
{
case 6:
nodelay = string(argv[5]) == "-n";
// FALL THROUGH
case 5:
conn = atoi(argv[4]);
// FALL THROUGH
Expand All @@ -286,7 +291,7 @@ int main(int argc, char* argv[])
case 2:
break;
default:
printf("Usage: %s input server_ip [requests_per_second] [connections]\n", argv[0]);
printf("Usage: %s input server_ip [requests_per_second] [connections] [-n]\n", argv[0]);
return 0;
}

Expand All @@ -296,7 +301,7 @@ int main(int argc, char* argv[])
InputPtr input(readInput(in));
printf("%zd requests from %s\n", input->size(), argv[1]);
SudokuLoadtest test;
test.runClient(input, serverAddr, rps, conn);
test.runClient(input, serverAddr, rps, conn, nodelay);
}
else
{
Expand Down
284 changes: 284 additions & 0 deletions examples/sudoku/pipeline.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
#include "sudoku.h"

#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpClient.h>

#include <boost/bind.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/unordered_map.hpp>

#include <fstream>
#include <numeric>

#include <stdio.h>

using namespace muduo;
using namespace muduo::net;

typedef std::vector<string> Input;
typedef boost::shared_ptr<const Input> InputPtr;

class SudokuClient : boost::noncopyable
{
public:
SudokuClient(EventLoop* loop,
const InetAddress& serverAddr,
const InputPtr& input,
const string& name,
int pipelines,
bool nodelay)
: name_(name),
pipelines_(pipelines),
tcpNoDelay_(nodelay),
client_(loop, serverAddr, name_),
input_(input),
count_(0)
{
client_.setConnectionCallback(
boost::bind(&SudokuClient::onConnection, this, _1));
client_.setMessageCallback(
boost::bind(&SudokuClient::onMessage, this, _1, _2, _3));
}

void connect()
{
client_.connect();
}

void report(std::vector<int>* latency, int* infly)
{
latency->insert(latency->end(), latencies_.begin(), latencies_.end());
latencies_.clear();
*infly += static_cast<int>(sendTime_.size());
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
if (conn->connected())
{
LOG_INFO << name_ << " connected";
if (tcpNoDelay_)
conn->setTcpNoDelay(true);
conn_ = conn;
send(pipelines_);
}
else
{
LOG_INFO << name_ << " disconnected";
conn_.reset();
// FIXME: exit
}
}

void send(int n)
{
Timestamp now(Timestamp::now());
Buffer requests;
for (int i = 0; i < n; ++i)
{
char buf[256];
const string& req = (*input_)[count_ % input_->size()];
int len = snprintf(buf, sizeof buf, "%s-%08d:%s\r\n",
name_.c_str(), count_, req.c_str());
requests.append(buf, len);
sendTime_[count_] = now;
++count_;
}

conn_->send(&requests);
}

void onMessage(const TcpConnectionPtr& conn, Buffer* buf, Timestamp recvTime)
{
size_t len = buf->readableBytes();
while (len >= kCells + 2)
{
const char* crlf = buf->findCRLF();
if (crlf)
{
string response(buf->peek(), crlf);
buf->retrieveUntil(crlf + 2);
len = buf->readableBytes();
if (verify(response, recvTime))
{
send(1);
}
else
{
LOG_ERROR << "Bad response:" << response;
conn->shutdown();
break;
}
}
else if (len > 100) // id + ":" + kCells + "\r\n"
{
LOG_ERROR << "Line is too long!";
conn->shutdown();
break;
}
else
{
break;
}
}
}

bool verify(const string& response, Timestamp recvTime)
{
size_t colon = response.find(':');
if (colon != string::npos)
{
size_t dash = response.find('-');
if (dash != string::npos && dash < colon)
{
int id = atoi(response.c_str()+dash+1);
boost::unordered_map<int, Timestamp>::iterator sendTime = sendTime_.find(id);
if (sendTime != sendTime_.end())
{
int64_t latency_us = recvTime.microSecondsSinceEpoch() - sendTime->second.microSecondsSinceEpoch();
latencies_.push_back(static_cast<int>(latency_us));
sendTime_.erase(sendTime);
}
else
{
LOG_ERROR << "Unknown id " << id << " of " << name_;
}
}
}
// FIXME
return true;
}

const string name_;
const int pipelines_;
const bool tcpNoDelay_;
TcpClient client_;
TcpConnectionPtr conn_;
const InputPtr input_;
int count_;
boost::unordered_map<int, Timestamp> sendTime_;
std::vector<int> latencies_;
};

int getPercentile(const std::vector<int>& latencies, int percent)
{
// The Nearest Rank method
assert(latencies.size() > 0);
size_t idx = 0;
if (percent > 0)
{
idx = (latencies.size() * percent + 99) / 100 - 1;
assert(idx < latencies.size());
}
return latencies[idx];
}

void report(boost::ptr_vector<SudokuClient>* clients)
{
std::vector<int> latencies;
int infly = 0;
for (boost::ptr_vector<SudokuClient>::iterator it = clients->begin();
it != clients->end(); ++it)
{
it->report(&latencies, &infly);
}

LogStream stat;
stat << "recv " << Fmt("%6zd", latencies.size()) << " in-fly " << infly;

if (!latencies.empty())
{
std::sort(latencies.begin(), latencies.end());
int min = latencies.front();
int max = latencies.back();
int sum = std::accumulate(latencies.begin(), latencies.end(), 0);
int mean = sum / static_cast<int>(latencies.size());
int median = getPercentile(latencies, 50);
int p90 = getPercentile(latencies, 90);
int p99 = getPercentile(latencies, 99);
stat << " min " << min
<< " max " << max
<< " avg " << mean
<< " median " << median
<< " p90 " << p90
<< " p99 " << p99;
}

LOG_INFO << stat.buffer();
}

InputPtr readInput(std::istream& in)
{
boost::shared_ptr<Input> input(new Input);
std::string line;
while (getline(in, line))
{
if (line.size() == implicit_cast<size_t>(kCells))
{
input->push_back(line.c_str());
}
}
return input;
}

void runClient(const InputPtr& input,
const InetAddress& serverAddr,
int conn,
int pipelines,
bool nodelay)
{
EventLoop loop;
boost::ptr_vector<SudokuClient> clients;
for (int i = 0; i < conn; ++i)
{
Fmt f("c%04d", i+1);
string name(f.data(), f.length());
clients.push_back(new SudokuClient(&loop, serverAddr, input, name, pipelines, nodelay));
clients.back().connect();
}

loop.runEvery(1.0, boost::bind(report, &clients));
loop.loop();
}

int main(int argc, char* argv[])
{
int conn = 1;
int pipelines = 1;
bool nodelay = false;
InetAddress serverAddr("127.0.0.1", 9981);
switch (argc)
{
case 6:
nodelay = string(argv[5]) == "-n";
// FALL THROUGH
case 5:
pipelines = atoi(argv[4]);
// FALL THROUGH
case 4:
conn = atoi(argv[3]);
// FALL THROUGH
case 3:
serverAddr = InetAddress(argv[2], 9981);
// FALL THROUGH
case 2:
break;
default:
printf("Usage: %s input server_ip [connections] [pipelines] [-n]\n", argv[0]);
return 0;
}

std::ifstream in(argv[1]);
if (in)
{
InputPtr input(readInput(in));
printf("%zd requests from %s\n", input->size(), argv[1]);
runClient(input, serverAddr, conn, pipelines, nodelay);
}
else
{
printf("Cannot open %s\n", argv[1]);
}
}

0 comments on commit 1661eb1

Please sign in to comment.