Skip to content

Commit

Permalink
feat(bigquery): add support for optimized query path (googleapis#2560)
Browse files Browse the repository at this point in the history
* bigquery: add support for optimized query path.

For users who invoke Read() directly on a Query object,
this change evaluates the query config and runs the query
using an optimized path when possible.  In cases where it's
not possible to run using the optimized path, we fall back
to the existing mechanism which uses jobs.insert and polling.
  • Loading branch information
shollyman authored Sep 18, 2020
1 parent 7469469 commit 3a1457f
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 5 deletions.
22 changes: 22 additions & 0 deletions bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,28 @@ func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*
return bqToJob(res, c)
}

// runQuery invokes the optimized query path.
// Due to differences in options it supports, it cannot be used for all existing
// jobs.insert requests that are query jobs.
func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) {
call := c.bqs.Jobs.Query(c.projectID, queryRequest)
setClientHeader(call.Header())

var res *bq.QueryResponse
var err error
invoke := func() error {
res, err = call.Do()
return err
}

// We control request ID, so we can always runWithRetry.
err = runWithRetry(ctx, invoke)
if err != nil {
return nil, err
}
return res, nil
}

// Convert a number of milliseconds since the Unix epoch to a time.Time.
// Treat an input of zero specially: convert it to the zero time,
// rather than the start of the epoch.
Expand Down
2 changes: 1 addition & 1 deletion bigquery/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func fetchPage(ctx context.Context, src *rowSource, schema Schema, startIndex ui
// This likely means something more severe, like a problem with schema.
return nil, err
}
// If we failed to fet data from cache, invoke the appropriate service method.
// If we failed to fetch data from cache, invoke the appropriate service method.
if src.j != nil {
return fetchJobResultPage(ctx, src, schema, startIndex, pageSize, pageToken)
}
Expand Down
95 changes: 91 additions & 4 deletions bigquery/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package bigquery
import (
"context"
"errors"
"fmt"

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/uid"
bq "google.golang.org/api/bigquery/v2"
)

Expand Down Expand Up @@ -324,11 +326,96 @@ func (q *Query) newJob() (*bq.Job, error) {
}

// Read submits a query for execution and returns the results via a RowIterator.
// It is a shorthand for Query.Run followed by Job.Read.
func (q *Query) Read(ctx context.Context) (*RowIterator, error) {
job, err := q.Run(ctx)
// If the request can be satisfied by running using the optimized query path, it
// is used in place of the jobs.insert path as this path does not expose a job
// object.
func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
defer func() { trace.EndSpan(ctx, err) }()
queryRequest, err := q.probeFastPath()
if err != nil {
// Any error means we fallback to the older mechanism.
job, err := q.Run(ctx)
if err != nil {
return nil, err
}
return job.Read(ctx)
}
// we have a config, run on fastPath.
resp, err := q.client.runQuery(ctx, queryRequest)
if err != nil {
return nil, err
}
return job.Read(ctx)
// construct a minimal job for backing the row iterator.
minimalJob := &Job{
c: q.client,
jobID: resp.JobReference.JobId,
location: resp.JobReference.Location,
projectID: resp.JobReference.ProjectId,
}
if resp.JobComplete {
rowSource := &rowSource{
j: minimalJob,
// RowIterator can precache results from the iterator to save a lookup.
cachedRows: resp.Rows,
cachedSchema: resp.Schema,
cachedNextToken: resp.PageToken,
}
return newRowIterator(ctx, rowSource, fetchPage), nil
}
// We're on the fastPath, but we need to poll because the job is incomplete.
// Fallback to job-based Read().
return minimalJob.Read(ctx)
}

