forked from fastio/1store
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_reader.hh
122 lines (116 loc) · 4.03 KB
/
file_reader.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#pragma once
#include <vector>
#include <typeinfo>
#include <limits>
#include "core/future.hh"
#include "core/future-util.hh"
#include "core/sstring.hh"
#include "core/fstream.hh"
#include "core/shared_ptr.hh"
#include "core/do_with.hh"
#include "core/thread.hh"
#include "core/align.hh"
#include "core/file.hh"
#include "core/sstring.hh"
#include "core/seastar.hh"
#include "core/shared_future.hh"
#include "core/byteorder.hh"
#include "core/gate.hh"
#include "utils/disk-error-handler.hh"
#include "utils/bytes.hh"
#include "store/checked-file-impl.hh"
#include <iterator>
#include "seastarx.hh"
#include "exceptions/exceptions.hh"
namespace store {
// Declaration only; the definition is not part of this header.
// NOTE(review): presumably opens `name` with `flags` and reports I/O failures
// through `error_handler` -- confirm against the definition.
extern future<file> make_file(const io_error_handler& error_handler, sstring name, open_flags flags);
/// Base class for a seekable reader layered on top of an input_stream<char>.
///
/// Subclasses implement open_at() to produce a stream positioned at a given
/// byte offset. No stream exists until seek() has been called at least once;
/// read_exactly() and eof() dereference the current stream unconditionally,
/// so callers must seek() before using them.
class random_access_reader {
    // Stream currently being read; null until the first seek().
    std::unique_ptr<input_stream<char>> _in;
    // Entered by every background close started in seek(); close() waits on it.
    seastar::gate _close_gate;
protected:
    /// Returns a new input stream that starts reading at byte offset `pos`.
    virtual input_stream<char> open_at(uint64_t pos) = 0;
public:
    /// Forwards to read_exactly(n) on the current stream.
    future<temporary_buffer<char>> read_exactly(size_t n) {
        return _in->read_exactly(n);
    }

    /// Replaces the current stream with a fresh one opened at `pos`.
    ///
    /// The previous stream (if any) is closed in the background under
    /// _close_gate; this function does not wait for that close to finish.
    void seek(uint64_t pos) {
        if (_in) {
            // The returned future is deliberately not awaited: completion is
            // observed through _close_gate in close(). The inner capture keeps
            // the old stream object alive until its close() has resolved.
            (void)seastar::with_gate(_close_gate, [in = std::move(_in)] () mutable {
                auto fut = in->close();
                return fut.then([in = std::move(in)] {});
            });
        }
        _in = std::make_unique<input_stream<char>>(open_at(pos));
    }

    /// Forwards to eof() on the current stream.
    bool eof() { return _in->eof(); }

    /// Waits for all background closes started by seek(), then closes the
    /// current stream. If seek() was never called there is no current stream
    /// and the returned future resolves once the gate is closed.
    virtual future<> close() {
        return _close_gate.close().then([this] {
            if (!_in) {
                // Nothing was ever opened; avoid dereferencing a null stream.
                return make_ready_future<>();
            }
            return _in->close();
        });
    }

    virtual ~random_access_reader() { }
};
/// random_access_reader implementation that reads from a `file`.
///
/// The constructor immediately seeks to offset 0, so the reader is usable
/// right after construction.
class file_random_access_reader : public random_access_reader {
    file _file;
    // Total size of the file in bytes, as supplied by the caller.
    uint64_t _file_size;
    // Forwarded to file_input_stream_options::buffer_size.
    size_t _buffer_size;
    // Forwarded to file_input_stream_options::read_ahead.
    unsigned _read_ahead;
public:
    /// Opens an input stream over the byte range [pos, _file_size).
    ///
    /// If `pos` lies at or beyond _file_size the requested length is 0;
    /// without the clamp the unsigned subtraction would wrap around to a
    /// huge length.
    virtual input_stream<char> open_at(uint64_t pos) override {
        const uint64_t len = pos < _file_size ? _file_size - pos : 0;
        file_input_stream_options options;
        options.buffer_size = _buffer_size;
        options.read_ahead = _read_ahead;
        return make_file_input_stream(_file, pos, len, std::move(options));
    }

