forked from apple/turicreate
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsframe_reader_buffer.hpp
160 lines (135 loc) · 4.3 KB
/
sframe_reader_buffer.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
/* Copyright © 2017 Apple Inc. All rights reserved.
*
* Use of this source code is governed by a BSD-3-clause license that can
* be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause
*/
#ifndef TURI_SFRAME_READER_BUFFER
#define TURI_SFRAME_READER_BUFFER
#include <algorithm>
#include <memory>
#include <vector>
#include <flexible_type/flexible_type.hpp>
#include <sframe/sframe.hpp>
#include <sframe/sframe_constants.hpp>
namespace turi {
class sframe;
/**
* \ingroup sframe_physical
* \addtogroup sframe_main Main SFrame Objects
* \{
*/
/**
* A buffered reader reading from a range of an sframe<T>.
*
* \code
* sframe<flexible_type> mysframe = ...;
*
* // Reader for the first thousand lines
* sframe_reader_buffer<flexible_type> reader(mysframe, 0, 1000);
*
* while(reader.has_next()) {
* flexible_type val = reader.next();
* ... do some thing with val ...
* }
*
* // Reader for the entire sframe
* reader = sframe_reader_buffer<flexible_type>(mysframe, 0, (size_t)(-1));
* ...
* \endcode
*
* Internally, the reader maintains a vector as buffer, and when reading
* reaches the end of the buffer, refill the buffer by reading from sframe.
*/
class sframe_reader_buffer {
public:
typedef sframe_rows::row value_type;
sframe_reader_buffer() = default;
/// Construct from sframe reader with begin and end row.
sframe_reader_buffer(
std::shared_ptr<typename sframe::reader_type> reader,
size_t row_start, size_t row_end,
size_t buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
init(reader, row_start, row_end, buffer_size);
}
void init(std::shared_ptr<typename sframe::reader_type>& reader,
size_t row_start, size_t row_end,
size_t internal_buffer_size = DEFAULT_SARRAY_READER_BUFFER_SIZE) {
m_reader = reader;
m_buffer_pos = 0;
m_iter = row_start;
m_original_row_start = row_start;
m_row_start = row_start;
m_row_end = std::min(row_end, m_reader->size());
m_buffer_size = internal_buffer_size;
m_buffer.clear();
}
/// Return the next element in the reader.
const sframe_rows::row& next();
/// Returns the current element
const sframe_rows::row& current();
/// Return true if the reader has more element.
bool has_next();
/// Return the buffer.
inline sframe_rows& get_buffer() {return m_buffer;}
/// Return the Number of elements between row_start and row_end.
inline size_t size() {return m_row_end - m_original_row_start;}
/** Resets the buffer to the initial starting conditions. Reading
* from the buffer again will start from row_start.
*/
void clear();
private:
/// Refill the chunk buffer form the sframe reader.
void refill();
typedef sframe::reader_type reader_type;
/// Buffer the prefetched elements.
sframe_rows m_buffer;
/// Current value
sframe_rows::row m_current;
/// The underlying reader as a data source.
std::shared_ptr<reader_type> m_reader;
/// Current position of the buffer reader.
size_t m_buffer_pos = 0;
/// The initial starting point. clear() will reset row_start to here.
size_t m_original_row_start = 0;
/// Start row of the remaining chunk.
size_t m_row_start = 0;
/// End row of the chunk.
size_t m_row_end = 0;
/// The size of the buffer vector
size_t m_buffer_size = 0;
/// The current iterator location
size_t m_iter = 0;
};
/// \}
//
/// Return the next row of the range [row_start, row_end).
///
/// Rows are served from the internal buffer. Once every buffered row has
/// been handed out, the buffer is reloaded via refill(). The returned
/// reference points at the member m_current, which the following call
/// overwrites. Must only be called while has_next() is true; this is
/// enforced only by DASSERT_LT.
inline const sframe_rows::row& sframe_reader_buffer::next() {
  const bool buffer_drained = (m_buffer_pos == m_buffer.num_rows());
  if (buffer_drained) {
    refill();
    m_buffer_pos = 0;
  }
  DASSERT_LT(m_buffer_pos, m_buffer.num_rows());

  const size_t slot = m_buffer_pos;
  ++m_buffer_pos;
  ++m_iter;
  m_current.copy_reference(m_buffer[slot]);
  return m_current;
}
/// Return the row most recently stored by next(). It is not reset by
/// init() or clear(), so it may be stale after either call.
inline const sframe_rows::row& sframe_reader_buffer::current() {
  const sframe_rows::row& latest = m_current;
  return latest;
}
/// Return true while the read position has not yet reached the end row,
/// i.e. while another call to next() is valid.
inline bool sframe_reader_buffer::has_next() {
  const bool exhausted = (m_iter >= m_row_end);
  return !exhausted;
}
/// Load the next batch of rows from the sframe reader into m_buffer.
///
/// The batch is [m_row_start, m_row_start + k), where k is the smaller of
/// the rows remaining before m_row_end and m_buffer_size. m_row_start is
/// then advanced past the batch. Resetting m_buffer_pos is left to the
/// caller.
inline void sframe_reader_buffer::refill() {
  const size_t rows_remaining = m_row_end - m_row_start;
  const size_t batch =
      (rows_remaining < m_buffer_size) ? rows_remaining : m_buffer_size;
  const size_t batch_end = m_row_start + batch;
  m_reader->read_rows(m_row_start, batch_end, m_buffer);
  m_row_start = batch_end;
}
/// Rewind the reader to the row_start given to init(). Discards the
/// buffered rows and resets both the load position and the read position.
inline void sframe_reader_buffer::clear() {
  m_buffer_pos = 0;
  m_iter = m_original_row_start;
  m_row_start = m_original_row_start;
  m_buffer.clear();
}
}
#endif