forked from mysql/mysql-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwindow_iterators.h
262 lines (208 loc) · 11.3 KB
/
window_iterators.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
#ifndef SQL_ITERATORS_WINDOW_ITERATORS_H_
#define SQL_ITERATORS_WINDOW_ITERATORS_H_
/* Copyright (c) 2018, 2024, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "my_alloc.h"
#include "sql/iterators/row_iterator.h"
class JOIN;
class THD;
class Temp_table_param;
class Window;
/*
WindowIterator is similar to AggregateIterator, but deals with windowed
aggregates (i.e., OVER expressions). It deals specifically with aggregates
that don't need to buffer rows.
Window function execution is centered around temporary table materialization;
every window corresponds to exactly one materialization (although the
“materialization” can often be shortcut to streaming). For every window,
we must materialize/evaluate exactly the aggregates that belong to that
window, and no others (earlier ones are just copied from the temporary table
fields, later ones are ignored). Thus, create_tmp_table() has special logic
when materializing a temporary table for a window function; if the
Temp_table_param has m_window set (non-nullptr), we ignore all aggregates that
don't belong to that window. E.g., assume we have foo() OVER w1, bar() OVER
w2, baz() OVER w2, quux() OVER w3, the temporary tables and field lists will
look like:
Temp table | SELECT list
foo() bar() baz() |
before wnd: | foo() bar() baz()
window 1: value ----- ----- | temp_w1.foo bar() baz()
window 2: value value value | temp_w2.foo temp_w2.bar temp_w2.baz
In e.g. step 2, w2.foo is simply copied from w1.foo (through
temp_table_param->copy_fields), while w2.bar and w2.baz are evaluated
from bar() and baz() (through temp_table_param->copy_func).
WindowIterator only takes responsibility for resetting the window functions
on a window boundary; the rest is handled by correct input ordering (typically
through sorting) and delicate ordering of copy_funcs() calls.
(BufferingWindowIterator, below, has more intricate logic for feeding rows
into the window functions, and only stopping to output new rows whenever
process_buffered_windowing_record() signals it is time to do that -- but apart
from that, the separation of concerns is much the same.)
In particular, ordering of copies gets complicated when we have expressions
that depend on window functions, or even window functions from multiple
windows. Say we have something like foo() OVER w1 + bar() OVER w2.
split_sum_funcs() will have made slices for us so that we have separate items
for foo() and bar():
base slice window 1 output window 2 output
0: <ref1> + <ref2> + + temp_w2.+
1: foo() OVER w1 foo() temp_w1.foo temp_w2.foo
2: bar() OVER w2 bar() N/A temp_w2.bar
We first copy fields and non-WF-related functions into the output table,
from the previous slice (e.g., for window 2, we copy temp_w1.foo to
temp_w2.foo); these are always safe. Then, we copy/evaluate the window
functions themselves (#1 or #2, depending on which window we are evaluating).
Finally, we get to the composite item (#0); in order not to evaluate the
window functions anew, the references in the add expression must refer to the
temporary table fields that we just populated, so we need to be in the
_output_ slice. When buffering is active (BufferingWindowIterator), we have
more phases to deal with; it would be good to have this documented as well.
If we are outputting to a temporary table, we take over responsibility
for storing the fields from MaterializeIterator, which would otherwise do it.
*/
class WindowIterator final : public RowIterator {
public:
WindowIterator(THD *thd, unique_ptr_destroy_only<RowIterator> source,
Temp_table_param *temp_table_param, // Includes the window.
JOIN *join, int output_slice);
bool Init() override;
int Read() override;
void SetNullRowFlag(bool is_null_row) override {
m_source->SetNullRowFlag(is_null_row);
}
void StartPSIBatchMode() override { m_source->StartPSIBatchMode(); }
void EndPSIBatchModeIfStarted() override {
m_source->EndPSIBatchModeIfStarted();
}
void UnlockRow() override {
// There's nothing we can do here.
}
private:
/// The iterator we are reading from.
unique_ptr_destroy_only<RowIterator> const m_source;
/// Parameters for the temporary table we are outputting to.
Temp_table_param *m_temp_table_param;
/// The window function itself.
Window *m_window;
/// The join we are a part of.
JOIN *m_join;
/// The slice we will be using when reading rows.
int m_input_slice;
/// The slice we will be using when outputting rows.
int m_output_slice;
};
/**
BufferingWindowIterator is like WindowIterator, but deals with window
functions that need to buffer rows.
If we don't need to buffer rows to evaluate the window functions, execution
is simple; see WindowIterator for details. In that case, we can just evaluate
the window functions as we go here, similar to the non-windowing flow.
If we do need buffering, though, we buffer the row in Read(). Next, we enter a
loop calling process_buffered_windowing_record, and conditionally return
the row. That is, if process_buffered_windowing_record was able to complete
evaluation of a row (cf. output_row_ready), including its window functions
given how much has already been buffered, we return a row, else we read more
rows, and postpone evaluation and returning till we have enough rows in the
buffer.
When we have read a full partition (or reach EOF), we evaluate any remaining
rows. Note that since we have to read one row past the current partition to
detect that that previous row was indeed the last row in a partition, we
need to re-establish the first row of the next partition when we are done
processing the current one. This is because the record will be overwritten
(many times) during evaluation of window functions in the current partition.
Usually [1], for window execution we have two or three tmp tables per
windowing step involved (although not all are always materialized;
they may be just streaming through StreamingIterator):
- The input table, corresponding to the parent iterator. Holds (possibly
sorted) records ready for windowing, sorted on expressions concatenated from
any PARTITION BY and ORDER BY clauses.
- The output table, as given by temp_table_param: where we write the evaluated
records from this step. Note that we may optimize away this last write if
we have no final ORDER BY or DISTINCT.
- If we have buffering, the frame buffer, held by
Window::m_frame_buffer[_param].
[1] This is not always the case. For the first window, if we have no
PARTITION BY or ORDER BY in the window, and there is more than one table
in the join, the logical input can consist of more than one table
(e.g. a NestedLoopIterator).
The first thing we do in Read() is:
We copy fields from IN to OUT (copy_fields), and evaluate non-WF functions
(copy_funcs): those functions then read their arguments from IN and store
their result into their result_field which is a field in OUT.
Then, let's take SUM(A+FLOOR(B)) OVER (ROWS 2 FOLLOWING) as example. Above,
we have stored A and the result of FLOOR in OUT. Now we buffer (save) the row
from OUT into the FB: For that, we copy both field A and FLOOR's result_field
from OUT to FB; a single copy_fields() call handles both copy jobs. Then we
look at the rows we have buffered and may realize that we have enough of the
frame to calculate SUM for a certain row (not necessarily the one we just
buffered; might be an earlier row, in our example it is the row which is 2
rows above the buffered row). If we do, to calculate WFs, we bring back the
frame's rows; which is done by: first copying field A and FLOOR's result_field
back from FB to OUT, thus getting in OUT all that SUM needs (A and FLOOR),
then giving that OUT row to SUM (SUM will then add the row's value to its
total; that happens in copy_funcs). After we have done that on all rows of the
frame, we have the values of SUM ready in OUT, we also restore the row which
owns this SUM value, in the same way as we restored the frame's rows, and we
return from Read() - we're done for this row. However, on the next Read()
call, we loop to check if we can calculate one more row with the frame
we have, and if so, we do, until we can't calculate any more rows -- in which
case we're back to just buffering.
*/
class BufferingWindowIterator final : public RowIterator {
public:
BufferingWindowIterator(
THD *thd, unique_ptr_destroy_only<RowIterator> source,
Temp_table_param *temp_table_param, // Includes the window.
JOIN *join, int output_slice);
bool Init() override;
int Read() override;
void SetNullRowFlag(bool is_null_row) override {
m_source->SetNullRowFlag(is_null_row);
}
void StartPSIBatchMode() override { m_source->StartPSIBatchMode(); }
void EndPSIBatchModeIfStarted() override {
m_source->EndPSIBatchModeIfStarted();
}
void UnlockRow() override {
// There's nothing we can do here.
}
private:
int ReadBufferedRow(bool new_partition_or_eof);
/// The iterator we are reading from.
unique_ptr_destroy_only<RowIterator> const m_source;
/// Parameters for the temporary table we are outputting to.
Temp_table_param *m_temp_table_param;
/// The window function itself.
Window *m_window;
/// The join we are a part of.
JOIN *m_join;
/// The slice we will be using when reading rows.
int m_input_slice;
/// The slice we will be using when outputting rows.
int m_output_slice;
/// If true, we may have more buffered rows to process that need to be
/// checked for before reading more rows from the source.
bool m_possibly_buffered_rows;
/// Whether the last input row started a new partition, and was tucked away
/// to finalize the previous partition; if so, we need to bring it back
/// for processing before we read more rows.
bool m_last_input_row_started_new_partition;
/// Whether we have seen the last input row.
bool m_eof;
};
#endif // SQL_ITERATORS_WINDOW_ITERATORS_H_