Skip to content

Commit

Permalink
[FLINK-7483][blob] prevent cleanup of re-registered jobs
Browse files Browse the repository at this point in the history
When a job is registered, it may have been released before and we thus need to
reset the cleanup timeout again.
Nico Kruber authored and tillrohrmann committed Oct 5, 2017

Verified

This commit was signed with the committer’s verified signature.
tillrohrmann Till Rohrmann
1 parent 4947ee6 commit 40ef908
Showing 2 changed files with 75 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -81,7 +81,8 @@ public class BlobCache extends TimerTask implements BlobService {
/**
* Job reference counters with a time-to-live (TTL).
*/
private static class RefCount {
@VisibleForTesting
static class RefCount {
/**
* Number of references to a job.
*/
@@ -166,6 +167,9 @@ public void registerJob(JobID jobId) {
if (ref == null) {
ref = new RefCount();
jobRefCounters.put(jobId, ref);
} else {
// reset cleanup timeout
ref.keepUntil = -1;
}
++ref.references;
}
@@ -184,7 +188,7 @@ public void releaseJob(JobID jobId) {
RefCount ref = jobRefCounters.get(jobId);

if (ref == null) {
LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls");
LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls for jobId " + jobId);
return;
}

@@ -484,6 +488,16 @@ public BlobClient createClient() throws IOException {
return new BlobClient(serverAddress, blobClientConfig);
}

/**
* Returns the job reference counters - for testing purposes only!
*
* @return job reference counters (internal state!)
*/
@VisibleForTesting
Map<JobID, RefCount> getJobRefCounters() {
return jobRefCounters;
}

/**
* Returns a file handle to the file associated with the given blob key on the blob
* server.
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;

@@ -37,7 +36,9 @@
import java.util.Collection;
import java.util.List;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

/**
* A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}.
@@ -141,6 +142,63 @@ public void testJobCleanup() throws IOException, InterruptedException {
}
}

/**
* Tests that {@link BlobCache} sets the expected reference counts and cleanup timeouts when
* registering, releasing, and re-registering jobs.
*/
@Test
public void testJobReferences() throws IOException, InterruptedException {

JobID jobId = new JobID();

Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 3_600_000L); // 1 hour should effectively prevent races

// NOTE: use fake address - we will not connect to it here
InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);

try (BlobCache cache = new BlobCache(serverAddress, config, new VoidBlobStore())) {

// register once
cache.registerJob(jobId);
assertEquals(1, cache.getJobRefCounters().get(jobId).references);
assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);

// register a second time
cache.registerJob(jobId);
assertEquals(2, cache.getJobRefCounters().get(jobId).references);
assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);

// release once
cache.releaseJob(jobId);
assertEquals(1, cache.getJobRefCounters().get(jobId).references);
assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);

// release a second time
long cleanupLowerBound =
System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
assertEquals(0, cache.getJobRefCounters().get(jobId).references);
assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
greaterThanOrEqualTo(cleanupLowerBound));

// register again
cache.registerJob(jobId);
assertEquals(1, cache.getJobRefCounters().get(jobId).references);
assertEquals(-1, cache.getJobRefCounters().get(jobId).keepUntil);

// finally release the job
cleanupLowerBound =
System.currentTimeMillis() + config.getLong(BlobServerOptions.CLEANUP_INTERVAL);
cache.releaseJob(jobId);
assertEquals(0, cache.getJobRefCounters().get(jobId).references);
assertThat(cache.getJobRefCounters().get(jobId).keepUntil,
greaterThanOrEqualTo(cleanupLowerBound));
}
}

/**
* Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}
* but only after preserving the file for a bit longer.

0 comments on commit 40ef908

Please sign in to comment.