Skip to content

Commit

Permalink
[FLINK-32974][client] Avoid creating a new temporary directory every …
Browse files Browse the repository at this point in the history
…time for RestClusterClient

This closes apache#23363
  • Loading branch information
wanglijie95 committed Sep 20, 2023
1 parent 5a8321f commit 6034d5f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ public class RestClusterClient<T> implements ClusterClient<T> {
private static final Logger LOG = LoggerFactory.getLogger(RestClusterClient.class);

private final RestClusterClientConfiguration restClusterClientConfiguration;
private final java.nio.file.Path tempDir;

private final Configuration configuration;

Expand Down Expand Up @@ -200,13 +199,7 @@ public RestClusterClient(Configuration config, T clusterId) throws Exception {
public RestClusterClient(
Configuration config, T clusterId, ClientHighAvailabilityServicesFactory factory)
throws Exception {
this(
config,
null,
clusterId,
new ExponentialWaitStrategy(10L, 2000L),
factory,
Files.createTempDirectory("flink-rest-client-jobgraphs"));
this(config, null, clusterId, new ExponentialWaitStrategy(10L, 2000L), factory);
}

@VisibleForTesting
Expand All @@ -221,39 +214,20 @@ public RestClusterClient(
restClient,
clusterId,
waitStrategy,
Files.createTempDirectory("flink-rest-client-jobgraphs"));
}

@VisibleForTesting
RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
java.nio.file.Path tmpDir)
throws Exception {
this(
configuration,
restClient,
clusterId,
waitStrategy,
DefaultClientHighAvailabilityServicesFactory.INSTANCE,
tmpDir);
DefaultClientHighAvailabilityServicesFactory.INSTANCE);
}

private RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
ClientHighAvailabilityServicesFactory clientHAServicesFactory,
java.nio.file.Path tempDir)
ClientHighAvailabilityServicesFactory clientHAServicesFactory)
throws Exception {
this.configuration = checkNotNull(configuration);

this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);
this.tempDir = tempDir;

if (restClient != null) {
this.restClient = restClient;
Expand Down Expand Up @@ -359,7 +333,8 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
() -> {
try {
final java.nio.file.Path jobGraphFile =
Files.createTempFile(tempDir, "flink-jobgraph", ".bin");
Files.createTempFile(
"flink-jobgraph-" + jobGraph.getJobID(), ".bin");
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,18 +219,12 @@ private RestClusterClient<StandaloneClusterId> createRestClusterClient(int port)

private RestClusterClient<StandaloneClusterId> createRestClusterClient(
int port, Configuration clientConfig) throws Exception {
return createRestClusterClient(port, clientConfig, Files.createTempDirectory("flink"));
}

private RestClusterClient<StandaloneClusterId> createRestClusterClient(
int port, Configuration clientConfig, Path tmpDir) throws Exception {
clientConfig.setInteger(RestOptions.PORT, port);
return new RestClusterClient<>(
clientConfig,
createRestClient(),
StandaloneClusterId.getInstance(),
(attempt) -> 0,
tmpDir);
(attempt) -> 0);
}

@Nonnull
Expand Down Expand Up @@ -986,23 +980,32 @@ void testJobSubmissionFailureCauseForwardedToClient() throws Exception {
}

@Test
void testJobGraphFileCleanedUpOnJobSubmissionFailure(@TempDir Path tmp) throws Exception {
void testJobGraphFileCleanedUpOnJobSubmissionFailure() throws Exception {
final Path jobGraphFileDir = getTempDir();
try (final TestRestServerEndpoint restServerEndpoint =
createRestServerEndpoint(new SubmissionFailingHandler())) {
try (RestClusterClient<?> restClusterClient =
createRestClusterClient(
restServerEndpoint.getServerAddress().getPort(),
new Configuration(restConfig),
tmp)) {
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
assertThatThrownBy(() -> restClusterClient.submitJob(jobGraph).join())
.hasCauseInstanceOf(JobSubmissionException.class);
try (Stream<Path> files = Files.list(tmp)) {
assertThat(files).isEmpty();
try (Stream<Path> files = Files.list(jobGraphFileDir)) {
assertThat(files)
.noneMatch(
path ->
path.toString()
.contains(jobGraph.getJobID().toString()));
}
}
}
}

private static Path getTempDir() throws IOException {
Path tempFile = Files.createTempFile("test", ".bin");
Path tempDir = tempFile.getParent();
Files.delete(tempFile);
return tempDir;
}

private final class SubmissionFailingHandler
extends TestHandler<
JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> {
Expand Down

0 comments on commit 6034d5f

Please sign in to comment.