Skip to content

Commit

Permalink
Revert "Better port::Mutex::AssertHeld() and AssertNotHeld()"
Browse files Browse the repository at this point in the history
This reverts commit ddafceb.
  • Loading branch information
igorcanadi committed Apr 23, 2014
1 parent ddafceb commit 1068d2f
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 47 deletions.
2 changes: 1 addition & 1 deletion db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class DBTest {
options.db_log_dir = test::TmpDir();
break;
case kWalDir:
options.wal_dir = test::TmpDir() + "/wal_dir";
options.wal_dir = "/tmp/wal";
break;
case kManifestFileSize:
options.max_manifest_file_size = 50; // 50 bytes
Expand Down
21 changes: 5 additions & 16 deletions port/port_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@

#include "port/port_posix.h"

#include <memory>
#include <cstdlib>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include "util/logging.h"
#include "util/thread_local.h"

namespace rocksdb {
namespace port {
Expand All @@ -28,9 +26,6 @@ static void PthreadCall(const char* label, int result) {
}

Mutex::Mutex(bool adaptive) {
#ifndef NDEBUG
locked_.reset(new ThreadLocalPtr());
#endif
#ifdef OS_LINUX
if (!adaptive) {
PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL));
Expand All @@ -54,26 +49,20 @@ Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); }
void Mutex::Lock() {
PthreadCall("lock", pthread_mutex_lock(&mu_));
#ifndef NDEBUG
locked_->Reset(this);
locked_ = true;
#endif
}

void Mutex::Unlock() {
#ifndef NDEBUG
locked_->Reset(nullptr);
locked_ = false;
#endif
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

void Mutex::AssertHeld() {
#ifndef NDEBUG
assert(locked_->Get() == this);
#endif
}

void Mutex::AssertNotHeld() {
#ifndef NDEBUG
assert(locked_->Get() == nullptr);
assert(locked_);
#endif
}

Expand All @@ -86,11 +75,11 @@ CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }

void CondVar::Wait() {
#ifndef NDEBUG
mu_->locked_->Reset(nullptr);
mu_->locked_ = false;
#endif
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
#ifndef NDEBUG
mu_->locked_->Reset(mu_);
mu_->locked_ = true;
#endif
}

Expand Down
19 changes: 7 additions & 12 deletions port/port_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
//
// See port_example.h for documentation for the following types/functions.

#pragma once
#ifndef STORAGE_LEVELDB_PORT_PORT_POSIX_H_
#define STORAGE_LEVELDB_PORT_PORT_POSIX_H_

#undef PLATFORM_IS_LITTLE_ENDIAN
#if defined(OS_MACOSX)
Expand Down Expand Up @@ -50,7 +51,6 @@
#include <lz4hc.h>
#endif

#include <memory>
#include <stdint.h>
#include <string>
#include <string.h>
Expand Down Expand Up @@ -83,36 +83,29 @@
#endif

namespace rocksdb {

class ThreadLocalPtr;

namespace port {

static const bool kLittleEndian = PLATFORM_IS_LITTLE_ENDIAN;
#undef PLATFORM_IS_LITTLE_ENDIAN

class CondVar;

// DO NOT declare this Mutex as static ever. Inside it depends on ThreadLocalPtr
// and its Lock() and Unlock() function depend on ThreadLocalPtr::StaticMeta,
// which is also declared static. We can't really control static
// deinitialization order.
class Mutex {
public:
/* implicit */ Mutex(bool adaptive = false);
~Mutex();

void Lock();
void Unlock();

// this will assert if the mutex is not locked
// it does NOT verify that mutex is held by a calling thread
void AssertHeld();
void AssertNotHeld();

private:
friend class CondVar;
pthread_mutex_t mu_;
#ifndef NDEBUG
std::unique_ptr<ThreadLocalPtr> locked_;
bool locked_;
#endif

// No copying
Expand Down Expand Up @@ -487,3 +480,5 @@ inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,

} // namespace port
} // namespace rocksdb

#endif // STORAGE_LEVELDB_PORT_PORT_POSIX_H_
33 changes: 17 additions & 16 deletions util/thread_local.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,21 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/thread_local.h"

#include <mutex>

#include "util/mutexlock.h"
#include "port/likely.h"


namespace rocksdb {

std::unique_ptr<ThreadLocalPtr::StaticMeta> ThreadLocalPtr::StaticMeta::inst_;
std::mutex ThreadLocalPtr::StaticMeta::mutex_;
port::Mutex ThreadLocalPtr::StaticMeta::mutex_;
#if !defined(OS_MACOSX)
__thread ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr;
#endif

ThreadLocalPtr::StaticMeta* ThreadLocalPtr::StaticMeta::Instance() {
if (UNLIKELY(inst_ == nullptr)) {
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
if (inst_ == nullptr) {
inst_.reset(new StaticMeta());
}
Expand All @@ -39,7 +37,7 @@ void ThreadLocalPtr::StaticMeta::OnThreadExit(void* ptr) {
auto* inst = Instance();
pthread_setspecific(inst->pthread_key_, nullptr);

std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
inst->RemoveThreadData(tls);
// Unref stored pointers of current thread from all instances
uint32_t id = 0;
Expand All @@ -66,6 +64,7 @@ ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0) {
}

void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) {
mutex_.AssertHeld();
d->next = &head_;
d->prev = head_.prev;
head_.prev->next = d;
Expand All @@ -74,6 +73,7 @@ void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) {

void ThreadLocalPtr::StaticMeta::RemoveThreadData(
ThreadLocalPtr::ThreadData* d) {
mutex_.AssertHeld();
d->next->prev = d->prev;
d->prev->next = d->next;
d->next = d->prev = d;
Expand All @@ -93,14 +93,14 @@ ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() {
{
// Register it in the global chain, needs to be done before thread exit
// handler registration
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
inst->AddThreadData(tls_);
}
// Even it is not OS_MACOSX, need to register value for pthread_key_ so that
// its exit handler will be triggered.
if (pthread_setspecific(inst->pthread_key_, tls_) != 0) {
{
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
inst->RemoveThreadData(tls_);
}
delete tls_;
Expand All @@ -122,7 +122,7 @@ void ThreadLocalPtr::StaticMeta::Reset(uint32_t id, void* ptr) {
auto* tls = GetThreadLocal();
if (UNLIKELY(id >= tls->entries.size())) {
// Need mutex to protect entries access within ReclaimId
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
tls->entries.resize(id + 1);
}
tls->entries[id].ptr.store(ptr, std::memory_order_relaxed);
Expand All @@ -132,7 +132,7 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) {
auto* tls = GetThreadLocal();
if (UNLIKELY(id >= tls->entries.size())) {
// Need mutex to protect entries access within ReclaimId
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
tls->entries.resize(id + 1);
}
return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed);
Expand All @@ -143,7 +143,7 @@ bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,
auto* tls = GetThreadLocal();
if (UNLIKELY(id >= tls->entries.size())) {
// Need mutex to protect entries access within ReclaimId
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
tls->entries.resize(id + 1);
}
return tls->entries[id].ptr.compare_exchange_strong(expected, ptr,
Expand All @@ -152,7 +152,7 @@ bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr,

void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,
void* const replacement) {
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
for (ThreadData* t = head_.next; t != &head_; t = t->next) {
if (id < t->entries.size()) {
void* ptr =
Expand All @@ -165,11 +165,12 @@ void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector<void*>* ptrs,
}

void ThreadLocalPtr::StaticMeta::SetHandler(uint32_t id, UnrefHandler handler) {
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
handler_map_[id] = handler;
}

UnrefHandler ThreadLocalPtr::StaticMeta::GetHandler(uint32_t id) {
mutex_.AssertHeld();
auto iter = handler_map_.find(id);
if (iter == handler_map_.end()) {
return nullptr;
Expand All @@ -178,7 +179,7 @@ UnrefHandler ThreadLocalPtr::StaticMeta::GetHandler(uint32_t id) {
}

uint32_t ThreadLocalPtr::StaticMeta::GetId() {
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
if (free_instance_ids_.empty()) {
return next_instance_id_++;
}
Expand All @@ -189,7 +190,7 @@ uint32_t ThreadLocalPtr::StaticMeta::GetId() {
}

uint32_t ThreadLocalPtr::StaticMeta::PeekId() const {
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
if (!free_instance_ids_.empty()) {
return free_instance_ids_.back();
}
Expand All @@ -199,7 +200,7 @@ uint32_t ThreadLocalPtr::StaticMeta::PeekId() const {
void ThreadLocalPtr::StaticMeta::ReclaimId(uint32_t id) {
// This id is not used, go through all thread local data and release
// corresponding value
std::lock_guard<std::mutex> l(mutex_);
MutexLock l(&mutex_);
auto unref = GetHandler(id);
for (ThreadData* t = head_.next; t != &head_; t = t->next) {
if (id < t->entries.size()) {
Expand Down
5 changes: 3 additions & 2 deletions util/thread_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
#pragma once

#include <atomic>
#include <mutex>
#include <memory>
#include <unordered_map>
#include <vector>

#include "util/autovector.h"
#include "port/port_posix.h"
#include "util/thread_local.h"

namespace rocksdb {

Expand Down Expand Up @@ -152,7 +153,7 @@ class ThreadLocalPtr {

// protect inst, next_instance_id_, free_instance_ids_, head_,
// ThreadData.entries
static std::mutex mutex_;
static port::Mutex mutex_;
#if !defined(OS_MACOSX)
// Thread local storage
static __thread ThreadData* tls_;
Expand Down

0 comments on commit 1068d2f

Please sign in to comment.