tablet: acquire/release locks in batches
Prior to this patch, each row operation required a separate call into
the LockManager to acquire or release its corresponding row lock. Each
such call performed atomic operations on the LockManager, bouncing the
cache line containing the spinlock(s) between the prepare thread
(acquiring locks) and the apply threads (releasing locks). Atomic
operations on cache lines owned by remote cores are expensive, so the
locking accounted for a large share of CPU usage: locking-related
methods took more CPU than the actual work on the prepare/apply threads.

This patch changes the LockManager API to allow acquiring and releasing
locks in bulk. WriteOp now contains a single ScopedRowLock object that
holds all of the required locks, instead of a separate ScopedRowLock per
row. The acquire/release paths are also optimized for better
instruction-level parallelism in the hash table lookups: prefetches
bring the appropriate cache lines into CPU cache before they are read.
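
The caller-side pattern, as exercised by the new TestLockBatch case in
lock_manager-test.cc (kFakeTransaction stands in for the op's OpState):

  // One ScopedRowLock guards the whole batch and releases all of the
  // row locks when it goes out of scope.
  vector<Slice> keys = {"a", "b", "c"};
  ScopedRowLock l(&lock_manager_, kFakeTransaction, keys,
                  LockManager::LOCK_EXCLUSIVE);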

This patch also simplifies the locking to use a single LockManager-wide
lock instead of per-bucket locks. Per-bucket locks would still be
possible, but they don't appear to be beneficial and would complicate
the code path considerably. Removing them also saves significant memory
in the LockManager hash tables.
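
As a rough, self-contained sketch of the prefetch-then-probe pattern
described above (illustrative only: the bucket layout, class name, and
use of std::mutex are assumptions, not the actual LockManager internals;
__builtin_prefetch is the GCC/Clang builtin):

  #include <cstddef>
  #include <functional>
  #include <mutex>
  #include <string>
  #include <vector>

  class BatchLocker {
   public:
    explicit BatchLocker(size_t num_buckets) : buckets_(num_buckets) {}

    // Compute every bucket index and prefetch its cache line before the
    // single table-wide lock is taken and the buckets are probed, so the
    // lookups inside the critical section mostly hit CPU cache.
    void LockBatch(const std::vector<std::string>& keys) {
      std::vector<size_t> idxs(keys.size());
      for (size_t i = 0; i < keys.size(); i++) {
        idxs[i] = std::hash<std::string>()(keys[i]) % buckets_.size();
        __builtin_prefetch(&buckets_[idxs[i]]);  // warm the cache line early
      }
      std::lock_guard<std::mutex> guard(lock_);  // one table-wide lock
      for (size_t i = 0; i < keys.size(); i++) {
        buckets_[idxs[i]].push_back(keys[i]);    // record the held "lock"
      }
    }

   private:
    std::mutex lock_;
    std::vector<std::vector<std::string>> buckets_;  // locked keys per bucket
  };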

Benchmarked by running:

  $ perf stat ./build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts \
    -enable_maintenance_manager=0 -unlock-unsafe-flags

and
  $ kudu perf loadgen localhost -num_rows_per_thread=10000000 -num_threads=8

This workload ends up somewhat client-bound on my machine, so wall-clock
time isn't improved much, but tserver CPU consumption is reduced by
about 23%:

Before:

 Performance counter stats for './build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts -enable_maintenance_manager=0 -unlock-unsafe-flags':

         177786.37 msec task-clock                #    7.648 CPUs utilized
            215350      context-switches          #    0.001 M/sec
             55653      cpu-migrations            #    0.313 K/sec
           3219417      page-faults               #    0.018 M/sec
      724369004287      cycles                    #    4.074 GHz                      (83.32%)
      142799406201      stalled-cycles-frontend   #   19.71% frontend cycles idle     (83.36%)
      109903159567      stalled-cycles-backend    #   15.17% backend cycles idle      (83.40%)
      719706215568      instructions              #    0.99  insn per cycle
                                                  #    0.20  stalled cycles per insn  (83.30%)
      131445739053      branches                  #  739.347 M/sec                    (83.31%)
         479779584      branch-misses             #    0.37% of all branches          (83.32%)

     165.187526000 seconds user
      12.866637000 seconds sys

After:

 Performance counter stats for './build/release/bin/kudu tserver run -fs-wal-dir /tmp/ts -enable_maintenance_manager=0 -unlock-unsafe-flags':

         145986.70 msec task-clock                #    6.063 CPUs utilized
            202600      context-switches          #    0.001 M/sec
             51442      cpu-migrations            #    0.352 K/sec
           3368490      page-faults               #    0.023 M/sec
      597915142173      cycles                    #    4.096 GHz                      (83.51%)
       58333772996      stalled-cycles-frontend   #    9.76% frontend cycles idle     (83.35%)
      104785221789      stalled-cycles-backend    #   17.53% backend cycles idle      (83.18%)
      690655964982      instructions              #    1.16  insn per cycle
                                                  #    0.15  stalled cycles per insn  (83.38%)
      126988529873      branches                  #  869.864 M/sec                    (83.40%)
         469031328      branch-misses             #    0.37% of all branches          (83.17%)

     134.072172000 seconds user
      12.192747000 seconds sys

Change-Id: I3cb724e953ecdf188a35181c2f91b721b3416524
Reviewed-on: http://gerrit.cloudera.org:8080/16169
Tested-by: Todd Lipcon <[email protected]>
Reviewed-by: Andrew Wong <[email protected]>
toddlipcon committed Jul 16, 2020
1 parent e81caa8 commit 713879a
Showing 8 changed files with 252 additions and 247 deletions.
109 changes: 51 additions & 58 deletions src/kudu/tablet/lock_manager-test.cc
@@ -32,6 +32,7 @@

#include "kudu/gutil/macros.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/util/array_view.h"
#include "kudu/util/env.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
@@ -58,56 +59,66 @@ class LockManagerTest : public KuduTest {
public:
void VerifyAlreadyLocked(const Slice& key) {
LockEntry *entry;
ASSERT_EQ(LockManager::LOCK_BUSY,
lock_manager_.TryLock(key, kFakeTransaction, LockManager::LOCK_EXCLUSIVE, &entry));
ASSERT_FALSE(lock_manager_.TryLock(key, kFakeTransaction, &entry));
}

LockManager lock_manager_;
};

TEST_F(LockManagerTest, TestLockUnlockSingleRow) {
Slice key_a("a");
ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ScopedRowLock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
Slice key_a[] = {"a"};
for (int i = 0; i < 3; i++) {
ScopedRowLock l(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
}
}

// Test if the same transaction locks the same row multiple times.
TEST_F(LockManagerTest, TestMultipleLockSameRow) {
Slice key_a("a");
Slice key_a[] = {"a"};
ScopedRowLock first_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ASSERT_EQ(LockManager::LOCK_ACQUIRED, first_lock.GetLockStatusForTests());
VerifyAlreadyLocked(key_a);
ASSERT_TRUE(first_lock.acquired());
VerifyAlreadyLocked(key_a[0]);

{
ScopedRowLock second_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ASSERT_EQ(LockManager::LOCK_ACQUIRED, second_lock.GetLockStatusForTests());
VerifyAlreadyLocked(key_a);
ASSERT_TRUE(second_lock.acquired());
VerifyAlreadyLocked(key_a[0]);
}

ASSERT_EQ(LockManager::LOCK_ACQUIRED, first_lock.GetLockStatusForTests());
VerifyAlreadyLocked(key_a);
ASSERT_TRUE(first_lock.acquired());
VerifyAlreadyLocked(key_a[0]);
}

TEST_F(LockManagerTest, TestLockUnlockMultipleRows) {
Slice key_a("a"), key_b("b");
Slice key_a[] = {"a"};
Slice key_b[] = {"b"};
for (int i = 0; i < 3; ++i) {
ScopedRowLock l1(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ScopedRowLock l2(&lock_manager_, kFakeTransaction, key_b, LockManager::LOCK_EXCLUSIVE);
VerifyAlreadyLocked(key_a);
VerifyAlreadyLocked(key_b);
VerifyAlreadyLocked(key_a[0]);
VerifyAlreadyLocked(key_b[0]);
}
}

TEST_F(LockManagerTest, TestLockBatch) {
vector<Slice> keys = {"a", "b", "c"};
{
ScopedRowLock l1(&lock_manager_, kFakeTransaction, keys, LockManager::LOCK_EXCLUSIVE);
for (const auto& k : keys) {
VerifyAlreadyLocked(k);
}
}
}

TEST_F(LockManagerTest, TestRelockSameRow) {
Slice key_a("a");
Slice key_a[] = {"a"};
ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
VerifyAlreadyLocked(key_a);
VerifyAlreadyLocked(key_a[0]);
}

TEST_F(LockManagerTest, TestMoveLock) {
// Acquire a lock.
Slice key_a("a");
Slice key_a[] = {"a"};
ScopedRowLock row_lock(&lock_manager_, kFakeTransaction, key_a, LockManager::LOCK_EXCLUSIVE);
ASSERT_TRUE(row_lock.acquired());

@@ -119,15 +130,9 @@ TEST_F(LockManagerTest, TestMoveLock) {

class LmTestResource {
public:
explicit LmTestResource(const Slice* id)
: id_(id),
owner_(0),
is_owned_(false) {
}
explicit LmTestResource(Slice id) : id_(id), owner_(0), is_owned_(false) {}

const Slice* id() const {
return id_;
}
Slice id() const { return id_; }

void acquire(uint64_t tid) {
std::unique_lock<std::mutex> lock(lock_);
@@ -148,17 +153,16 @@ class LmTestResource {
private:
DISALLOW_COPY_AND_ASSIGN(LmTestResource);

const Slice* id_;
const Slice id_;
std::mutex lock_;
uint64_t owner_;
bool is_owned_;
};

class LmTestThread {
public:
LmTestThread(LockManager* manager, vector<const Slice*> keys,
const vector<LmTestResource*> resources)
: manager_(manager), keys_(std::move(keys)), resources_(resources) {}
LmTestThread(LockManager* manager, vector<Slice> keys, vector<LmTestResource*> resources)
: manager_(manager), keys_(std::move(keys)), resources_(std::move(resources)) {}

void Start() {
thread_ = thread([this]() { this->Run(); });
@@ -168,15 +172,9 @@ class LmTestThread {
tid_ = Env::Default()->gettid();
const OpState* my_txn = reinterpret_cast<OpState*>(tid_);

std::sort(keys_.begin(), keys_.end());
std::sort(keys_.begin(), keys_.end(), Slice::Comparator());
for (int i = 0; i < FLAGS_num_iterations; i++) {
std::vector<shared_ptr<ScopedRowLock> > locks;
// TODO: We don't have an API for multi-row
for (const Slice* key : keys_) {
locks.push_back(std::make_shared<ScopedRowLock>(
manager_, my_txn, *key, LockManager::LOCK_EXCLUSIVE));
}

ScopedRowLock l(manager_, my_txn, keys_, LockManager::LOCK_EXCLUSIVE);
for (LmTestResource* r : resources_) {
r->acquire(tid_);
}
@@ -193,14 +191,14 @@ class LmTestThread {
private:
DISALLOW_COPY_AND_ASSIGN(LmTestThread);
LockManager* manager_;
vector<const Slice*> keys_;
vector<Slice> keys_;
const vector<LmTestResource*> resources_;
uint64_t tid_;
thread thread_;
};

static void runPerformanceTest(const char *test_type,
vector<shared_ptr<LmTestThread> > *threads) {
static void RunPerformanceTest(const char* test_type,
vector<shared_ptr<LmTestThread> >* threads) {
Stopwatch sw(Stopwatch::ALL_THREADS);
sw.start();
for (const shared_ptr<LmTestThread>& t : *threads) {
@@ -233,12 +231,9 @@ static void runPerformanceTest(const char *test_type,
// Test running a bunch of threads at once that want an overlapping set of
// resources.
TEST_F(LockManagerTest, TestContention) {
Slice slice_a("a");
LmTestResource resource_a(&slice_a);
Slice slice_b("b");
LmTestResource resource_b(&slice_b);
Slice slice_c("c");
LmTestResource resource_c(&slice_c);
LmTestResource resource_a("a");
LmTestResource resource_b("b");
LmTestResource resource_c("c");
vector<shared_ptr<LmTestThread> > threads;
for (int i = 0; i < FLAGS_num_test_threads; ++i) {
vector<LmTestResource*> resources;
@@ -252,15 +247,14 @@ TEST_F(LockManagerTest, TestContention) {
resources.push_back(&resource_c);
resources.push_back(&resource_a);
}
vector<const Slice*> keys;
for (vector<LmTestResource*>::const_iterator r = resources.begin();
r != resources.end(); ++r) {
keys.push_back((*r)->id());
vector<Slice> keys;
for (LmTestResource* r : resources) {
keys.push_back(r->id());
}
threads.push_back(std::make_shared<LmTestThread>(
&lock_manager_, keys, resources));
}
runPerformanceTest("Contended", &threads);
RunPerformanceTest("Contended", &threads);
}

// Test running a bunch of threads at once that want different
@@ -276,19 +270,18 @@ TEST_F(LockManagerTest, TestUncontended) {
}
vector<shared_ptr<LmTestResource> > resources;
for (int i = 0; i < FLAGS_num_test_threads; i++) {
resources.push_back(
std::make_shared<LmTestResource>(&slices[i]));
resources.push_back(std::make_shared<LmTestResource>(slices[i]));
}
vector<shared_ptr<LmTestThread> > threads;
for (int i = 0; i < FLAGS_num_test_threads; ++i) {
vector<const Slice*> k;
k.push_back(&slices[i]);
vector<Slice> k;
k.push_back(slices[i]);
vector<LmTestResource*> r;
r.push_back(resources[i].get());
threads.push_back(std::make_shared<LmTestThread>(
&lock_manager_, k, r));
}
runPerformanceTest("Uncontended", &threads);
RunPerformanceTest("Uncontended", &threads);
}

} // namespace tablet
