forked from googleapis/google-cloud-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery.go
522 lines (474 loc) · 18 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bigquery
import (
"context"
"errors"
"fmt"
"time"
"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/uid"
bq "google.golang.org/api/bigquery/v2"
)
// QueryConfig holds the configuration for a query job.
type QueryConfig struct {
// Dst is the table into which the results of the query will be written.
// If this field is nil, a temporary table will be created.
Dst *Table
// The query to execute. See https://cloud.google.com/bigquery/query-reference for details.
Q string
// DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query.
// If DefaultProjectID is set, DefaultDatasetID must also be set.
DefaultProjectID string
DefaultDatasetID string
// TableDefinitions describes data sources outside of BigQuery.
// The map keys may be used as table names in the query string.
//
// When a QueryConfig is returned from Job.Config, the map values
// are always of type *ExternalDataConfig.
TableDefinitions map[string]ExternalData
// CreateDisposition specifies the circumstances under which the destination table will be created.
// The default is CreateIfNeeded.
CreateDisposition TableCreateDisposition
// WriteDisposition specifies how existing data in the destination table is treated.
// The default is WriteEmpty.
WriteDisposition TableWriteDisposition
// DisableQueryCache prevents results being fetched from the query cache.
// If this field is false, results are fetched from the cache if they are available.
// The query cache is a best-effort cache that is flushed whenever tables in the query are modified.
// Cached results are only available when TableID is unspecified in the query's destination Table.
// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching
DisableQueryCache bool
// DisableFlattenedResults prevents results being flattened.
// If this field is false, results from nested and repeated fields are flattened.
// DisableFlattenedResults implies AllowLargeResults
// For more information, see https://cloud.google.com/bigquery/docs/data#nested
DisableFlattenedResults bool
// AllowLargeResults allows the query to produce arbitrarily large result tables.
// The destination must be a table.
// When using this option, queries will take longer to execute, even if the result set is small.
// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults
AllowLargeResults bool
// Priority specifies the priority with which to schedule the query.
// The default priority is InteractivePriority.
// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries
Priority QueryPriority
// MaxBillingTier sets the maximum billing tier for a Query.
// Queries that have resource usage beyond this tier will fail (without
// incurring a charge). If this field is zero, the project default will be used.
MaxBillingTier int
// MaxBytesBilled limits the number of bytes billed for
// this job. Queries that would exceed this limit will fail (without incurring
// a charge).
// If this field is less than 1, the project default will be
// used.
MaxBytesBilled int64
// UseStandardSQL causes the query to use standard SQL. The default.
// Deprecated: use UseLegacySQL.
UseStandardSQL bool
// UseLegacySQL causes the query to use legacy SQL.
UseLegacySQL bool
// Parameters is a list of query parameters. The presence of parameters
// implies the use of standard SQL.
// If the query uses positional syntax ("?"), then no parameter may have a name.
// If the query uses named syntax ("@p"), then all parameters must have names.
// It is illegal to mix positional and named syntax.
Parameters []QueryParameter
// TimePartitioning specifies time-based partitioning
// for the destination table.
TimePartitioning *TimePartitioning
// RangePartitioning specifies integer range-based partitioning
// for the destination table.
RangePartitioning *RangePartitioning
// Clustering specifies the data clustering configuration for the destination table.
Clustering *Clustering
// The labels associated with this job.
Labels map[string]string
// If true, don't actually run this job. A valid query will return a mostly
// empty response with some processing statistics, while an invalid query will
// return the same error it would if it wasn't a dry run.
//
// Query.Read will fail with dry-run queries. Call Query.Run instead, and then
// call LastStatus on the returned job to get statistics. Calling Status on a
// dry-run job will fail.
DryRun bool
// Custom encryption configuration (e.g., Cloud KMS keys).
DestinationEncryptionConfig *EncryptionConfig
// Allows the schema of the destination table to be updated as a side effect of
// the query job.
SchemaUpdateOptions []string
// CreateSession will trigger creation of a new session when true.
CreateSession bool
// ConnectionProperties are optional key-values settings.
ConnectionProperties []*ConnectionProperty
// Sets a best-effort deadline on a specific job. If job execution exceeds this
// timeout, BigQuery may attempt to cancel this work automatically.
//
// This deadline cannot be adjusted or removed once the job is created. Consider
// using Job.Cancel in situations where you need more dynamic behavior.
//
// Experimental: this option is experimental and may be modified or removed in future versions,
// regardless of any other documented package stability guarantees.
JobTimeout time.Duration
// Force usage of Storage API if client is available. For test scenarios
forceStorageAPI bool
}
func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
qconf := &bq.JobConfigurationQuery{
Query: qc.Q,
CreateDisposition: string(qc.CreateDisposition),
WriteDisposition: string(qc.WriteDisposition),
AllowLargeResults: qc.AllowLargeResults,
Priority: string(qc.Priority),
MaximumBytesBilled: qc.MaxBytesBilled,
TimePartitioning: qc.TimePartitioning.toBQ(),
RangePartitioning: qc.RangePartitioning.toBQ(),
Clustering: qc.Clustering.toBQ(),
DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
SchemaUpdateOptions: qc.SchemaUpdateOptions,
CreateSession: qc.CreateSession,
}
if len(qc.TableDefinitions) > 0 {
qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
}
for name, data := range qc.TableDefinitions {
qconf.TableDefinitions[name] = data.toBQ()
}
if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" {
qconf.DefaultDataset = &bq.DatasetReference{
DatasetId: qc.DefaultDatasetID,
ProjectId: qc.DefaultProjectID,
}
}
if tier := int64(qc.MaxBillingTier); tier > 0 {
qconf.MaximumBillingTier = &tier
}
f := false
if qc.DisableQueryCache {
qconf.UseQueryCache = &f
}
if qc.DisableFlattenedResults {
qconf.FlattenResults = &f
// DisableFlattenResults implies AllowLargeResults.
qconf.AllowLargeResults = true
}
if qc.UseStandardSQL && qc.UseLegacySQL {
return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
}
if len(qc.Parameters) > 0 && qc.UseLegacySQL {
return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL")
}
ptrue := true
pfalse := false
if qc.UseLegacySQL {
qconf.UseLegacySql = &ptrue
} else {
qconf.UseLegacySql = &pfalse
}
if qc.Dst != nil && !qc.Dst.implicitTable() {
qconf.DestinationTable = qc.Dst.toBQ()
}
for _, p := range qc.Parameters {
qp, err := p.toBQ()
if err != nil {
return nil, err
}
qconf.QueryParameters = append(qconf.QueryParameters, qp)
}
if len(qc.ConnectionProperties) > 0 {
bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
for k, v := range qc.ConnectionProperties {
bqcp[k] = v.toBQ()
}
qconf.ConnectionProperties = bqcp
}
jc := &bq.JobConfiguration{
Labels: qc.Labels,
DryRun: qc.DryRun,
Query: qconf,
}
if qc.JobTimeout > 0 {
jc.JobTimeoutMs = qc.JobTimeout.Milliseconds()
}
return jc, nil
}
func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
qq := q.Query
qc := &QueryConfig{
Labels: q.Labels,
DryRun: q.DryRun,
JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
Q: qq.Query,
CreateDisposition: TableCreateDisposition(qq.CreateDisposition),
WriteDisposition: TableWriteDisposition(qq.WriteDisposition),
AllowLargeResults: qq.AllowLargeResults,
Priority: QueryPriority(qq.Priority),
MaxBytesBilled: qq.MaximumBytesBilled,
UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql,
TimePartitioning: bqToTimePartitioning(qq.TimePartitioning),
RangePartitioning: bqToRangePartitioning(qq.RangePartitioning),
Clustering: bqToClustering(qq.Clustering),
DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
SchemaUpdateOptions: qq.SchemaUpdateOptions,
CreateSession: qq.CreateSession,
}
qc.UseStandardSQL = !qc.UseLegacySQL
if len(qq.TableDefinitions) > 0 {
qc.TableDefinitions = make(map[string]ExternalData)
}
for name, qedc := range qq.TableDefinitions {
edc, err := bqToExternalDataConfig(&qedc)
if err != nil {
return nil, err
}
qc.TableDefinitions[name] = edc
}
if qq.DefaultDataset != nil {
qc.DefaultProjectID = qq.DefaultDataset.ProjectId
qc.DefaultDatasetID = qq.DefaultDataset.DatasetId
}
if qq.MaximumBillingTier != nil {
qc.MaxBillingTier = int(*qq.MaximumBillingTier)
}
if qq.UseQueryCache != nil && !*qq.UseQueryCache {
qc.DisableQueryCache = true
}
if qq.FlattenResults != nil && !*qq.FlattenResults {
qc.DisableFlattenedResults = true
}
if qq.DestinationTable != nil {
qc.Dst = bqToTable(qq.DestinationTable, c)
}
for _, qp := range qq.QueryParameters {
p, err := bqToQueryParameter(qp)
if err != nil {
return nil, err
}
qc.Parameters = append(qc.Parameters, p)
}
if len(qq.ConnectionProperties) > 0 {
props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
for k, v := range qq.ConnectionProperties {
props[k] = bqToConnectionProperty(v)
}
qc.ConnectionProperties = props
}
return qc, nil
}
// QueryPriority specifies a priority with which a query is to be executed.
type QueryPriority string
const (
// BatchPriority specifies that the query should be scheduled with the
// batch priority. BigQuery queues each batch query on your behalf, and
// starts the query as soon as idle resources are available, usually within
// a few minutes. If BigQuery hasn't started the query within 24 hours,
// BigQuery changes the job priority to interactive. Batch queries don't
// count towards your concurrent rate limit, which can make it easier to
// start many queries at once.
//
// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#batchqueries.
BatchPriority QueryPriority = "BATCH"
// InteractivePriority specifies that the query should be scheduled with
// interactive priority, which means that the query is executed as soon as
// possible. Interactive queries count towards your concurrent rate limit
// and your daily limit. It is the default priority with which queries get
// executed.
//
// More information can be found at https://cloud.google.com/bigquery/docs/running-queries#queries.
InteractivePriority QueryPriority = "INTERACTIVE"
)
// A Query queries data from a BigQuery table. Use Client.Query to create a Query.
type Query struct {
JobIDConfig
QueryConfig
client *Client
}
// Query creates a query with string q.
// The returned Query may optionally be further configured before its Run method is called.
func (c *Client) Query(q string) *Query {
return &Query{
client: c,
QueryConfig: QueryConfig{Q: q},
}
}
// Run initiates a query job.
func (q *Query) Run(ctx context.Context) (j *Job, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
defer func() { trace.EndSpan(ctx, err) }()
job, err := q.newJob()
if err != nil {
return nil, err
}
j, err = q.client.insertJob(ctx, job, nil)
if err != nil {
return nil, err
}
return j, nil
}
func (q *Query) newJob() (*bq.Job, error) {
config, err := q.QueryConfig.toBQ()
if err != nil {
return nil, err
}
return &bq.Job{
JobReference: q.JobIDConfig.createJobRef(q.client),
Configuration: config,
}, nil
}
// Read submits a query for execution and returns the results via a RowIterator.
// If the request can be satisfied by running using the optimized query path, it
// is used in place of the jobs.insert path as this path does not expose a job
// object.
func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
if q.QueryConfig.DryRun {
return nil, errors.New("bigquery: cannot evaluate Query.Read() for dry-run queries")
}
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
defer func() { trace.EndSpan(ctx, err) }()
queryRequest, err := q.probeFastPath()
if err != nil {
// Any error means we fallback to the older mechanism.
job, err := q.Run(ctx)
if err != nil {
return nil, err
}
return job.Read(ctx)
}
// we have a config, run on fastPath.
resp, err := q.client.runQuery(ctx, queryRequest)
if err != nil {
return nil, err
}
// construct a minimal job for backing the row iterator.
var minimalJob *Job
if resp.JobReference != nil {
minimalJob = &Job{
c: q.client,
jobID: resp.JobReference.JobId,
location: resp.JobReference.Location,
projectID: resp.JobReference.ProjectId,
}
}
if resp.JobComplete {
// If more pages are available, discard and use the Storage API instead
if resp.PageToken != "" && q.client.isStorageReadAvailable() {
it, err = newStorageRowIteratorFromJob(ctx, minimalJob)
if err == nil {
return it, nil
}
}
rowSource := &rowSource{
j: minimalJob,
// RowIterator can precache results from the iterator to save a lookup.
cachedRows: resp.Rows,
cachedSchema: resp.Schema,
cachedNextToken: resp.PageToken,
}
return newRowIterator(ctx, rowSource, fetchPage), nil
}
// We're on the fastPath, but we need to poll because the job is incomplete.
// Fallback to job-based Read().
//
// (Issue 2937) In order to satisfy basic probing of the job in classic path,
// we need to supply additional config which is probed for presence, not contents.
//
minimalJob.config = &bq.JobConfiguration{
Query: &bq.JobConfigurationQuery{},
}
return minimalJob.Read(ctx)
}
// probeFastPath is used to attempt configuring a jobs.Query request based on a
// user's Query configuration. If all the options set on the job are supported on the
// faster query path, this method returns a QueryRequest suitable for execution.
func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
if q.forceStorageAPI && q.client.isStorageReadAvailable() {
return nil, fmt.Errorf("force Storage API usage")
}
// This is a denylist of settings which prevent us from composing an equivalent
// bq.QueryRequest due to differences between configuration parameters accepted
// by jobs.insert vs jobs.query.
if q.QueryConfig.Dst != nil ||
q.QueryConfig.TableDefinitions != nil ||
q.QueryConfig.CreateDisposition != "" ||
q.QueryConfig.WriteDisposition != "" ||
!(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) ||
q.QueryConfig.UseLegacySQL ||
q.QueryConfig.MaxBillingTier != 0 ||
q.QueryConfig.TimePartitioning != nil ||
q.QueryConfig.RangePartitioning != nil ||
q.QueryConfig.Clustering != nil ||
q.QueryConfig.DestinationEncryptionConfig != nil ||
q.QueryConfig.SchemaUpdateOptions != nil ||
q.QueryConfig.JobTimeout != 0 ||
// User has defined the jobID generation behavior
q.JobIDConfig.JobID != "" {
return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
}
pfalse := false
qRequest := &bq.QueryRequest{
Query: q.QueryConfig.Q,
CreateSession: q.CreateSession,
Location: q.Location,
UseLegacySql: &pfalse,
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
RequestId: uid.NewSpace("request", nil).New(),
Labels: q.Labels,
}
if q.QueryConfig.DisableQueryCache {
qRequest.UseQueryCache = &pfalse
}
// Convert query parameters
for _, p := range q.QueryConfig.Parameters {
qp, err := p.toBQ()
if err != nil {
return nil, err
}
qRequest.QueryParameters = append(qRequest.QueryParameters, qp)
}
if q.QueryConfig.DefaultDatasetID != "" {
qRequest.DefaultDataset = &bq.DatasetReference{
ProjectId: q.QueryConfig.DefaultProjectID,
DatasetId: q.QueryConfig.DefaultDatasetID,
}
}
if q.client.enableQueryPreview {
qRequest.JobCreationMode = "JOB_CREATION_OPTIONAL"
}
return qRequest, nil
}
// ConnectionProperty represents a single key and value pair that can be sent alongside a query request or load job.
type ConnectionProperty struct {
// Name of the connection property to set.
Key string
// Value of the connection property.
Value string
}
func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
if cp == nil {
return nil
}
return &bq.ConnectionProperty{
Key: cp.Key,
Value: cp.Value,
}
}
func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
if in == nil {
return nil
}
return &ConnectionProperty{
Key: in.Key,
Value: in.Value,
}
}