Skip to content

Commit

Permalink
Improve streaming sdk integration tests
Browse files Browse the repository at this point in the history
Make streaming sdk integration tests fail quickly if either the service or the
pipeline has an error

----Release Notes----
[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=99510332
  • Loading branch information
dpmills authored and davorbonaci committed Aug 6, 2015
1 parent ac51a3c commit e9f687d
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.dataflow.sdk.testing;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.Pipeline;
Expand All @@ -38,6 +39,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -111,14 +113,32 @@ public Optional<Boolean> call() throws Exception {
}
}
});
job.waitToFinish(-1L, TimeUnit.SECONDS, messageHandler);
State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, new JobMessagesHandler() {
@Override
public void process(List<JobMessage> messages) {
messageHandler.process(messages);
for (JobMessage message : messages) {
if (message.getMessageImportance() != null
&& message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}",
job.getJobId(), message.getMessageText());
try {
job.cancel();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
}
});
if (finalState == null || finalState == State.RUNNING) {
LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.",
job.getJobId());
job.cancel();
}
result = resultFuture.get();
} else {
job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler);
if (job.getState() != State.DONE) {
// TODO: Get an exception from the remote service.
throw new IllegalStateException("The dataflow failed.");
}
result = checkForSuccess(job);
}
if (!result.isPresent()) {
Expand Down Expand Up @@ -147,42 +167,49 @@ public <OutputT extends POutput, InputT extends PInput> OutputT apply(

Optional<Boolean> checkForSuccess(DataflowPipelineJob job)
throws IOException {
State state = job.getState();
if (state == State.FAILED || state == State.CANCELLED) {
LOG.info("The pipeline failed");
return Optional.of(false);
}

JobMetrics metrics = job.getDataflowClient().projects().jobs()
.getMetrics(job.getProjectId(), job.getJobId()).execute();

if (metrics == null || metrics.getMetrics() == null) {
LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
return Optional.absent();
}

int successes = 0;
int failures = 0;
for (MetricUpdate metric : metrics.getMetrics()) {
if (metric.getName() == null || metric.getName().getContext() == null
|| !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
// Don't double count using the non-tentative version of the metric.
continue;
} else {
int successes = 0;
int failures = 0;
for (MetricUpdate metric : metrics.getMetrics()) {
if (metric.getName() == null || metric.getName().getContext() == null
|| !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
// Don't double count using the non-tentative version of the metric.
continue;
}
if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
successes += ((BigDecimal) metric.getScalar()).intValue();
} else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
failures += ((BigDecimal) metric.getScalar()).intValue();
}
}
if (DataflowAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) {
successes += ((BigDecimal) metric.getScalar()).intValue();
} else if (DataflowAssert.FAILURE_COUNTER.equals(metric.getName().getName())) {
failures += ((BigDecimal) metric.getScalar()).intValue();

if (failures > 0) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(false);
} else if (successes >= expectedNumberOfAssertions) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(true);
}
}

if (failures > 0) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(false);
} else if (successes >= expectedNumberOfAssertions) {
LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of "
+ "{} expected assertions.", job.getJobId(), successes, failures,
expectedNumberOfAssertions);
return Optional.of(true);
LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+ "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);
}
LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected "
+ "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions);

