Skip to content

Commit

Permalink
[FLINK-25715][runtime] Introduce TestingDispatcherGateway#newBuilder(…
Browse files Browse the repository at this point in the history
…) method
  • Loading branch information
dmvk authored and zentol committed Jan 24, 2022
1 parent 0b8a129 commit 08f86a0
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void cleanup() {
@Test
public void testExceptionThrownWhenApplicationContainsNoJobs() throws Throwable {
final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()));

Expand Down Expand Up @@ -223,7 +223,7 @@ public void testApplicationFailsAsSoonAsOneJobFails() throws Throwable {
final ConcurrentLinkedDeque<JobID> submittedJobIds = new ConcurrentLinkedDeque<>();

final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph -> {
submittedJobIds.add(jobGraph.getJobID());
Expand Down Expand Up @@ -418,7 +418,7 @@ public void testErrorHandlerIsCalledWhenShutdownThrowsAnException() throws Excep
private void testErrorHandlerIsCalled(Supplier<CompletableFuture<Acknowledge>> shutdownFunction)
throws Exception {
final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(
Expand Down Expand Up @@ -649,7 +649,7 @@ public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResult() thr
configurationUnderTest.set(
HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph ->
FutureUtils.completedExceptionally(
Expand Down Expand Up @@ -684,7 +684,7 @@ public void testDuplicateJobSubmissionWithTerminatedJobIdWithUnknownResultAttach
configurationUnderTest.set(
HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph ->
FutureUtils.completedExceptionally(
Expand Down Expand Up @@ -712,7 +712,7 @@ public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
configurationUnderTest.set(
HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
final TestingDispatcherGateway.Builder dispatcherBuilder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph ->
FutureUtils.completedExceptionally(
Expand Down Expand Up @@ -773,7 +773,7 @@ private TestingDispatcherGateway.Builder runningJobGatewayBuilder() {

private TestingDispatcherGateway.Builder dispatcherGatewayBuilder(JobStatus jobStatus) {
TestingDispatcherGateway.Builder builder =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.setRequestJobStatusFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
public class RestClusterClientSavepointTriggerTest extends TestLogger {

private static final DispatcherGateway mockRestfulGateway =
new TestingDispatcherGateway.Builder().build();
TestingDispatcherGateway.newBuilder().build();

private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever =
() -> CompletableFuture.completedFuture(mockRestfulGateway);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@
public class RestClusterClientTest extends TestLogger {

private final DispatcherGateway mockRestfulGateway =
new TestingDispatcherGateway.Builder().build();
TestingDispatcherGateway.newBuilder().build();

private GatewayRetriever<DispatcherGateway> mockGatewayRetriever;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ void applicationsRunInSeparateThreads(@TempDir Path tempDir) throws Exception {
Paths.get(System.getProperty("targetDir")).resolve(JAR_NAME),
tempDir.resolve("app.jar"));

final DispatcherGateway dispatcherGateway = new TestingDispatcherGateway.Builder().build();
final DispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().build();

final ThreadCapturingApplicationRunner threadCapturingApplicationRunner =
new ThreadCapturingApplicationRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ static void init() throws Exception {
jarDir.resolve("program-without-manifest.jar"));

restfulGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
.setSubmitFunction(
jobGraph -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testPlanJar() throws Exception {
private static void runTest(String expectedCapturedStdOut, String expectedCapturedStdErr)
throws Exception {
final TestingDispatcherGateway restfulGateway =
new TestingDispatcherGateway.Builder().build();
TestingDispatcherGateway.newBuilder().build();

final JarHandlers handlers = new JarHandlers(TMP.newFolder().toPath(), restfulGateway);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class JarSubmissionITCase extends TestLogger {
@Test
public void testJarSubmission() throws Exception {
final TestingDispatcherGateway restfulGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setBlobServerPort(blobServerResource.getBlobServerPort())
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private static StartStopDispatcherLeaderProcess create(UUID leaderSessionId) {
}

private TestingDispatcherGateway createDispatcherGateway(UUID leaderSessionId) {
return new TestingDispatcherGateway.Builder()
return TestingDispatcherGateway.newBuilder()
.setFencingToken(DispatcherId.fromUuid(leaderSessionId))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void confirmLeaderSessionFuture_completesAfterDispatcherServiceHasBeenSta
final OneShotLatch createDispatcherServiceLatch = new OneShotLatch();
final String dispatcherAddress = "myAddress";
final TestingDispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder().setAddress(dispatcherAddress).build();
TestingDispatcherGateway.newBuilder().setAddress(dispatcherAddress).build();

dispatcherServiceFactory =
TestingDispatcherServiceFactory.newBuilder()
Expand Down Expand Up @@ -397,7 +397,7 @@ public void onRemovedJobGraph_failingRemovalCall_failsFatally() throws Exception
public void onAddedJobGraph_submitsRecoveredJob() throws Exception {
final CompletableFuture<JobGraph> submittedJobFuture = new CompletableFuture<>();
final TestingDispatcherGateway testingDispatcherGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
submittedJob -> {
submittedJobFuture.complete(submittedJob);
Expand Down Expand Up @@ -540,7 +540,7 @@ private void runJobRecoveryFailureTest(FlinkException testException) throws Exce
@Test
public void onAddedJobGraph_failingRecoveredJobSubmission_failsFatally() throws Exception {
final TestingDispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph ->
FutureUtils.completedExceptionally(
Expand All @@ -566,7 +566,7 @@ private void verifyOnAddedJobGraphResultFailsFatally(
public void onAddedJobGraph_duplicateJobSubmissionDueToFalsePositive_willBeIgnored()
throws Exception {
final TestingDispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph ->
FutureUtils.completedExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public static class Builder {
private Function<JobID, CompletableFuture<Void>> onRemovedJobGraphFunction =
ignored -> FutureUtils.completedVoidFuture();

private DispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder().build();
private DispatcherGateway dispatcherGateway = TestingDispatcherGateway.newBuilder().build();

private CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class AbstractHandlerITCase extends TestLogger {

private static final RestfulGateway mockRestfulGateway =
new TestingDispatcherGateway.Builder().build();
TestingDispatcherGateway.newBuilder().build();

private static final GatewayRetriever<RestfulGateway> mockGatewayRetriever =
() -> CompletableFuture.completedFuture(mockRestfulGateway);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void setUp() throws IOException {
initializeFolderStructure();

final TestingDispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder().build();
TestingDispatcherGateway.newBuilder().build();
testInstance =
new JobManagerCustomLogHandler(
() -> CompletableFuture.completedFuture(dispatcherGateway),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void setupClass() throws HandlerRequestException {

@Before
public void setUp() {
dispatcherGateway = new TestingDispatcherGateway.Builder().build();
dispatcherGateway = TestingDispatcherGateway.newBuilder().build();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void teardown() throws IOException {
public void testSerializationFailureHandling() throws Exception {
final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
DispatcherGateway mockGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.build();
Expand Down Expand Up @@ -140,7 +140,7 @@ public void testSuccessfulJobSubmission() throws Exception {
objectOut.writeObject(JobGraphTestUtils.emptyJobGraph());
}

TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
TestingDispatcherGateway.Builder builder = TestingDispatcherGateway.newBuilder();
builder.setBlobServerPort(blobServer.getPort())
.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.setHostname("localhost");
Expand Down Expand Up @@ -178,7 +178,7 @@ public void testRejectionOnCountMismatch() throws Exception {
}
final Path countExceedingFile = TEMPORARY_FOLDER.newFile().toPath();

TestingDispatcherGateway.Builder builder = new TestingDispatcherGateway.Builder();
TestingDispatcherGateway.Builder builder = TestingDispatcherGateway.newBuilder();
builder.setBlobServerPort(blobServer.getPort())
.setSubmitFunction(jobGraph -> CompletableFuture.completedFuture(Acknowledge.get()))
.setHostname("localhost");
Expand Down Expand Up @@ -222,7 +222,7 @@ public void testFileHandling() throws Exception {

CompletableFuture<JobGraph> submittedJobGraphFuture = new CompletableFuture<>();
DispatcherGateway dispatcherGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setBlobServerPort(blobServer.getPort())
.setSubmitFunction(
submittedJobGraph -> {
Expand Down Expand Up @@ -282,7 +282,7 @@ public void testFileHandling() throws Exception {
public void testFailedJobSubmission() throws Exception {
final String errorMessage = "test";
DispatcherGateway mockGateway =
new TestingDispatcherGateway.Builder()
TestingDispatcherGateway.newBuilder()
.setSubmitFunction(
jobgraph ->
FutureUtils.completedExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ public CompletableFuture<String> stopWithSavepointAndGetLocation(
return stopWithSavepointAndGetLocationFunction.apply(jobId, targetDirectory);
}

public static Builder newBuilder() {
return new Builder();
}

/** Builder for the {@link TestingDispatcherGateway}. */
public static final class Builder extends TestingRestfulGateway.AbstractBuilder<Builder> {

Expand All @@ -230,6 +234,10 @@ public static final class Builder extends TestingRestfulGateway.AbstractBuilder<
private BiFunction<JobID, String, CompletableFuture<String>>
stopWithSavepointAndGetLocationFunction;

private Builder() {
// No-op.
}

public Builder setSubmitFunction(
Function<JobGraph, CompletableFuture<Acknowledge>> submitFunction) {
this.submitFunction = submitFunction;
Expand Down

0 comments on commit 08f86a0

Please sign in to comment.