Skip to content

Commit

Permalink
tests/rpc: add test for compressors
Browse files Browse the repository at this point in the history
This patch adds generic test for RPC compressors and uses it to test
lz4_compressor.
  • Loading branch information
pdziepak committed Mar 13, 2019
1 parent cb52a30 commit 86ed239
Showing 1 changed file with 225 additions and 0 deletions.
225 changes: 225 additions & 0 deletions tests/unit/rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* Copyright (C) 2016 ScyllaDB
*/

#include <random>

#include "loopback_socket.hh"
#include <seastar/rpc/rpc.hh>
Expand Down Expand Up @@ -545,3 +546,227 @@ SEASTAR_THREAD_TEST_CASE(test_rpc_scheduling_connection_based_compatibility) {
});
}).get();
}

void test_compressor(std::function<std::unique_ptr<seastar::rpc::compressor>()> compressor_factory) {
using namespace seastar::rpc;

auto linearize = [&] (const auto& buffer) {
return visit(buffer.bufs,
[] (const temporary_buffer<char>& buf) {
return buf.clone();
},
[&] (const std::vector<temporary_buffer<char>>& bufs) {
auto buf = temporary_buffer<char>(buffer.size);
auto dst = buf.get_write();
for (auto& b : bufs) {
dst = std::copy_n(b.get(), b.size(), dst);
}
return buf;
}
);
};

auto split_buffer = [&] (temporary_buffer<char> b, size_t chunk_size) {
std::vector<temporary_buffer<char>> bufs;
auto src = b.get();
auto n = b.size();
while (n) {
auto this_chunk = std::min(chunk_size, n);
bufs.emplace_back(this_chunk);
std::copy_n(src, this_chunk, bufs.back().get_write());
src += this_chunk;
n -= this_chunk;
}
return bufs;
};

auto clone = [&] (const auto& buffer) {
auto c = std::decay_t<decltype(buffer)>();
c.size = buffer.size;
c.bufs = visit(buffer.bufs,
[] (const temporary_buffer<char>& buf) -> decltype(c.bufs) {
return buf.clone();
},
[] (const std::vector<temporary_buffer<char>>& bufs) -> decltype(c.bufs) {
std::vector<temporary_buffer<char>> c;
c.reserve(bufs.size());
for (auto& b : bufs) {
c.emplace_back(b.clone());
}
return c;
}
);
return c;
};

auto compressor = compressor_factory();

std::vector<std::tuple<sstring, size_t, snd_buf>> inputs;

auto eng = std::default_random_engine{std::random_device{}()};
auto dist = std::uniform_int_distribution<char>();

auto snd = snd_buf(1);
*snd.front().get_write() = 'a';
inputs.emplace_back("one byte, no headroom", 0, std::move(snd));

snd = snd_buf(1);
*snd.front().get_write() = 'a';
inputs.emplace_back("one byte, 128k of headroom", 128 * 1024, std::move(snd));

auto buf = temporary_buffer<char>(16 * 1024);
std::fill_n(buf.get_write(), 16 * 1024, 'a');

snd = snd_buf(16 * 1024);
snd.size = 16 * 1024;
snd.bufs = buf.clone();
inputs.emplace_back("single 16 kB buffer of \'a\'", 0, std::move(snd));

buf = temporary_buffer<char>(16 * 1024);
std::generate_n(buf.get_write(), 16 * 1024, [&] { return dist(eng); });

snd = snd_buf(16 * 1024);
snd.size = 16 * 1024;
snd.bufs = buf.clone();
inputs.emplace_back("single 16 kB buffer of random", 0, std::move(snd));

buf = temporary_buffer<char>(1 * 1024 * 1024);
std::fill_n(buf.get_write(), 1 * 1024 * 1024, 'a');

snd = snd_buf();
snd.size = 1 * 1024 * 1024;
snd.bufs = split_buffer(buf.clone(), 128 * 1024 - 128);
inputs.emplace_back("1 MB buffer of \'a\' split into 128 kB - 128", 0, std::move(snd));

snd = snd_buf();
snd.size = 1 * 1024 * 1024;
snd.bufs = split_buffer(buf.clone(), 128 * 1024);
inputs.emplace_back("1 MB buffer of \'a\' split into 128 kB", 0, std::move(snd));

buf = temporary_buffer<char>(1 * 1024 * 1024);
std::generate_n(buf.get_write(), 1 * 1024 * 1024, [&] { return dist(eng); });

snd = snd_buf();
snd.size = 1 * 1024 * 1024;
snd.bufs = split_buffer(buf.clone(), 128 * 1024);
inputs.emplace_back("1 MB buffer of random split into 128 kB", 0, std::move(snd));

buf = temporary_buffer<char>(1 * 1024 * 1024 + 1);
std::fill_n(buf.get_write(), 1 * 1024 * 1024 + 1, 'a');

snd = snd_buf();
snd.size = 1 * 1024 * 1024 + 1;
snd.bufs = split_buffer(buf.clone(), 128 * 1024);
inputs.emplace_back("1 MB + 1B buffer of \'a\' split into 128 kB", 0, std::move(snd));

buf = temporary_buffer<char>(1 * 1024 * 1024 + 1);
std::generate_n(buf.get_write(), 1 * 1024 * 1024 + 1, [&] { return dist(eng); });

snd = snd_buf();
snd.size = 1 * 1024 * 1024 + 1;
snd.bufs = split_buffer(buf.clone(), 128 * 1024);
inputs.emplace_back("16 MB + 1 B buffer of random split into 128 kB", 0, std::move(snd));


std::vector<std::tuple<sstring, std::function<rcv_buf(snd_buf)>>> transforms {
{ "identity", [] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = std::move(snd.bufs);
return rcv;
} },
{ "linearized", [&linearize] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = linearize(snd);
return rcv;
} },
{ "split 1 B", [&] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = split_buffer(linearize(snd), 1);
return rcv;
} },
{ "split 129 B", [&] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = split_buffer(linearize(snd), 129);
return rcv;
} },
{ "split 4 kB", [&] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = split_buffer(linearize(snd), 4096);
return rcv;
} },
{ "split 4 kB - 128", [&] (snd_buf snd) {
rcv_buf rcv;
rcv.size = snd.size;
rcv.bufs = split_buffer(linearize(snd), 4096 - 128);
return rcv;
} },
};