return Optional.<Boolean>absent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.client.http.LowLevelHttpResponse;
Expand All @@ -28,6 +31,7 @@
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
Expand All @@ -38,14 +42,18 @@
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler;
import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.cloud.dataflow.sdk.util.TimeUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -55,8 +63,12 @@
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/** Tests for {@link TestDataflowPipelineRunner}. */
@RunWith(JUnit4.class)
Expand Down Expand Up @@ -184,40 +196,46 @@ public void testRunStreamingJobThatFails() throws Exception {

@Test
public void testCheckingForSuccessWhenDataflowAssertSucceeds() throws Exception {
DataflowPipelineJob job = new DataflowPipelineJob("test-project", "test-job", service, null);
DataflowPipelineJob job =
spy(new DataflowPipelineJob("test-project", "test-job", service, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
DataflowAssert.that(pc).containsInAnyOrder(1, 2, 3);

TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, true /* tentative */));
doReturn(State.DONE).when(job).getState();
assertEquals(Optional.of(true), runner.checkForSuccess(job));
}

@Test
public void testCheckingForSuccessWhenDataflowAssertFails() throws Exception {
DataflowPipelineJob job = new DataflowPipelineJob("test-project", "test-job", service, null);
DataflowPipelineJob job =
spy(new DataflowPipelineJob("test-project", "test-job", service, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
DataflowAssert.that(pc).containsInAnyOrder(1, 2, 3);

TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
doReturn(State.DONE).when(job).getState();
assertEquals(Optional.of(false), runner.checkForSuccess(job));
}

@Test
public void testCheckingForSuccessSkipsNonTentativeMetrics() throws Exception {
DataflowPipelineJob job = new DataflowPipelineJob("test-project", "test-job", service, null);
DataflowPipelineJob job =
spy(new DataflowPipelineJob("test-project", "test-job", service, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
DataflowAssert.that(pc).containsInAnyOrder(1, 2, 3);

TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, false /* tentative */));
doReturn(State.RUNNING).when(job).getState();
assertEquals(Optional.absent(), runner.checkForSuccess(job));
}

Expand All @@ -241,4 +259,59 @@ private LowLevelHttpResponse generateMockMetricResponse(boolean success, boolean
response.setContent(jobMetrics.toPrettyString());
return response;
}

@Test
// Verifies that checkForSuccess() reports failure (Optional.of(false)) as soon as the
// service-side job state is FAILED, even though the tentative metrics response here
// contains only successes — the terminal job state takes precedence over metrics.
public void testStreamingPipelineFailsIfServiceFails() throws Exception {
// Spy so the job's getState() can be stubbed below without a real service call.
DataflowPipelineJob job =
spy(new DataflowPipelineJob("test-project", "test-job", service, null));
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
DataflowAssert.that(pc).containsInAnyOrder(1, 2, 3);

TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
// Metrics report success, but non-tentative — presumably these would otherwise be
// skipped by checkForSuccess(); either way the FAILED state must dominate.
when(request.execute()).thenReturn(
generateMockMetricResponse(true /* success */, false /* tentative */));
// Simulate the service declaring the job failed.
doReturn(State.FAILED).when(job).getState();
assertEquals(Optional.of(false), runner.checkForSuccess(job));
}

@Test
// Verifies the streaming path fails fast on a service-side error: when a
// JOB_MESSAGE_ERROR message arrives during waitToFinish(), the runner must cancel the
// job and surface an IllegalStateException instead of hanging until the timeout.
public void testStreamingPipelineFailsIfException() throws Exception {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("The dataflow failed.");

// Force the streaming code path in the runner.
options.setStreaming(true);
Pipeline p = TestPipeline.create(options);
PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
DataflowAssert.that(pc).containsInAnyOrder(1, 2, 3);

// Fully mocked job: waitToFinish() delivers a single JOB_MESSAGE_ERROR to the
// handler the runner registered (argument index 2), then reports CANCELLED —
// emulating the error-triggered cancellation the runner is expected to perform.
DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class);
when(mockJob.getDataflowClient()).thenReturn(service);
when(mockJob.getState()).thenReturn(State.RUNNING);
when(mockJob.getProjectId()).thenReturn("test-project");
when(mockJob.getJobId()).thenReturn("test-job");
when(mockJob.waitToFinish(any(Long.class), any(TimeUnit.class), any(JobMessagesHandler.class)))
.thenAnswer(new Answer<State>() {
@Override
public State answer(InvocationOnMock invocation) {
JobMessage message = new JobMessage();
message.setMessageText("FooException");
message.setTime(TimeUtil.toCloudTime(Instant.now()));
message.setMessageImportance("JOB_MESSAGE_ERROR");
// Push the error message through the runner's own handler.
((MonitoringUtil.JobMessagesHandler) invocation.getArguments()[2])
.process(Arrays.asList(message));
return State.CANCELLED;
}
});

// The inner runner hands back the mocked job instead of launching anything.
DataflowPipelineRunner mockRunner = Mockito.mock(DataflowPipelineRunner.class);
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);

when(request.execute()).thenReturn(
generateMockMetricResponse(false /* success */, true /* tentative */));
TestDataflowPipelineRunner runner = (TestDataflowPipelineRunner) p.getRunner();
runner.run(p, mockRunner);

// The error message must have caused the runner to cancel the job.
verify(mockJob).cancel();
}
}

0 comments on commit e9f687d

Please sign in to comment.