forked from AshBT/Dato-Core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgroupby_aggregate_impl.hpp
255 lines (203 loc) · 7.64 KB
/
groupby_aggregate_impl.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
/*
* Copyright (C) 2015 Dato, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef GRAPHLAB_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
#define GRAPHLAB_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP
#include <memory>
#include <vector>
#include <cstdint>
#include <functional>
#include <unordered_set>
#include <sframe/sframe.hpp>
#include <util/cityhash_gl.hpp>
#include <parallel/mutex.hpp>
#include <sframe/group_aggregate_value.hpp>
#include <graphlab/util/hopscotch_map.hpp>
namespace graphlab {
namespace groupby_aggregate_impl {
/**
* A description of a group operation.
*/
struct group_descriptor {
/// The column numbers to operate on.
/// NOTE(review): presumably indices into the row handed to
/// groupby_element::add_element -- confirm against the implementation.
std::vector<size_t> column_numbers;
/// The aggregator applied to those columns. Held through a shared_ptr, so
/// copies of a descriptor refer to the same aggregator object.
std::shared_ptr<group_aggregate_value> aggregator;
};
/**
* This class manages all the intermediate aggregation results of a given key.
* It contains a key, and an array of multiple aggregated values for that key
* (each aggregated value is for a different aggregator; for instance, one
* could be sum, one could be count).
* It then provides comparison, hashing, combination (operator+=) and
* serialization (save / load) of that state.
*/
struct groupby_element {
  /// The key of this aggregation
  std::vector<flexible_type> key;

  /**
   * All the aggregated values, one entry per group operation.
   * Declared mutable: add_element() is a const member, and mutable is what
   * lets a const member modify these values.
   * NOTE(review): presumably this exists so that elements only reachable
   * through const access (e.g. stored in a hashed set) can still be
   * aggregated into -- confirm against the implementation.
   */
  mutable std::vector<std::unique_ptr<group_aggregate_value> > values;

  /**
   * A cache of the hash of the key.
   * Zero-initialized so that a default-constructed element never carries an
   * indeterminate value here.
   * NOTE(review): presumably refreshed by compute_hash() -- confirm.
   */
  size_t hash_val = 0;

  /// Creates an element with an empty key, no aggregated values and a
  /// hash_val of 0.
  groupby_element() = default;

  /**
   * Constructs a group element from a key, and a description of all
   * the group operations. All the aggregated values will be initialized as
   * new empty values.
   */
  groupby_element(const std::vector<flexible_type>& group_key,
                  const std::vector<group_descriptor>& group_desc);

  /**
   * Constructs a groupby_element from a string which contains a
   * serialization of the element. The array of all the descriptors is
   * required.
   */
  groupby_element(const std::string& val,
                  const std::vector<group_descriptor>& group_desc);

  /**
   * (Re)initializes this element from a key, and a description of all
   * the group operations. All the aggregated values will be initialized as
   * new empty values.
   */
  void init(const std::vector<flexible_type>& group_key,
            const std::vector<group_descriptor>& group_desc);

  /// Writes the group result into an output archive
  void save(oarchive& oarc) const;

  /**
   * Loads the group result from an input archive and a group
   * operation descriptor
   */
  void load(iarchive& iarc,
            const std::vector<group_descriptor>& group_desc);

  /**
   * Useful function which compares equality of two vector<flexible_type>.
   * Returns true if they are identical, returns false otherwise.
   */
  static bool flexible_type_vector_equality(const std::vector<flexible_type>& a,
                                            const std::vector<flexible_type>& b);

  /**
   * Variant of the equality comparison above which additionally takes an
   * explicit length for each vector.
   * Returns true if they are identical, returns false otherwise.
   * NOTE(review): presumably only the first alen entries of a and the first
   * blen entries of b take part in the comparison -- confirm.
   */
  static bool flexible_type_vector_equality(const std::vector<flexible_type>& a,
                                            size_t alen,
                                            const std::vector<flexible_type>& b,
                                            size_t blen);

  /**
   * Defines an ordering among vector<flexible_type>. Returns true if
   * a is ordered before b and false otherwise.
   */
  static bool flexible_type_vector_lt(const std::vector<flexible_type>& a,
                                      const std::vector<flexible_type>& b);

  /**
   * Provides a total ordering on group-by elements
   */
  bool operator>(const groupby_element& other) const;

  /**
   * Provides a total ordering on group-by elements
   */
  bool operator<(const groupby_element& other) const;

  /**
   * Returns true if this, and other have identical keys
   */
  bool operator==(const groupby_element& other) const;

  /**
   * Combines values another groupby element which is performing the
   * same set of operations
   */
  void operator+=(const groupby_element& other);

  /**
   * Inserts a new table row to be aggregated.
   * This is a const member; the aggregated state it works on is the mutable
   * `values` member.
   */
  void add_element(const std::vector<flexible_type>& val,
                   const std::vector<group_descriptor>& group_desc) const;

  /// Computes a hash value for a key vector.
  static size_t hash_key(const std::vector<flexible_type>& key);

  /// Computes a hash value for a key vector, given an explicit key length.
  /// NOTE(review): presumably only the first keylen entries are hashed --
  /// confirm.
  static size_t hash_key(const std::vector<flexible_type>& key, size_t keylen);

  /// Returns the hash of this element; this is what the std::hash
  /// specialization for groupby_element calls.
  /// NOTE(review): presumably returns the cached hash_val -- confirm.
  size_t hash() const;

  /// NOTE(review): presumably recomputes hash_val from the current key --
  /// confirm against the implementation.
  void compute_hash();
};
} // namespace groupby_aggregate_impl
} // namespace graphlab
// we need to put the hash struct in std
namespace std {
/**
* Hash function.
*
* This allows us to add groupby_element to an std::unordered_set
*/
template <>
struct hash<graphlab::groupby_aggregate_impl::groupby_element> {
  /// Returns whatever groupby_element::hash() reports for the element.
  size_t operator()(
      const graphlab::groupby_aggregate_impl::groupby_element& element) const {
    const size_t key_hash = element.hash();
    return key_hash;
  }
};
} // namespace std
namespace graphlab {
namespace groupby_aggregate_impl {
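/**
* This container accumulates the rows passed to add(), aggregates them
* according to the group operations registered through define_group(), and
* writes the grouped result to an sframe through group_and_write().
*/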
class group_aggregate_container {
public:
/// Constructs a container from a buffer size limit and a segment count.
/// NOTE(review): presumably max_buffer_size bounds how much is buffered in
/// memory before a segment is flushed, and num_segments sizes the `segments`
/// vector -- confirm against the implementation.
group_aggregate_container(size_t max_buffer_size,
size_t num_segments);
/// Deleted copy constructor
group_aggregate_container(const group_aggregate_container& other) = delete;
/// Deleted assignment operator
group_aggregate_container&
operator=(const group_aggregate_container& other) = delete;
/**
* Adds a new group operation: the given aggregator applied to the given
* column numbers.
* NOTE(review): presumably appended to group_descriptors -- confirm.
*/
void define_group(std::vector<size_t> column_numbers,
std::shared_ptr<group_aggregate_value> aggregator);
/// Add a new element to the container.
/// NOTE(review): presumably the first num_keys entries of val form the
/// group key -- confirm against the implementation.
void add(const std::vector<flexible_type>& val,
size_t num_keys);
/// Sort all elements in the container and writes to the output.
void group_and_write(sframe& out);
private:
/// collection of all the group operations
std::vector<group_descriptor> group_descriptors;
/// Per-segment state; the container holds one of these per entry of
/// `segments`.
struct segment_information {
/// Locks on the elements structure
graphlab::simple_spinlock in_memory_group_lock;
/// A fixed array of 128 further spinlocks.
/// NOTE(review): presumably guards subsets of `elements` at a finer
/// granularity than in_memory_group_lock -- confirm.
graphlab::simple_spinlock fine_grain_locks[128];
/// An atomic counter.
/// NOTE(review): what it counts is not visible in this header -- confirm.
atomic<size_t> refctr;
/// Intermediate group values: maps a size_t to a pointer to a vector of
/// groupby_element.
/// NOTE(review): presumably keyed by the element hash; the raw pointer does
/// not express who owns and frees the vector -- confirm.
hopscotch_map<size_t, std::vector<groupby_element>* > elements;
/// Locks on the below structures
graphlab::mutex file_lock;
/// The temporary storage for the grouped values
sarray<std::string>::iterator outiter;
/// Storing the size of each sorted chunk.
std::vector<size_t> chunk_size;
};
/// Writes the content into the sarray segment backend.
void flush_segment(size_t segmentid);
/// Buffer size limit. NOTE(review): presumably the value passed to the
/// constructor -- confirm.
size_t max_buffer_size;
/// Per-segment state (see segment_information).
std::vector<segment_information> segments;
/// sarray of strings. NOTE(review): presumably the backing store that
/// flush_segment writes serialized elements to -- confirm.
sarray<std::string> intermediate_buffer;
/// Reader over an sarray of strings.
/// NOTE(review): this member is a unique_ptr while group_and_write_segment
/// takes a shared_ptr reader of the same type; confirm how the two relate.
std::unique_ptr<sarray<std::string>::reader_type> reader;
/// Sort all elements in the container and writes to the output.
/// Takes a reader and a segment id in addition to the output sframe.
/// NOTE(review): presumably handles only the segment named by segmentid --
/// confirm.
void group_and_write_segment(sframe& out,
std::shared_ptr<sarray<std::string>::reader_type> reader,
size_t segmentid);
};
} // namespace groupby_aggregate_impl
} // namespace graphlab
#endif // GRAPHLAB_SFRAME_GROUPBY_AGGREGATE_IMPL_HPP