Skip to content

Commit

Permalink
Merge pr 331, with some fixups (GoogleCloudPlatform#351)
Browse files Browse the repository at this point in the history
* Check for existence of job before polling. Closes GoogleCloudPlatform#330.

In case of local runners, export job does not exists, check if job
exists before polling for result. This will prevent unnecessary
exceptions, polling in local mode.

* Merge PR-331, and fixups.

* address feedback
  • Loading branch information
peihe authored and dhalperi committed Aug 17, 2016
1 parent ff728c6 commit 92d23ba
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,6 @@ public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";

// The maximum number of retries to poll a BigQuery job in the cleanup phase.
// We expect the jobs have already DONE, and don't need a high max retires.
private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;

private Bound() {
this(
null /* name */,
Expand Down Expand Up @@ -600,8 +596,9 @@ void cleanup(PipelineOptions options) throws Exception {
JobReference jobRef = new JobReference()
.setProjectId(executingProject)
.setJobId(getExtractJobId(jobIdToken));
Job extractJob = bqServices.getJobService(bqOptions).pollJob(
jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);

Job extractJob = bqServices.getJobService(bqOptions)
.getJob(jobRef);

Collection<String> extractFiles = null;
if (extractJob != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ Job pollJob(JobReference jobRef, int maxAttempts)
*/
JobStatistics dryRunQuery(String projectId, String query)
throws InterruptedException, IOException;

/**
* Gets the specified {@link Job} by the given {@link JobReference}.
*
* Returns null if the job is not found.
*/
Job getJob(JobReference jobRef) throws IOException, InterruptedException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,51 @@ public JobStatistics dryRunQuery(String projectId, String query)
Sleeper.DEFAULT,
backoff).getStatistics();
}

/**
* {@inheritDoc}
*
* <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
* @throws IOException if it exceeds max RPC retries.
*/
@Override
public Job getJob(JobReference jobRef) throws IOException, InterruptedException {
BackOff backoff =
new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
return getJob(jobRef, Sleeper.DEFAULT, backoff);
}

@VisibleForTesting
public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff)
throws IOException, InterruptedException {
String jobId = jobRef.getJobId();
Exception lastException;
do {
try {
return client.jobs().get(jobRef.getProjectId(), jobId).execute();
} catch (GoogleJsonResponseException e) {
if (errorExtractor.itemNotFound(e)) {
LOG.info("No BigQuery job with job id {} found.", jobId);
return null;
}
LOG.warn(
"Ignoring the error encountered while trying to query the BigQuery job {}",
jobId, e);
lastException = e;
} catch (IOException e) {
LOG.warn(
"Ignoring the error encountered while trying to query the BigQuery job {}",
jobId, e);
lastException = e;
}
} while (nextBackOff(sleeper, backoff));
throw new IOException(
String.format(
"Unable to find BigQuery job: %s, aborting after %d retries.",
jobRef, MAX_RPC_ATTEMPTS),
lastException);
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,21 @@ private static class FakeJobService implements JobService, Serializable {

private Object[] startJobReturns;
private Object[] pollJobReturns;
private Object[] getJobReturns;
private String executingProject;
// Both counts will be reset back to zeros after serialization.
// This is a work around for DoFn's verifyUnmodified check.
private transient int startJobCallsCount;
private transient int pollJobStatusCallsCount;
private transient int getJobCallsCount;

public FakeJobService() {
this.startJobReturns = new Object[0];
this.pollJobReturns = new Object[0];
this.getJobReturns = new Object[0];
this.startJobCallsCount = 0;
this.pollJobStatusCallsCount = 0;
this.getJobCallsCount = 0;
}

/**
Expand All @@ -234,6 +238,16 @@ public FakeJobService startJobReturns(Object... startJobReturns) {
return this;
}

/**
* Sets the return values to mock {@link JobService#getJob}.
*
* <p>Throws if the {@link Object} is a {@link InterruptedException}, returns otherwise.
*/
public FakeJobService getJobReturns(Object... getJobReturns) {
this.getJobReturns = getJobReturns;
return this;
}

/**
* Sets the return values to mock {@link JobService#pollJob}.
*
Expand Down Expand Up @@ -325,6 +339,32 @@ public JobStatistics dryRunQuery(String projectId, String query)
throws InterruptedException, IOException {
throw new UnsupportedOperationException();
}

@Override
public Job getJob(JobReference jobRef) throws InterruptedException {
if (!Strings.isNullOrEmpty(executingProject)) {
checkArgument(
jobRef.getProjectId().equals(executingProject),
"Project id: %s is not equal to executing project: %s",
jobRef.getProjectId(), executingProject);
}

if (getJobCallsCount < getJobReturns.length) {
Object ret = getJobReturns[getJobCallsCount++];
if (ret == null) {
return null;
} else if (ret instanceof Job) {
return (Job) ret;
} else if (ret instanceof InterruptedException) {
throw (InterruptedException) ret;
} else {
throw new RuntimeException("Unexpected return type: " + ret.getClass());
}
} else {
throw new RuntimeException(
"Exceeded expected number of calls: " + getJobReturns.length);
}
}
}

@Rule public transient ExpectedException thrown = ExpectedException.none();
Expand Down Expand Up @@ -518,7 +558,7 @@ public void testReadFromTable() throws IOException {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(new FakeJobService()
.startJobReturns("done", "done")
.pollJobReturns(Status.UNKNOWN)
.getJobReturns((Job) null)
.verifyExecutingProject(bqOptions.getProject()))
.readerReturns(
toJsonString(new TableRow().set("name", "a").set("number", 1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,62 @@ public void testPollJobUnknown() throws IOException, InterruptedException {
verify(response, times(1)).getContentType();
}

@Test
public void testGetJobSucceeds() throws Exception {
Job testJob = new Job();
testJob.setStatus(new JobStatus());

when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(200);
when(response.getContent()).thenReturn(toStream(testJob));

BigQueryServicesImpl.JobServiceImpl jobService =
new BigQueryServicesImpl.JobServiceImpl(bigquery);
JobReference jobRef = new JobReference()
.setProjectId("projectId")
.setJobId("jobId");
Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);

assertEquals(testJob, job);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
}

@Test
public void testGetJobNotFound() throws Exception {
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(404);

BigQueryServicesImpl.JobServiceImpl jobService =
new BigQueryServicesImpl.JobServiceImpl(bigquery);
JobReference jobRef = new JobReference()
.setProjectId("projectId")
.setJobId("jobId");
Job job = jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);

assertEquals(null, job);
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
}

@Test
public void testGetJobThrows() throws Exception {
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(401);

BigQueryServicesImpl.JobServiceImpl jobService =
new BigQueryServicesImpl.JobServiceImpl(bigquery);
JobReference jobRef = new JobReference()
.setProjectId("projectId")
.setJobId("jobId");
thrown.expect(IOException.class);
thrown.expectMessage(String.format("Unable to find BigQuery job: %s", jobRef));

jobService.getJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
}

@Test
public void testExecuteWithRetries() throws IOException, InterruptedException {
Table testTable = new Table();
Expand Down

0 comments on commit 92d23ba

Please sign in to comment.