auto sanity_check = [&] (const auto& buffer) {
auto actual_size = visit(buffer.bufs,
[] (const temporary_buffer<char>& buf) {
return buf.size();
},
[] (const std::vector<temporary_buffer<char>>& bufs) {
return boost::accumulate(bufs, size_t(0), [] (size_t sz, const temporary_buffer<char>& buf) {
return sz + buf.size();
});
}
);
BOOST_CHECK_EQUAL(actual_size, buffer.size);
};

for (auto& in : inputs) {
BOOST_TEST_MESSAGE("Input: " << std::get<0>(in));
auto headroom = std::get<1>(in);
auto compressed = compressor->compress(headroom, clone(std::get<2>(in)));
sanity_check(compressed);

// Remove headroom
BOOST_CHECK_GE(compressed.size, headroom);
compressed.size -= headroom;
visit(compressed.bufs,
[&] (temporary_buffer<char>& buf) {
BOOST_CHECK_GE(buf.size(), headroom);
buf.trim_front(headroom);
},
[&] (std::vector<temporary_buffer<char>>& bufs) {
while (headroom) {
BOOST_CHECK(!bufs.empty());
auto to_remove = std::min(bufs.front().size(), headroom);
bufs.front().trim_front(to_remove);
if (bufs.front().empty() && bufs.size() > 1) {
bufs.erase(bufs.begin());
}
headroom -= to_remove;
}
}
);

auto in_l = linearize(std::get<2>(in));

for (auto& t : transforms) {
BOOST_TEST_MESSAGE(" Transform: " << std::get<0>(t));
auto received = std::get<1>(t)(clone(compressed));

auto decompressed = compressor->decompress(std::move(received));
sanity_check(decompressed);

BOOST_CHECK_EQUAL(decompressed.size, std::get<2>(in).size);

auto out_l = linearize(decompressed);

BOOST_CHECK_EQUAL(in_l.size(), out_l.size());
BOOST_CHECK(in_l == out_l);
}
}
}

SEASTAR_THREAD_TEST_CASE(test_lz4_compressor) {
test_compressor([] { return std::make_unique<rpc::lz4_compressor>(); });
}

0 comments on commit 86ed239

Please sign in to comment.