38
38
#include " db/version_set.h"
39
39
#include " db/write_batch_internal.h"
40
40
#include " port/port.h"
41
+ #include " port/likely.h"
41
42
#include " rocksdb/compaction_filter.h"
42
43
#include " rocksdb/db.h"
43
44
#include " rocksdb/env.h"
@@ -270,6 +271,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
270
271
logfile_number_(0 ),
271
272
super_version_(nullptr ),
272
273
super_version_number_(0 ),
274
+ local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
273
275
tmp_batch_(),
274
276
bg_compaction_scheduled_(0 ),
275
277
bg_manual_only_(0 ),
@@ -288,7 +290,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
288
290
delayed_writes_(0 ),
289
291
storage_options_(options),
290
292
bg_work_gate_closed_(false ),
291
- refitting_level_(false ) {
293
+ refitting_level_(false ),
294
+ opened_successfully_(false ) {
292
295
mem_->Ref ();
293
296
env_->GetAbsolutePath (dbname, &db_absolute_path_);
294
297
@@ -319,19 +322,46 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
319
322
}
320
323
321
324
DBImpl::~DBImpl () {
322
- autovector<MemTable*> to_delete;
323
-
324
325
// Wait for background work to finish
325
326
if (flush_on_destroy_ && mem_->GetFirstSequenceNumber () != 0 ) {
326
327
FlushMemTable (FlushOptions ());
327
328
}
329
+
328
330
mutex_.Lock ();
329
331
shutting_down_.Release_Store (this ); // Any non-nullptr value is ok
330
332
while (bg_compaction_scheduled_ ||
331
333
bg_flush_scheduled_ ||
332
334
bg_logstats_scheduled_) {
333
335
bg_cv_.Wait ();
334
336
}
337
+ mutex_.Unlock ();
338
+
339
+ // Release SuperVersion reference kept in ThreadLocalPtr.
340
+ // This must be done outside of mutex_ since unref handler can lock mutex.
341
+ // It also needs to be done after FlushMemTable, which can trigger local_sv_
342
+ // access.
343
+ delete local_sv_;
344
+
345
+ mutex_.Lock ();
346
+ if (options_.allow_thread_local ) {
347
+ // Clean up obsolete files due to SuperVersion release.
348
+ // (1) Need to delete obsolete files before closing because RepairDB()
349
+ // scans all existing files in the file system and builds manifest file.
350
+ // Keeping obsolete files confuses the repair process.
351
+ // (2) Need to check if we Open()/Recover() the DB successfully before
352
+ // deleting because if VersionSet recover fails (may be due to corrupted
353
+ // manifest file), it is not able to identify live files correctly. As a
354
+ // result, all "live" files can get deleted by accident. However, corrupted
355
+ // manifest is recoverable by RepairDB().
356
+ if (opened_successfully_) {
357
+ DeletionState deletion_state;
358
+ FindObsoleteFiles (deletion_state, true );
359
+ // manifest number starting from 2
360
+ deletion_state.manifest_file_number = 1 ;
361
+ PurgeObsoleteFiles (deletion_state);
362
+ }
363
+ }
364
+
335
365
if (super_version_ != nullptr ) {
336
366
bool is_last_reference __attribute__ ((unused));
337
367
is_last_reference = super_version_->Unref ();
@@ -349,6 +379,7 @@ DBImpl::~DBImpl() {
349
379
delete mem_->Unref ();
350
380
}
351
381
382
+ autovector<MemTable*> to_delete;
352
383
imm_.current ()->Unref (&to_delete);
353
384
for (MemTable* m: to_delete) {
354
385
delete m;
@@ -1286,6 +1317,10 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
1286
1317
1287
1318
if (s.ok ()) {
1288
1319
InstallSuperVersion (deletion_state);
1320
+ // Reset SuperVersions cached in thread local storage
1321
+ if (options_.allow_thread_local ) {
1322
+ ResetThreadLocalSuperVersions (&deletion_state);
1323
+ }
1289
1324
if (madeProgress) {
1290
1325
*madeProgress = 1 ;
1291
1326
}
@@ -2811,26 +2846,21 @@ Status DBImpl::Get(const ReadOptions& options,
2811
2846
// DeletionState gets created and destructed outside of the lock -- we
2812
2847
// use this conveniently to:
2813
2848
// * malloc one SuperVersion() outside of the lock -- new_superversion
2814
- // * delete one SuperVersion() outside of the lock -- superversion_to_free
2849
+ // * delete SuperVersion()s outside of the lock -- superversions_to_free
2815
2850
//
2816
2851
// However, if InstallSuperVersion() gets called twice with the same
2817
2852
// deletion_state, we can't reuse the SuperVersion() that got malloced because
2818
2853
// first call already used it. In that rare case, we take a hit and create a
2819
- // new SuperVersion() inside of the mutex. We do similar thing
2820
- // for superversion_to_free
2854
+ // new SuperVersion() inside of the mutex.
2821
2855
// Convenience overload: installs a new SuperVersion, using the instance
// pre-allocated in deletion_state when it is still available, and hands the
// displaced SuperVersion back through deletion_state.
void DBImpl::InstallSuperVersion (DeletionState& deletion_state) {
2856
+ mutex_.AssertHeld ();
2822
2857
// if new_superversion == nullptr, it means somebody already used it
2823
2858
SuperVersion* new_superversion =
2824
2859
(deletion_state.new_superversion != nullptr ) ?
2825
2860
deletion_state.new_superversion : new SuperVersion ();
2826
2861
SuperVersion* old_superversion = InstallSuperVersion (new_superversion);
2827
2862
deletion_state.new_superversion = nullptr ;
2828
- if (deletion_state.superversion_to_free != nullptr ) {
2829
- // somebody already put it there
2830
- delete old_superversion;
2831
- } else {
2832
- deletion_state.superversion_to_free = old_superversion;
2833
- }
// NOTE(review): old_superversion is nullptr whenever the inner
// InstallSuperVersion() overload did not drop the last reference (it returns
// nullptr in that case), so nullptr entries can be pushed here. Confirm that
// the consumer of superversions_to_free tolerates them.
2863
+ deletion_state.superversions_to_free .push_back (old_superversion);
2834
2864
}
2835
2865
2836
2866
DBImpl::SuperVersion* DBImpl::InstallSuperVersion (
@@ -2839,14 +2869,31 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
2839
2869
new_superversion->Init (mem_, imm_.current (), versions_->current ());
2840
2870
SuperVersion* old_superversion = super_version_;
2841
2871
super_version_ = new_superversion;
2872
+ super_version_->db = this ;
2842
2873
++super_version_number_;
2874
+ super_version_->version_number = super_version_number_;
2875
+
2843
2876
if (old_superversion != nullptr && old_superversion->Unref ()) {
2844
2877
old_superversion->Cleanup ();
2845
2878
return old_superversion; // will let caller delete outside of mutex
2846
2879
}
2847
2880
return nullptr ;
2848
2881
}
2849
2882
// Drops the references held through the thread-local SuperVersion cache
// (local_sv_). Must be called with mutex_ held (asserted below). Any
// SuperVersion whose Unref() reports the last reference is cleaned up here and
// queued on deletion_state->superversions_to_free rather than deleted in place.
// NOTE(review): presumably Scrape() also clears each thread's slot so the
// same pointer is not unref'ed twice -- confirm against ThreadLocalPtr.
2883
+ void DBImpl::ResetThreadLocalSuperVersions (DeletionState* deletion_state) {
2884
+ mutex_.AssertHeld ();
2885
+ autovector<void *> sv_ptrs;
2886
+ local_sv_->Scrape (&sv_ptrs);
2887
+ for (auto ptr : sv_ptrs) {
2888
+ assert (ptr);
2889
+ auto sv = static_cast <SuperVersion*>(ptr);
// NOTE(review): `sv` already holds this cast result; the second static_cast
// below is redundant and `sv->Unref()` would read the same.
2890
+ if (static_cast <SuperVersion*>(ptr)->Unref ()) {
2891
+ sv->Cleanup ();
2892
+ deletion_state->superversions_to_free .push_back (sv);
2893
+ }
2894
+ }
2895
+ }
2896
+
2850
2897
Status DBImpl::GetImpl (const ReadOptions& options,
2851
2898
const Slice& key,
2852
2899
std::string* value,
@@ -2864,10 +2911,41 @@ Status DBImpl::GetImpl(const ReadOptions& options,
2864
2911
snapshot = versions_->LastSequence ();
2865
2912
}
2866
2913
2867
- // This can be replaced by using atomics and spinlock instead of big mutex
2868
- mutex_.Lock ();
2869
- SuperVersion* get_version = super_version_->Ref ();
2870
- mutex_.Unlock ();
2914
+ // Acquire SuperVersion
2915
+ SuperVersion* sv = nullptr ;
2916
+ if (LIKELY (options_.allow_thread_local )) {
2917
+ // The SuperVersion is cached in thread local storage to avoid acquiring
2918
+ // mutex when SuperVersion does not change since the last use. When a new
2919
+ // SuperVersion is installed, the compaction or flush thread cleans up
2920
+ // cached SuperVersion in all existing thread local storage. To avoid
2921
+ // acquiring mutex for this operation, we use atomic Swap() on the thread
2922
+ // local pointer to guarantee exclusive access. If the thread local pointer
2923
+ // is being used while a new SuperVersion is installed, the cached
2924
+ // SuperVersion can become stale. It will eventually get refreshed either
2925
+ // on the next GetImpl() call or next SuperVersion installation.
2926
+ sv = static_cast <SuperVersion*>(local_sv_->Swap (nullptr ));
2927
+ if (!sv || sv->version_number !=
2928
+ super_version_number_.load (std::memory_order_relaxed)) {
2929
+ RecordTick (options_.statistics .get (), NUMBER_SUPERVERSION_UPDATES);
2930
+ SuperVersion* sv_to_delete = nullptr ;
2931
+
2932
+ if (sv && sv->Unref ()) {
2933
+ mutex_.Lock ();
2934
+ sv->Cleanup ();
2935
+ sv_to_delete = sv;
2936
+ } else {
2937
+ mutex_.Lock ();
2938
+ }
2939
+ sv = super_version_->Ref ();
2940
+ mutex_.Unlock ();
2941
+
2942
+ delete sv_to_delete;
2943
+ }
2944
+ } else {
2945
+ mutex_.Lock ();
2946
+ sv = super_version_->Ref ();
2947
+ mutex_.Unlock ();
2948
+ }
2871
2949
2872
2950
bool have_stat_update = false ;
2873
2951
Version::GetStats stats;
@@ -2880,18 +2958,18 @@ Status DBImpl::GetImpl(const ReadOptions& options,
2880
2958
// merge_operands will contain the sequence of merges in the latter case.
2881
2959
LookupKey lkey (key, snapshot);
2882
2960
BumpPerfTime (&perf_context.get_snapshot_time , &snapshot_timer);
2883
- if (get_version ->mem ->Get (lkey, value, &s, merge_context, options_)) {
2961
+ if (sv ->mem ->Get (lkey, value, &s, merge_context, options_)) {
2884
2962
// Done
2885
2963
RecordTick (options_.statistics .get (), MEMTABLE_HIT);
2886
- } else if (get_version ->imm ->Get (lkey, value, &s, merge_context, options_)) {
2964
+ } else if (sv ->imm ->Get (lkey, value, &s, merge_context, options_)) {
2887
2965
// Done
2888
2966
RecordTick (options_.statistics .get (), MEMTABLE_HIT);
2889
2967
} else {
2890
2968
StopWatchNano from_files_timer (env_, false );
2891
2969
StartPerfTimer (&from_files_timer);
2892
2970
2893
- get_version ->current ->Get (options, lkey, value, &s, &merge_context, &stats,
2894
- options_, value_found);
2971
+ sv ->current ->Get (options, lkey, value, &s, &merge_context, &stats,
2972
+ options_, value_found);
2895
2973
have_stat_update = true ;
2896
2974
BumpPerfTime (&perf_context.get_from_output_files_time , &from_files_timer);
2897
2975
RecordTick (options_.statistics .get (), MEMTABLE_MISS);
@@ -2900,31 +2978,32 @@ Status DBImpl::GetImpl(const ReadOptions& options,
2900
2978
StopWatchNano post_process_timer (env_, false );
2901
2979
StartPerfTimer (&post_process_timer);
2902
2980
2903
- bool delete_get_version = false ;
2904
2981
if (!options_.disable_seek_compaction && have_stat_update) {
2905
2982
mutex_.Lock ();
2906
- if (get_version ->current ->UpdateStats (stats)) {
2983
+ if (sv ->current ->UpdateStats (stats)) {
2907
2984
MaybeScheduleFlushOrCompaction ();
2908
2985
}
2909
- if (get_version->Unref ()) {
2910
- get_version->Cleanup ();
2911
- delete_get_version = true ;
2912
- }
2913
2986
mutex_.Unlock ();
2987
+ }
2988
+
2989
+ // Release SuperVersion
2990
+ if (LIKELY (options_.allow_thread_local )) {
2991
+ // Put the SuperVersion back
2992
+ local_sv_->Reset (static_cast <void *>(sv));
2914
2993
} else {
2915
- if (get_version->Unref ()) {
2994
+ bool delete_sv = false ;
2995
+ if (sv->Unref ()) {
2916
2996
mutex_.Lock ();
2917
- get_version ->Cleanup ();
2997
+ sv ->Cleanup ();
2918
2998
mutex_.Unlock ();
2919
- delete_get_version = true ;
2999
+ delete_sv = true ;
3000
+ }
3001
+ if (delete_sv) {
3002
+ delete sv;
2920
3003
}
2921
- }
2922
- if (delete_get_version) {
2923
- delete get_version;
2924
3004
}
2925
3005
2926
3006
// Note, tickers are atomic now - no lock protection needed any more.
2927
-
2928
3007
RecordTick (options_.statistics .get (), NUMBER_KEYS_READ);
2929
3008
RecordTick (options_.statistics .get (), BYTES_READ, value->size ());
2930
3009
BumpPerfTime (&perf_context.get_post_process_time , &post_process_timer);
@@ -3772,6 +3851,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
3772
3851
impl->mutex_ .Unlock ();
3773
3852
3774
3853
if (s.ok ()) {
3854
+ impl->opened_successfully_ = true ;
3775
3855
*dbptr = impl;
3776
3856
} else {
3777
3857
delete impl;
0 commit comments