#56 run lock escalation on a background thread
prohaska committed Oct 2, 2013
1 parent 800921a commit a840ce4
Showing 2 changed files with 137 additions and 50 deletions.
41 changes: 29 additions & 12 deletions locktree/locktree.h
@@ -310,8 +310,6 @@ class locktree {
void *extra);
int iterate_pending_lock_requests(lock_request_iterate_callback cb, void *extra);

void escalation_wait(uint64_t t);

private:
static const uint64_t DEFAULT_MAX_LOCK_MEMORY = 64L * 1024 * 1024;
static const uint64_t DEFAULT_LOCK_WAIT_TIME = 0;
@@ -321,15 +319,6 @@ class locktree {
uint64_t m_current_lock_memory;
memory_tracker m_mem_tracker;

// statistics about lock escalation.
uint64_t m_escalation_count;
tokutime_t m_escalation_time;
uint64_t m_escalation_latest_result;
uint64_t m_wait_escalation_count;
uint64_t m_wait_escalation_time;
uint64_t m_long_wait_escalation_count;
uint64_t m_long_wait_escalation_time;

struct lt_counters m_lt_counters;

// lock wait time for blocking row locks, in ms
@@ -367,12 +356,40 @@ class locktree {
void locktree_map_remove(locktree *lt);

// effect: Runs escalation on all locktrees.
// requires: Manager's mutex is held
void run_escalation(void);

static int find_by_dict_id(locktree *const &lt, const DICTIONARY_ID &dict_id);

void escalator_init(void);

void escalator_destroy(void);

// effect: Add time t to the escalator's wait time statistics
void add_escalator_wait_time(uint64_t t);

// effect: escalates the locks in each locktree
// requires: manager's mutex is held
void escalate_all_locktrees(void);

// statistics about lock escalation.
uint64_t m_escalation_count;
tokutime_t m_escalation_time;
uint64_t m_escalation_latest_result;
uint64_t m_wait_escalation_count;
uint64_t m_wait_escalation_time;
uint64_t m_long_wait_escalation_count;
uint64_t m_long_wait_escalation_time;

toku_mutex_t m_escalator_mutex;
toku_cond_t m_escalator_work; // signal the escalator to run
toku_cond_t m_escalator_done; // signal that escalation is done
bool m_escalator_killed;
toku_pthread_t m_escalator_id;

friend class manager_unit_test;

public:
void escalator_work(void);
};
ENSURE_POD(manager);

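The header changes above move the escalation statistics into the manager and add the synchronization state for a dedicated escalator thread (m_escalator_mutex, m_escalator_work, m_escalator_done, m_escalator_killed, m_escalator_id) together with the methods that drive it. Below is a minimal sketch of the handshake those members implement, written with standard C++ primitives instead of the toku_* wrappers; the class and helper names (EscalatorModel, request_escalation, do_escalation) are illustrative only, not part of the patch.

```cpp
// Illustrative model of the escalator handshake declared above.
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class EscalatorModel {
    std::mutex m_mutex;              // plays the role of m_escalator_mutex
    std::condition_variable m_work;  // plays the role of m_escalator_work
    std::condition_variable m_done;  // plays the role of m_escalator_done
    bool m_killed = false;           // plays the role of m_escalator_killed
    std::thread m_thread;            // plays the role of m_escalator_id

public:
    EscalatorModel() : m_thread([this] { worker(); }) {}

    ~EscalatorModel() {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_killed = true;
            m_work.notify_all();     // wake the worker so it can exit
        }
        m_thread.join();
    }

    // Requester side: wake the worker, then wait at most 100 ms for it to finish.
    void request_escalation() {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_work.notify_all();
        m_done.wait_for(lk, std::chrono::milliseconds(100));
    }

private:
    // Worker side: sleep until signaled, run escalation, report completion.
    void worker() {
        std::unique_lock<std::mutex> lk(m_mutex);
        while (!m_killed) {
            m_work.wait(lk);
            if (!m_killed) {
                lk.unlock();
                do_escalation();     // stand-in for escalate_all_locktrees()
                lk.lock();
                m_done.notify_all();
            }
        }
    }

    void do_escalation() { /* escalate each locktree in place */ }
};
```

The timed wait on the requester side is what keeps a missed or slow wakeup from stalling the caller for more than roughly 100 ms; the manager implementation in manager.cc below follows the same shape.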
146 changes: 108 additions & 38 deletions locktree/manager.cc
@@ -102,13 +102,7 @@ namespace toku {
void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb, lt_escalate_cb escalate_cb, void *escalate_extra) {
m_max_lock_memory = DEFAULT_MAX_LOCK_MEMORY;
m_current_lock_memory = 0;
m_escalation_count = 0;
m_escalation_time = 0;
m_wait_escalation_count = 0;
m_wait_escalation_time = 0;
m_long_wait_escalation_count = 0;
m_long_wait_escalation_time = 0;
m_escalation_latest_result = 0;
escalator_init();
m_lock_wait_time_ms = DEFAULT_LOCK_WAIT_TIME;
m_mem_tracker.set_manager(this);

@@ -126,9 +120,11 @@ void locktree::manager::create(lt_create_cb create_cb, lt_destroy_cb destroy_cb,
}

void locktree::manager::destroy(void) {
escalator_destroy();
invariant(m_current_lock_memory == 0);
invariant(m_locktree_map.size() == 0);
m_locktree_map.destroy();
toku_mutex_destroy(&m_mutex);
}

void locktree::manager::mutex_lock(void) {
@@ -319,29 +315,31 @@ void locktree::manager::run_escalation_for_test(void) {
run_escalation();
}

// effect: escalates the locks in each locktree
// requires: manager's mutex is held
void locktree::manager::run_escalation(void) {
// there are too many row locks in the system and we need to tidy up.
//
// a simple implementation of escalation does not attempt
// to reduce the memory footprint of each txn's range buffer.
// doing so would require some layering hackery (or a callback)
// and more complicated locking. for now, just escalate each
// locktree individually, in-place.
tokutime_t t0 = toku_time_now();
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees; i++) {
locktree *lt;
int r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lt->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
uint64_t t0 = toku_current_time_microsec();
if (1) {
// run escalation on the background thread
int r;
toku_mutex_lock(&m_escalator_mutex);
toku_cond_broadcast(&m_escalator_work);
struct timeval tv;
r = gettimeofday(&tv, 0);
assert_zero(r);
uint64_t t = tv.tv_sec * 1000000 + tv.tv_usec;
t += 100000; // 100 milliseconds
toku_timespec_t wakeup_time;
wakeup_time.tv_sec = t / 1000000;
wakeup_time.tv_nsec = (t % 1000000) * 1000;
r = toku_cond_timedwait(&m_escalator_done, &m_escalator_mutex, &wakeup_time);
toku_mutex_unlock(&m_escalator_mutex);
} else {
// run escalation on this thread
mutex_lock();
escalate_all_locktrees();
mutex_unlock();
}
tokutime_t t1 = toku_time_now();

m_escalation_count++;
m_escalation_time += (t1 - t0);
m_escalation_latest_result = m_current_lock_memory;
uint64_t t1 = toku_current_time_microsec();
add_escalator_wait_time(t1 - t0);
}

void locktree::manager::memory_tracker::set_manager(manager *mgr) {
@@ -354,17 +352,10 @@ int locktree::manager::memory_tracker::check_current_lock_constraints(void) {
// mutex and check again. if we're still out of locks, run escalation.
// return an error if we're still out of locks after escalation.
if (out_of_locks()) {
uint64_t t0 = toku_current_time_microsec();
m_mgr->mutex_lock();
m_mgr->run_escalation();
if (out_of_locks()) {
m_mgr->run_escalation();
if (out_of_locks()) {
r = TOKUDB_OUT_OF_LOCKS;
}
r = TOKUDB_OUT_OF_LOCKS;
}
uint64_t t1 = toku_current_time_microsec();
m_mgr->escalation_wait(t1 - t0);
m_mgr->mutex_unlock();
}
return r;
}
@@ -411,13 +402,92 @@ int locktree::manager::iterate_pending_lock_requests(
return r;
}

void locktree::manager::escalation_wait(uint64_t t) {
static void *escalator_thread(void *arg) {
locktree::manager *mgr = reinterpret_cast<locktree::manager*>(arg);
mgr->escalator_work();
return arg;
}

void locktree::manager::escalator_init(void) {
ZERO_STRUCT(m_escalator_mutex);
toku_mutex_init(&m_escalator_mutex, nullptr);
toku_cond_init(&m_escalator_work, nullptr);
toku_cond_init(&m_escalator_done, nullptr);
m_escalator_killed = false;
m_escalation_count = 0;
m_escalation_time = 0;
m_wait_escalation_count = 0;
m_wait_escalation_time = 0;
m_long_wait_escalation_count = 0;
m_long_wait_escalation_time = 0;
m_escalation_latest_result = 0;
int r = toku_pthread_create(&m_escalator_id, nullptr, escalator_thread, this);
assert_zero(r);
}

void locktree::manager::escalator_destroy(void) {
toku_mutex_lock(&m_escalator_mutex);
m_escalator_killed = true;
toku_cond_broadcast(&m_escalator_work);
toku_mutex_unlock(&m_escalator_mutex);
void *ret;
int r = toku_pthread_join(m_escalator_id, &ret);
assert_zero(r);
toku_mutex_destroy(&m_escalator_mutex);
toku_cond_destroy(&m_escalator_work);
toku_cond_destroy(&m_escalator_done);
}

void locktree::manager::escalate_all_locktrees(void) {
// there are too many row locks in the system and we need to tidy up.
//
// a simple implementation of escalation does not attempt
// to reduce the memory footprint of each txn's range buffer.
// doing so would require some layering hackery (or a callback)
// and more complicated locking. for now, just escalate each
// locktree individually, in-place.
tokutime_t t0 = toku_time_now();
size_t num_locktrees = m_locktree_map.size();
for (size_t i = 0; i < num_locktrees; i++) {
locktree *lt;
int r = m_locktree_map.fetch(i, &lt);
invariant_zero(r);
lt->escalate(m_lt_escalate_callback, m_lt_escalate_callback_extra);
}
tokutime_t t1 = toku_time_now();

toku_mutex_lock(&m_escalator_mutex);
m_escalation_count++;
m_escalation_time += (t1 - t0);
m_escalation_latest_result = m_current_lock_memory;
toku_mutex_unlock(&m_escalator_mutex);
}

void locktree::manager::add_escalator_wait_time(uint64_t t) {
toku_mutex_lock(&m_escalator_mutex);
m_wait_escalation_count += 1;
m_wait_escalation_time += t;
if (t >= 1000000) {
m_long_wait_escalation_count += 1;
m_long_wait_escalation_time += t;
}
toku_mutex_unlock(&m_escalator_mutex);
}

void locktree::manager::escalator_work(void) {
toku_mutex_lock(&m_escalator_mutex);
while (!m_escalator_killed) {
toku_cond_wait(&m_escalator_work, &m_escalator_mutex);
if (!m_escalator_killed) {
toku_mutex_unlock(&m_escalator_mutex);
mutex_lock();
escalate_all_locktrees();
mutex_unlock();
toku_mutex_lock(&m_escalator_mutex);
toku_cond_broadcast(&m_escalator_done);
}
}
toku_mutex_unlock(&m_escalator_mutex);
}

#define STATUS_INIT(k,c,t,l,inc) TOKUDB_STATUS_INIT(status, k, c, t, "locktree: " l, inc)
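Two details of the new manager.cc code are easy to gloss over: run_escalation() converts "now plus 100 ms" from gettimeofday() into the absolute wakeup time passed to the timed wait on m_escalator_done, and add_escalator_wait_time() counts any wait of one second (1,000,000 microseconds) or more as a long wait in addition to the overall totals. A small sketch of both calculations, assuming POSIX time APIs; the names compute_wakeup_time, escalator_stats, and add_wait_time are hypothetical helpers, not part of the patch.

```cpp
// Sketch of the 100 ms deadline computation and the wait-time accounting.
#include <assert.h>
#include <stdint.h>
#include <sys/time.h>
#include <time.h>

// Convert "now + 100 ms" into the absolute timespec used by the timed wait.
static struct timespec compute_wakeup_time(void) {
    struct timeval tv;
    int r = gettimeofday(&tv, 0);
    assert(r == 0);
    (void)r; // silence unused-variable warnings when NDEBUG is defined
    uint64_t t = (uint64_t)tv.tv_sec * 1000000 + tv.tv_usec + 100000; // now + 100 ms, in microseconds
    struct timespec wakeup;
    wakeup.tv_sec = t / 1000000;           // whole seconds
    wakeup.tv_nsec = (t % 1000000) * 1000; // remaining microseconds -> nanoseconds
    return wakeup;
}

// Every wait is counted; waits of at least one second are also counted as "long" waits.
struct escalator_stats {
    uint64_t wait_count, wait_time;
    uint64_t long_wait_count, long_wait_time;
};

static void add_wait_time(struct escalator_stats *s, uint64_t t_usec) {
    s->wait_count += 1;
    s->wait_time += t_usec;
    if (t_usec >= 1000000) {
        s->long_wait_count += 1;
        s->long_wait_time += t_usec;
    }
}
```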