    /// Takes ownership of `f` and positions the reader at offset 0.
    ///
    /// `file_size`   size of `f` in bytes.
    /// `buffer_size` stream buffer size passed to the input stream options.
    /// `read_ahead`  read-ahead setting passed to the input stream options.
    explicit file_random_access_reader(file f, uint64_t file_size, size_t buffer_size = 8192, unsigned read_ahead = 4)
        : _file(std::move(f)), _file_size(file_size), _buffer_size(buffer_size), _read_ahead(read_ahead)
    {
        seek(0);
    }

    /// Closes the stream via the base class and then, whether or not that
    /// succeeded, closes the underlying file. A failure while closing the
    /// file is routed to general_disk_error().
    virtual future<> close() override {
        return random_access_reader::close().finally([this] {
            // `save` holds a copy of the file handle inside the handler so the
            // handle stays referenced while the close future is pending.
            return _file.close().handle_exception([save = _file] (auto ep) {
                general_disk_error();
            });
        });
    }
};
/// Options accepted by read_file().
///
/// Holds a reference to an io_priority_class, so the referenced object must
/// outlive every read_file_options that refers to it. Because of the
/// reference member the type is copy/move constructible but not assignable.
struct read_file_options {
    // Stream buffer size in bytes.
    size_t _buffer_size = 8192;
    // Read-ahead setting for the input stream.
    unsigned _read_ahead = 4;
    // Priority class supplied at construction; stored by reference.
    const io_priority_class& _io_priority_class;

    // Intentionally non-explicit: allows passing an io_priority_class where
    // read_file_options is expected.
    read_file_options(const io_priority_class& pc) : _io_priority_class(pc) {}

    // All members are trivially copyable (two scalars and a reference), so the
    // compiler-generated operations do exactly what a hand-written member-wise
    // move would. Declaring both keeps the type copyable as well as movable.
    read_file_options(const read_file_options&) = default;
    read_file_options(read_file_options&&) = default;
};
/// Opens `filename` read-only, decodes its contents into `component` with
/// decode_from(), and closes the reader afterwards.
///
/// `filename`  path of the file to read; copied, so it need not outlive the call.
/// `component` destination object; captured by reference, so it must stay
///             alive until the returned future resolves.
/// `handler`   I/O error handler passed to make_checked_file(); captured by
///             reference, same lifetime requirement as `component`.
/// `opt`       buffer size and read-ahead used for the reader.
///             NOTE(review): opt._io_priority_class is not forwarded because
///             file_random_access_reader's constructor has no parameter for it.
///
/// Returns a future that resolves when decoding and closing have finished.
/// A std::system_error carrying ENOENT is translated into
/// redis::io_exception("<filename>: file not found"); any other exception is
/// propagated unchanged.
template <typename T>
future<> read_file(const sstring& filename, T& component, const io_error_handler& handler, read_file_options&& opt) {
    auto file_path = filename;
    // The outer lambda is `mutable` so that `opt` can really be moved into the
    // inner continuation; in a non-mutable lambda the captured `opt` is const
    // and std::move(opt) would require a copy.
    return open_file_dma(file_path, open_flags::ro).then([&component, opt = std::move(opt), &handler] (file file_) mutable {
        // size() has to be started before file_ is moved into the continuation.
        auto fut = file_.size();
        return fut.then([&component, file_ = std::move(file_), opt = std::move(opt), &handler] (uint64_t size) {
            auto f = make_checked_file(handler, file_);
            // Forward both tunables so a caller-supplied read-ahead is honoured.
            auto r = make_lw_shared<file_random_access_reader>(std::move(f), size, opt._buffer_size, opt._read_ahead);
            auto fut = decode_from(*r, component);
            // Always close the reader; the trailing then() holds another
            // reference to `r` so it outlives the pending close().
            return fut.finally([r] {
                return r->close();
            }).then([r] {});
        });
    }).then_wrapped([file_path] (future<> f) {
        try {
            f.get();
        } catch (const std::system_error& e) {
            if (e.code() == std::error_code(ENOENT, std::system_category())) {
                throw redis::io_exception(file_path + ": file not found");
            }
            throw;
        }
        return make_ready_future<>();
    });
}
}