forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulated_hybrid_file_system.cc
152 lines (135 loc) · 5.44 KB
/
simulated_hybrid_file_system.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "tools/simulated_hybrid_file_system.h"
#include <algorithm>
#include <sstream>
#include <string>
#include "rocksdb/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
const int kLatencyAddedPerRequestUs = 15000;
const int64_t kRequestPerSec = 100;
const int64_t kDummyBytesPerRequest = 1024 * 1024;
// The metadata file format: each line is a full filename of a file which is
// warm
SimulatedHybridFileSystem::SimulatedHybridFileSystem(
const std::shared_ptr<FileSystem>& base,
const std::string& metadata_file_name)
: FileSystemWrapper(base),
// Limit to 100 requests per second.
rate_limiter_(NewGenericRateLimiter(
kDummyBytesPerRequest * kRequestPerSec /* rate_bytes_per_sec */,
1000 /* refill_period_us */)),
metadata_file_name_(metadata_file_name),
name_("SimulatedHybridFileSystem: " + std::string(target()->Name())) {
IOStatus s = base->FileExists(metadata_file_name, IOOptions(), nullptr);
if (s.IsNotFound()) {
return;
}
std::string metadata;
s = ReadFileToString(base.get(), metadata_file_name, &metadata);
if (!s.ok()) {
fprintf(stderr, "Error reading from file %s: %s",
metadata_file_name.c_str(), s.ToString().c_str());
// Exit rather than assert as this file system is built to run with
// benchmarks, which usually run on release mode.
std::exit(1);
}
std::istringstream input;
input.str(metadata);
std::string line;
while (std::getline(input, line)) {
fprintf(stderr, "Warm file %s\n", line.c_str());
warm_file_set_.insert(line);
}
}
// Need to write out the metadata file to file. See comment of
// SimulatedHybridFileSystem::SimulatedHybridFileSystem() for format of the
// file.
SimulatedHybridFileSystem::~SimulatedHybridFileSystem() {
std::string metadata;
for (const auto& f : warm_file_set_) {
metadata += f;
metadata += "\n";
}
IOStatus s = WriteStringToFile(target(), metadata, metadata_file_name_, true);
if (!s.ok()) {
fprintf(stderr, "Error writing to file %s: %s", metadata_file_name_.c_str(),
s.ToString().c_str());
}
}
IOStatus SimulatedHybridFileSystem::NewRandomAccessFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
Temperature temperature = Temperature::kUnknown;
{
const std::lock_guard<std::mutex> lock(mutex_);
if (warm_file_set_.find(fname) != warm_file_set_.end()) {
temperature = Temperature::kWarm;
}
}
IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
result->reset(
new SimulatedHybridRaf(result->release(), rate_limiter_, temperature));
return s;
}
IOStatus SimulatedHybridFileSystem::NewWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
if (file_opts.temperature == Temperature::kWarm) {
const std::lock_guard<std::mutex> lock(mutex_);
fprintf(stderr, "warm file %s\n", fname.c_str());
warm_file_set_.insert(fname);
}
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
IOStatus SimulatedHybridFileSystem::DeleteFile(const std::string& fname,
const IOOptions& options,
IODebugContext* dbg) {
{
const std::lock_guard<std::mutex> lock(mutex_);
warm_file_set_.erase(fname);
}
return target()->DeleteFile(fname, options, dbg);
}
IOStatus SimulatedHybridRaf::Read(uint64_t offset, size_t n,
const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) const {
if (temperature_ == Temperature::kWarm) {
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs);
RequestRateLimit(1);
}
return target()->Read(offset, n, options, result, scratch, dbg);
}
IOStatus SimulatedHybridRaf::MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
if (temperature_ == Temperature::kWarm) {
RequestRateLimit(static_cast<int64_t>(num_reqs));
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs *
static_cast<int>(num_reqs));
}
return target()->MultiRead(reqs, num_reqs, options, dbg);
}
IOStatus SimulatedHybridRaf::Prefetch(uint64_t offset, size_t n,
const IOOptions& options,
IODebugContext* dbg) {
if (temperature_ == Temperature::kWarm) {
RequestRateLimit(1);
Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs);
}
return target()->Prefetch(offset, n, options, dbg);
}
void SimulatedHybridRaf::RequestRateLimit(int64_t num_requests) const {
int64_t left = num_requests * kDummyBytesPerRequest;
const int64_t kMaxToRequest = kDummyBytesPerRequest / 100;
while (left > 0) {
int64_t to_request = std::min(kMaxToRequest, left);
rate_limiter_->Request(to_request, Env::IOPriority::IO_LOW, nullptr);
left -= to_request;
}
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE