forked from facebook/rocksdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_reader.h
188 lines (153 loc) · 6.45 KB
/
log_reader.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
#include <stdint.h>
#include "db/log_format.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
namespace rocksdb {
class SequentialFileReader;
class Logger;
namespace log {
/**
* Reader is a general purpose log stream reader implementation. The actual job
* of reading from the device is implemented by the SequentialFile interface.
*
* Please see Writer for details on the file and record layout.
*/
class Reader {
public:
// Interface for reporting errors.
class Reporter {
public:
virtual ~Reporter();
// Some corruption was detected. "size" is the approximate number
// of bytes dropped due to the corruption.
virtual void Corruption(size_t bytes, const Status& status) = 0;
};
// Create a reader that will return log records from "*file".
// "*file" must remain live while this Reader is in use.
//
// If "reporter" is non-nullptr, it is notified whenever some data is
// dropped due to a detected corruption. "*reporter" must remain
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
bool checksum, uint64_t log_num);
virtual ~Reader();
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
virtual bool ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord.
//
// Undefined before the first call to ReadRecord.
uint64_t LastRecordOffset();
// returns true if the reader has encountered an eof condition.
bool IsEOF() {
return eof_;
}
// returns true if the reader has encountered read error.
bool hasReadError() const { return read_error_; }
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
virtual void UnmarkEOF();
SequentialFileReader* file() { return file_.get(); }
Reporter* GetReporter() const { return reporter_; }
uint64_t GetLogNumber() const { return log_number_; }
protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
// Internal state variables used for reading records
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
// Offset of the file position indicator within the last block when an
// EOF was detected.
size_t eof_offset_;
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
uint64_t end_of_buffer_offset_;
// which log number this is
uint64_t const log_number_;
// Whether this is a recycled log file
bool recycled_;
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
// Returned whenever we find an invalid physical record.
// Currently there are three situations in which this happens:
// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
// * The record is a 0-length record (No drop is reported)
kBadRecord = kMaxRecordType + 2,
// Returned when we fail to read a valid header.
kBadHeader = kMaxRecordType + 3,
// Returned when we read an old record from a previous user of the log.
kOldRecord = kMaxRecordType + 4,
// Returned when we get a bad record length
kBadRecordLen = kMaxRecordType + 5,
// Returned when we get a bad record checksum
kBadRecordChecksum = kMaxRecordType + 6,
};
// Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
// Read some more
bool ReadMore(size_t* drop_size, int *error);
void UnmarkEOFInternal();
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
private:
// No copying allowed
Reader(const Reader&);
void operator=(const Reader&);
};
class FragmentBufferedReader : public Reader {
public:
FragmentBufferedReader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num)
: Reader(info_log, std::move(_file), reporter, checksum, log_num),
fragments_(),
in_fragmented_record_(false) {}
~FragmentBufferedReader() override {}
bool ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords) override;
void UnmarkEOF() override;
private:
std::string fragments_;
bool in_fragmented_record_;
bool TryReadFragment(Slice* result, size_t* drop_size,
unsigned int* fragment_type_or_err);
bool TryReadMore(size_t* drop_size, int* error);
// No copy allowed
FragmentBufferedReader(const FragmentBufferedReader&);
void operator=(const FragmentBufferedReader&);
};
} // namespace log
} // namespace rocksdb