Skip to content

Commit

Permalink
Stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
mpenick committed Jun 3, 2014
1 parent e1fee52 commit 845d1f0
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 20 deletions.
2 changes: 1 addition & 1 deletion include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ typedef enum CassErrorSource_ {
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_LIB_INDEX_OUT_OF_BOUNDS, 13, "Index out of bounds") \
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_LIB_INVALID_ITEM_COUNT, 14, "Invalid item count") \
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_LIB_INVALID_VALUE_TYPE, 15, "Invalid value type") \
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_LIB_REQUEST_TIMEOUT, 16, "Request timed out") \
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_LIB_REQUEST_TIMED_OUT, 16, "Request timed out") \
XX(CASS_ERROR_SOURCE_LIB, CASS_ERROR_UNABLE_TO_SET_KEYSPACE, 17, "Unable to set keyspace") \
XX(CASS_ERROR_SOURCE_SERVER, CASS_ERROR_SERVER_SERVER_ERROR, 0x0000, "Server error") \
XX(CASS_ERROR_SOURCE_SERVER, CASS_ERROR_SERVER_PROTOCOL_ERROR, 0x000A, "Protocol error") \
Expand Down
7 changes: 4 additions & 3 deletions src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,17 @@ struct Config {
compression_(0),
max_schema_agreement_wait_(10),
control_connection_timeout_(10),
thread_count_io_(1),
thread_count_io_(4),
thread_count_callback_(4),
queue_size_io_(4096),
queue_size_event_(4096),
queue_size_log_(4096),
core_connections_per_host_(1),
max_connections_per_host_(1),
core_connections_per_host_(2),
max_connections_per_host_(4),
reconnect_wait_(10000),
max_simultaneous_creation_(1),
max_pending_requests_(128 * max_connections_per_host_),
// TODO(mpenick): Determine good timeout durations
connect_timeout_(10000),
write_timeout_(10000),
read_timeout_(10000),
Expand Down
2 changes: 1 addition & 1 deletion src/request_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class RequestHandler : public ResponseCallback {

virtual void on_timeout() {
// TODO(mpenick): Get the host for errors
future_->set_error(CASS_ERROR_LIB_REQUEST_TIMEOUT, "Request timed out");
future_->set_error(CASS_ERROR_LIB_REQUEST_TIMED_OUT, "Request timed out");
}

RequestFuture* future() { return future_; }
Expand Down
4 changes: 2 additions & 2 deletions test/integration_tests/src/basics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ BOOST_AUTO_TEST_CASE(rows_in_rows_out)
{
test_utils::execute_query(session.get(),
str(boost::format("CREATE TABLE %s (tweet_id bigint PRIMARY KEY, t1 bigint, t2 bigint, t3 bigint);") % test_utils::SIMPLE_TABLE),
nullptr, 0, consistency);
nullptr, consistency);

constexpr int num_rows = 100000;

Expand All @@ -201,7 +201,7 @@ BOOST_AUTO_TEST_CASE(rows_in_rows_out)

std::string select_query(str(boost::format("SELECT tweet_id, t1, t2, t3 FROM %s LIMIT %d;") % test_utils::SIMPLE_TABLE % num_rows));
test_utils::StackPtr<const CassResult> result;
test_utils::execute_query(session.get(), select_query, &result, 0, consistency);
test_utils::execute_query(session.get(), select_query, &result, consistency);
BOOST_REQUIRE(cass_result_row_count(result.get()) == num_rows);
BOOST_REQUIRE(cass_result_column_count(result.get()) == 4);

Expand Down
14 changes: 7 additions & 7 deletions test/integration_tests/src/collections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void test_simple_insert_collection_all_types(CassCluster* cluster, CassValueType
{
std::vector<test_utils::Uuid> values;
for(int i = 0; i < 3; ++i) {
values.push_back(test_utils::uuid_generate_time());
values.push_back(test_utils::generate_time_uuid());
}
test_simple_insert_collection<test_utils::Uuid>(session.get(), type, CASS_VALUE_TYPE_UUID, values);
}
Expand Down Expand Up @@ -248,9 +248,9 @@ void test_simple_insert_map_all_types(CassCluster* cluster) {
test_simple_insert_map<CassInet, CassInet>(session.get(), CASS_VALUE_TYPE_INET, CASS_VALUE_TYPE_INET, values);
}
{
std::map<test_utils::Uuid, test_utils::Uuid> values = { { test_utils::uuid_generate_time(), test_utils::uuid_generate_random() },
{ test_utils::uuid_generate_time(), test_utils::uuid_generate_random() },
{ test_utils::uuid_generate_time(), test_utils::uuid_generate_random() } };
std::map<test_utils::Uuid, test_utils::Uuid> values = { { test_utils::generate_time_uuid(), test_utils::generate_random_uuid() },
{ test_utils::generate_time_uuid(), test_utils::generate_random_uuid() },
{ test_utils::generate_time_uuid(), test_utils::generate_random_uuid() } };
test_simple_insert_map<test_utils::Uuid, test_utils::Uuid>(session.get(), CASS_VALUE_TYPE_UUID, CASS_VALUE_TYPE_UUID, values);
}
{
Expand All @@ -273,9 +273,9 @@ void test_simple_insert_map_all_types(CassCluster* cluster) {
test_simple_insert_map<CassString, cass_int32_t>(session.get(), CASS_VALUE_TYPE_VARCHAR, CASS_VALUE_TYPE_INT, values);
}
{
std::map<test_utils::Uuid, CassString> values = { { test_utils::uuid_generate_time(), cass_string_init("123") },
{ test_utils::uuid_generate_time(), cass_string_init("456") },
{ test_utils::uuid_generate_time(), cass_string_init("789") } };
std::map<test_utils::Uuid, CassString> values = { { test_utils::generate_time_uuid(), cass_string_init("123") },
{ test_utils::generate_time_uuid(), cass_string_init("456") },
{ test_utils::generate_time_uuid(), cass_string_init("789") } };
test_simple_insert_map<test_utils::Uuid, CassString>(session.get(), CASS_VALUE_TYPE_UUID, CASS_VALUE_TYPE_VARCHAR, values);
}
}
Expand Down
128 changes: 128 additions & 0 deletions test/integration_tests/src/stress.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#define BOOST_TEST_DYN_LINK
#ifdef STAND_ALONE
# define BOOST_TEST_MODULE cassandra
#endif

#include "cassandra.h"
#include "test_utils.hpp"

#include <algorithm>
#include <future>

#include <boost/test/unit_test.hpp>
#include <boost/test/debug.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/cstdint.hpp>
#include <boost/format.hpp>
#include <boost/thread.hpp>

struct STRESS_CCM_SETUP : test_utils::CCM_SETUP {
STRESS_CCM_SETUP() : CCM_SETUP(3, 0) {}
};

BOOST_FIXTURE_TEST_SUITE(stress, STRESS_CCM_SETUP)

void bind_and_execute_insert(CassSession* session, CassStatement* statement) {
std::chrono::system_clock::time_point now(std::chrono::system_clock::now());
std::chrono::milliseconds event_time(std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()));
std::string text_sample(test_utils::string_from_time_point(now));

BOOST_REQUIRE(cass_statement_bind_uuid(statement, 0,
test_utils::generate_time_uuid().uuid) == CASS_OK);
BOOST_REQUIRE(cass_statement_bind_int64(statement, 1,
event_time.count()) == CASS_OK);
BOOST_REQUIRE(cass_statement_bind_string(statement, 2,
cass_string_init2(text_sample.data(), text_sample.size())) == CASS_OK);

test_utils::StackPtr<CassFuture> future(cass_session_execute(session, statement));
cass_future_wait(future.get());
CassError code = cass_future_error_code(future.get());
if(code != CASS_OK && code != CASS_ERROR_LIB_REQUEST_TIMED_OUT) { // Timeout is okay
CassString message = cass_future_error_message(future.get());
BOOST_FAIL("Error occured during insert '" << std::string(message.data, message.length) << "' (" << code << ")");
}
}

void insert_task(CassSession* session, const std::string& query, CassConsistency consistency, int rows_per_id) {
for(int i = 0; i < rows_per_id; ++i) {
test_utils::StackPtr<CassStatement> statement(cass_statement_new(cass_string_init(query.c_str()), 3, consistency));
bind_and_execute_insert(session, statement.get());
}
}

void insert_prepared_task(CassSession* session, const CassPrepared* prepared, CassConsistency consistency, int rows_per_id) {
for(int i = 0; i < rows_per_id; ++i) {
test_utils::StackPtr<CassStatement> statement(cass_prepared_bind(prepared, 3, consistency));
bind_and_execute_insert(session, statement.get());
}
}

void select_task(CassSession* session, const std::string& query, CassConsistency consistency, int num_iterations) {
test_utils::StackPtr<CassStatement> statement(cass_statement_new(cass_string_init(query.c_str()), 0, consistency));
for(int i = 0; i < num_iterations; ++i) {
test_utils::StackPtr<CassFuture> future(cass_session_execute(session, statement.get()));
cass_future_wait(future.get());

CassError code = cass_future_error_code(future.get());
if(code != CASS_OK
&& code != CASS_ERROR_LIB_REQUEST_TIMED_OUT
&& code != CASS_ERROR_SERVER_READ_TIMEOUT) { // Timeout is okay
CassString message = cass_future_error_message(future.get());
BOOST_FAIL("Error occured during select '" << std::string(message.data, message.length) << "' (" << code << ")");
}

if(code == CASS_OK) {
test_utils::StackPtr<const CassResult> result(cass_future_get_result(future.get()));
BOOST_REQUIRE(cass_result_row_count(result.get()) > 0);
}
}
}

BOOST_AUTO_TEST_CASE(insert_and_select)
{
test_utils::StackPtr<CassFuture> session_future;
test_utils::StackPtr<CassSession> session(cass_cluster_connect(cluster, session_future.address_of()));
test_utils::wait_and_check_error(session_future.get());

test_utils::execute_query(session.get(), "CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};");
test_utils::execute_query(session.get(), "USE tester;");

std::string table_name = str(boost::format("table_%s") % test_utils::generate_unique_str());

test_utils::execute_query(session.get(), str(boost::format(test_utils::CREATE_TABLE_TIME_SERIES) % table_name));

std::string insert_query = str(boost::format("INSERT INTO %s (id, event_time, text_sample) VALUES (?, ?, ?)") % table_name);
std::string select_query = str(boost::format("SELECT * FROM %s LIMIT 10000") % table_name);

test_utils::StackPtr<CassFuture> prepared_future(cass_session_prepare(session.get(),
cass_string_init2(insert_query.data(), insert_query.size())));

test_utils::wait_and_check_error(prepared_future.get());
test_utils::StackPtr<const CassPrepared> prepared(cass_future_get_prepared(prepared_future.get()));

int rows_per_id = 100;
int num_iterations = 10;

std::vector<std::future<void>> tasks;

for(int i = 0; i < 10; ++i) {
tasks.push_back(std::async(std::launch::async, insert_task, session.get(), insert_query, CASS_CONSISTENCY_QUORUM, rows_per_id));
tasks.push_back(std::async(std::launch::async, select_task, session.get(), select_query, CASS_CONSISTENCY_QUORUM, num_iterations));
tasks.push_back(std::async(std::launch::async, insert_prepared_task, session.get(), prepared.get(), CASS_CONSISTENCY_QUORUM, rows_per_id));

tasks.push_back(std::async(std::launch::async, select_task, session.get(), select_query, CASS_CONSISTENCY_QUORUM, num_iterations));
tasks.push_back(std::async(std::launch::async, insert_task, session.get(), insert_query, CASS_CONSISTENCY_QUORUM, rows_per_id));
tasks.push_back(std::async(std::launch::async, insert_prepared_task, session.get(), prepared.get(), CASS_CONSISTENCY_QUORUM, rows_per_id));
tasks.push_back(std::async(std::launch::async, insert_task, session.get(), insert_query, CASS_CONSISTENCY_QUORUM, rows_per_id));

tasks.push_back(std::async(std::launch::async, insert_prepared_task, session.get(), prepared.get(), CASS_CONSISTENCY_QUORUM, rows_per_id));
tasks.push_back(std::async(std::launch::async, insert_prepared_task, session.get(), prepared.get(), CASS_CONSISTENCY_QUORUM, rows_per_id));
tasks.push_back(std::async(std::launch::async, select_task, session.get(), select_query, CASS_CONSISTENCY_QUORUM, num_iterations));
}

for(auto& task : tasks) {
task.wait();
}
}

BOOST_AUTO_TEST_SUITE_END()
12 changes: 9 additions & 3 deletions test/integration_tests/src/test_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ CCM_SETUP::~CCM_SETUP() {
void execute_query(CassSession* session,
const std::string& query,
StackPtr<const CassResult>* result,
cass_size_t parameter_count,
CassConsistency consistency) {
StackPtr<CassStatement> statement(cass_statement_new(cass_string_init(query.c_str()), parameter_count, consistency));
StackPtr<CassStatement> statement(cass_statement_new(cass_string_init(query.c_str()), 0, consistency));
StackPtr<CassFuture> future(cass_session_execute(session, statement.get()));
wait_and_check_error(future.get());
if(result != nullptr) {
Expand All @@ -96,9 +95,16 @@ void wait_and_check_error(CassFuture* future, cass_duration_t timeout) {
CassError code = cass_future_error_code(future);
if(code != CASS_OK) {
CassString message = cass_future_error_message(future);
BOOST_FAIL("Error occured during operation " << std::string(message.data, message.length) << " (" << code << ")");
BOOST_FAIL("Error occured during query '" << std::string(message.data, message.length) << "' (" << code << ")");
}
}

std::string string_from_time_point(std::chrono::system_clock::time_point time) {
std::time_t t = std::chrono::system_clock::to_time_t(time);
char buffer[26];
ctime_r(&t, buffer);
return std::string(buffer, 24);
}

//-----------------------------------------------------------------------------------
} // End of namespace test_utils
39 changes: 36 additions & 3 deletions test/integration_tests/src/test_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <boost/asio/ip/address.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/algorithm/string/replace.hpp>

#include "cassandra.h"

Expand Down Expand Up @@ -67,6 +68,13 @@ struct StackPtrFree<CassCollection> {
}
};

template<>
struct StackPtrFree<const CassPrepared> {
static void free(const CassPrepared* ptr) {
cass_prepared_free(ptr);
}
};

template<class T>
class StackPtr {
public:
Expand Down Expand Up @@ -367,7 +375,6 @@ const char* get_value_type(CassValueType type);
void execute_query(CassSession* session,
const std::string& query,
StackPtr<const CassResult>* result = nullptr,
cass_size_t parameter_count = 0,
CassConsistency consistency = CASS_CONSISTENCY_ONE);

void wait_and_check_error(CassFuture* future, cass_duration_t timeout = 10 * ONE_SECOND_IN_MICROS);
Expand All @@ -390,18 +397,44 @@ inline CassDecimal decimal_from_scale_and_bytes(cass_int32_t scale, CassBytes by
return decimal;
}

inline Uuid uuid_generate_time() {
inline Uuid generate_time_uuid() {
Uuid uuid;
cass_uuid_generate_time(uuid.uuid);
return uuid;
}

inline Uuid uuid_generate_random() {
inline Uuid generate_random_uuid() {
Uuid uuid;
cass_uuid_generate_random(uuid.uuid);
return uuid;
}

inline std::string generate_unique_str() {
Uuid uuid;
cass_uuid_generate_time(uuid.uuid);
char buffer[CASS_UUID_STRING_LENGTH];
cass_uuid_string(uuid, buffer);
return boost::replace_all_copy(std::string(buffer), "-", "");
}

std::string string_from_time_point(std::chrono::system_clock::time_point time);

constexpr const char* CREATE_TABLE_TIME_SERIES =
"create table %s ("
"id uuid,"
"event_time timestamp,"
"text_sample text,"
"int_sample int,"
"bigint_sample bigint,"
"float_sample float,"
"double_sample double,"
"decimal_sample decimal,"
"blob_sample blob,"
"boolean_sample boolean,"
"timestamp_sample timestamp,"
"inet_sample inet,"
"PRIMARY KEY(id, event_time));";

extern const std::string CREATE_KEYSPACE_SIMPLE_FORMAT;
extern const std::string CREATE_KEYSPACE_GENERIC_FORMAT;
extern const std::string SIMPLE_KEYSPACE;
Expand Down

0 comments on commit 845d1f0

Please sign in to comment.