// probeFastPath is used to attempt configuring a jobs.Query request based on a
// user's Query configuration. If all the options set on the job are supported on the
// faster query path, this method returns a QueryRequest suitable for execution.
func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
// This is a denylist of settings which prevent us from composing an equivalent
// bq.QueryRequest due to differences between configuration parameters accepted
// by jobs.insert vs jobs.query.
if q.QueryConfig.Dst != nil ||
q.QueryConfig.TableDefinitions != nil ||
q.QueryConfig.CreateDisposition != "" ||
q.QueryConfig.WriteDisposition != "" ||
!(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) ||
q.QueryConfig.UseLegacySQL ||
q.QueryConfig.MaxBillingTier != 0 ||
q.QueryConfig.TimePartitioning != nil ||
q.QueryConfig.RangePartitioning != nil ||
q.QueryConfig.Clustering != nil ||
q.QueryConfig.DestinationEncryptionConfig != nil ||
q.QueryConfig.SchemaUpdateOptions != nil ||
// User has defined the jobID generation behavior
q.JobIDConfig.JobID != "" {
return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
}
pfalse := false
qRequest := &bq.QueryRequest{
Query: q.QueryConfig.Q,
Location: q.Location,
UseLegacySql: &pfalse,
MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
RequestId: uid.NewSpace("request", nil).New(),
Labels: q.Labels,
}
if q.QueryConfig.DisableQueryCache {
qRequest.UseQueryCache = &pfalse
}
// Convert query parameters
for _, p := range q.QueryConfig.Parameters {
qp, err := p.toBQ()
if err != nil {
return nil, err
}
qRequest.QueryParameters = append(qRequest.QueryParameters, qp)
}
if q.QueryConfig.DefaultDatasetID != "" {
qRequest.DefaultDataset = &bq.DatasetReference{
ProjectId: q.QueryConfig.DefaultProjectID,
DatasetId: q.QueryConfig.DefaultDatasetID,
}
}
return qRequest, nil
}
93 changes: 93 additions & 0 deletions bigquery/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cloud.google.com/go/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
bq "google.golang.org/api/bigquery/v2"
)

Expand Down Expand Up @@ -386,6 +387,98 @@ func TestQuery(t *testing.T) {
}
}

func TestProbeFastPath(t *testing.T) {
c := &Client{
projectID: "client-project-id",
}
pfalse := false
testCases := []struct {
inCfg QueryConfig
inJobCfg JobIDConfig
wantReq *bq.QueryRequest
wantErr bool
}{
{
inCfg: QueryConfig{
Q: "foo",
},
wantReq: &bq.QueryRequest{
Query: "foo",
UseLegacySql: &pfalse,
},
},
{
// All things you can set and still get a successful QueryRequest
inCfg: QueryConfig{
Q: "foo",
DefaultProjectID: "defproject",
DefaultDatasetID: "defdataset",
DisableQueryCache: true,
Priority: InteractivePriority,
MaxBytesBilled: 123,
Parameters: []QueryParameter{
{Name: "user", Value: "bob"},
},
Labels: map[string]string{
"key": "val",
},
},
wantReq: &bq.QueryRequest{
Query: "foo",
DefaultDataset: &bq.DatasetReference{ProjectId: "defproject", DatasetId: "defdataset"},
Labels: map[string]string{
"key": "val",
},
MaximumBytesBilled: 123,
UseLegacySql: &pfalse,
QueryParameters: []*bq.QueryParameter{
{
Name: "user",
ParameterType: &bq.QueryParameterType{Type: "STRING"},
ParameterValue: &bq.QueryParameterValue{Value: "bob"},
},
},
UseQueryCache: &pfalse,
},
},
{
// fail, sets destination via API
inCfg: QueryConfig{
Q: "foo",
Dst: &Table{},
},
wantErr: true,
},
{
// fail, sets specifies destination partitioning
inCfg: QueryConfig{
Q: "foo",
TimePartitioning: &TimePartitioning{},
RangePartitioning: &RangePartitioning{},
},
wantErr: true,
},
{
// fail, sets specifies schema update options
inCfg: QueryConfig{
Q: "foo",
SchemaUpdateOptions: []string{"bar"},
},
wantErr: true,
},
}
for i, tc := range testCases {
in := &Query{tc.inJobCfg, tc.inCfg, c}
gotReq, err := in.probeFastPath()
if tc.wantErr && err == nil {
t.Errorf("case %d wanted error, got nil", i)
}
if diff := testutil.Diff(gotReq, tc.wantReq, cmpopts.IgnoreFields(bq.QueryRequest{}, "RequestId")); diff != "" {
t.Errorf("QueryRequest case %d: -got +want:\n%s", i, diff)
}
}
}

func TestConfiguringQuery(t *testing.T) {
c := &Client{
projectID: "project-id",
Expand Down

0 comments on commit 3a1457f

Please sign in to comment.