Skip to content

Commit

Permalink
[FLINK-7055][blob] refactor getURL() to the more generic getFile()
Browse files Browse the repository at this point in the history
The fact that we always returned URL objects is a relic of the BlobServer
originally being used only for URLClassLoader. Since we'd like to extend its use,
returning File objects instead is more generic.

This closes apache#4236.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Aug 14, 2017
1 parent f78eb0f commit b7c1dfa
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou
lastSubmittedFile.put(taskManagerID, blobKey);
}
try {
return blobCache.getURL(blobKey).getFile();
return blobCache.getFile(blobKey).getAbsolutePath();
} catch (IOException e) {
throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand All @@ -39,7 +38,7 @@
/**
* The BLOB cache implements a local cache for content-addressable BLOBs.
*
* <p>When requesting BLOBs through the {@link BlobCache#getURL} methods, the
* <p>When requesting BLOBs through the {@link BlobCache#getFile(BlobKey)} method, the
* BLOB cache will first attempt to serve the file from its local cache. Only if
* the local cache does not contain the desired BLOB, the BLOB cache will try to
* download it from a distributed file system (if available) or the BLOB
Expand Down Expand Up @@ -111,21 +110,22 @@ public BlobCache(
}

/**
* Returns the URL for the BLOB with the given key. The method will first attempt to serve
* Returns a local copy of the file for the BLOB with the given key. The method will first attempt to serve
* the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it
* from this cache's BLOB server.
*
* @param requiredBlob The key of the desired BLOB.
* @return URL referring to the local storage location of the BLOB.
* @return file referring to the local storage location of the BLOB.
* @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server.
*/
public URL getURL(final BlobKey requiredBlob) throws IOException {
@Override
public File getFile(final BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");

final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);

if (localJarFile.exists()) {
return localJarFile.toURI().toURL();
return localJarFile;
}

// first try the distributed blob store (if available)
Expand All @@ -136,7 +136,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
}

if (localJarFile.exists()) {
return localJarFile.toURI().toURL();
return localJarFile;
}

// fallback: download from the BlobServer
Expand All @@ -160,7 +160,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
}

// success, we finished
return localJarFile.toURI().toURL();
return localJarFile;
}
catch (Throwable t) {
String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress +
Expand Down Expand Up @@ -188,6 +188,7 @@ public URL getURL(final BlobKey requiredBlob) throws IOException {
* Deletes the file associated with the given key from the BLOB cache.
* @param key referring to the file to be deleted
*/
@Override
public void delete(BlobKey key) throws IOException{
final File localFile = BlobUtils.getStorageLocation(storageDir, key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -334,22 +333,23 @@ public BlobClient createClient() throws IOException {
}

/**
* Method which retrieves the URL of a file associated with a blob key. The blob server looks
* the blob key up in its local storage. If the file exists, then the URL is returned. If the
* file does not exist, then a FileNotFoundException is thrown.
* Method which retrieves the local path of a file associated with a blob key. The blob server
* looks the blob key up in its local storage. If the file exists, it is returned. If the
* file does not exist, it is retrieved from the HA blob store (if available) or a
* FileNotFoundException is thrown.
*
* @param requiredBlob blob key associated with the requested file
* @return URL of the file
* @throws IOException
* @return file referring to the local storage location of the BLOB.
* @throws IOException Thrown if the file retrieval failed.
*/
@Override
public URL getURL(BlobKey requiredBlob) throws IOException {
public File getFile(BlobKey requiredBlob) throws IOException {
checkArgument(requiredBlob != null, "BLOB key cannot be null.");

final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob);

if (localFile.exists()) {
return localFile.toURI().toURL();
return localFile;
}
else {
try {
Expand All @@ -361,7 +361,7 @@ public URL getURL(BlobKey requiredBlob) throws IOException {
}

if (localFile.exists()) {
return localFile.toURI().toURL();
return localFile;
}
else {
throw new FileNotFoundException("Local file " + localFile + " does not exist " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URL;

/**
* A simple service to store and retrieve binary large objects (BLOBs).
*/
public interface BlobService extends Closeable {

/**
* Returns the URL of the file associated with the provided blob key.
* Returns the path to a local copy of the file associated with the provided blob key.
*
* @param key blob key associated with the requested file
* @return The URL to the file.
* @return The path to the file.
* @throws java.io.FileNotFoundException when the path does not exist
* @throws IOException if any other error occurs when retrieving the file
*/
URL getURL(BlobKey key) throws IOException;
File getFile(BlobKey key) throws IOException;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public static ClassLoader retrieveClassLoader(
int pos = 0;
for (BlobKey blobKey : props.requiredJarFiles()) {
try {
allURLs[pos++] = blobClient.getURL(blobKey);
allURLs[pos++] = blobClient.getFile(blobKey).toURI().toURL();
} catch (Exception e) {
try {
blobClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException
// it is important that we fetch the URL before increasing the counter.
// in case the URL cannot be created (failed to fetch the BLOB), we have no stale counter
try {
URL url = blobService.getURL(key);
URL url = blobService.getFile(key).toURI().toURL();

Integer references = blobKeyReferenceCounters.get(key);
int newReferences = references == null ? 1 : references + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void testBlobFetchRetries(final Configuration config, final BlobStore bl
cache = new BlobCache(serverAddress, config, new VoidBlobStore());

// trigger a download - it should fail the first two times, but retry, and succeed eventually
URL url = cache.getURL(key);
URL url = cache.getFile(key).toURI().toURL();
InputStream is = url.openStream();
try {
byte[] received = new byte[data.length];
Expand Down Expand Up @@ -211,7 +211,7 @@ private void testBlobFetchWithTooManyFailures(final Configuration config, final

// trigger a download - it should fail eventually
try {
cache.getURL(key);
cache.getFile(key);
fail("This should fail");
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,12 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* This class contains unit tests for the {@link BlobCache}.
Expand Down Expand Up @@ -175,7 +172,7 @@ private void uploadFileGetTest(final Configuration config, boolean shutdownServe
blobCache = new BlobCache(serverAddress, cacheConfig, blobStoreService);

for (BlobKey blobKey : blobKeys) {
blobCache.getURL(blobKey);
blobCache.getFile(blobKey);
}

if (blobServer != null) {
Expand All @@ -184,28 +181,20 @@ private void uploadFileGetTest(final Configuration config, boolean shutdownServe
blobServer = null;
}

final URL[] urls = new URL[blobKeys.size()];
final File[] files = new File[blobKeys.size()];

for(int i = 0; i < blobKeys.size(); i++){
urls[i] = blobCache.getURL(blobKeys.get(i));
files[i] = blobCache.getFile(blobKeys.get(i));
}

// Verify the result
assertEquals(blobKeys.size(), urls.length);
assertEquals(blobKeys.size(), files.length);

for (final URL url : urls) {
for (final File file : files) {
assertNotNull(file);

assertNotNull(url);

try {
final File cachedFile = new File(url.toURI());

assertTrue(cachedFile.exists());
assertEquals(buf.length, cachedFile.length());

} catch (URISyntaxException e) {
fail(e.getMessage());
}
assertTrue(file.exists());
assertEquals(buf.length, file.length());
}
} finally {
if (blobServer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testDeleteSingleByBlobKey() {
// delete a file directly on the server
server.delete(key2);
try {
server.getURL(key2);
server.getFile(key2);
fail("BLOB should have been deleted");
}
catch (IOException e) {
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testDeleteByBlobKeyFails() {
server.delete(key);

// the file should still be there
server.getURL(key);
server.getFile(key);
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE
assertEquals(0, checkFilesExist(keys, server, false));

try {
server.getURL(keys.get(0));
server.getFile(keys.get(0));
fail("name-addressable BLOB should have been deleted");
} catch (IOException e) {
// expected
}
try {
server.getURL(keys.get(1));
server.getFile(keys.get(1));
fail("name-addressable BLOB should have been deleted");
} catch (IOException e) {
// expected
Expand Down Expand Up @@ -150,7 +150,7 @@ public void testLibraryCacheManagerJobCleanup() throws IOException, InterruptedE
* @param doThrow
whether exceptions should be ignored (<tt>false</tt>), or thrown (<tt>true</tt>)
*
* @return number of files we were able to retrieve via {@link BlobService#getURL(BlobKey)}
* @return number of files we were able to retrieve via {@link BlobService#getFile(BlobKey)}
*/
private static int checkFilesExist(
List<BlobKey> keys, BlobService blobService, boolean doThrow)
Expand All @@ -159,7 +159,7 @@ private static int checkFilesExist(

for (BlobKey key : keys) {
try {
blobService.getURL(key);
blobService.getFile(key);
++numFiles;
} catch (IOException e) {
if (doThrow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testRecoveryRegisterAndDownload() throws Exception {
libServer[0].registerTask(jobId, executionId, keys, Collections.<URL>emptyList());

// Verify key 1
File f = new File(cache.getURL(keys.get(0)).toURI());
File f = cache.getFile(keys.get(0));
assertEquals(expected.length, f.length());

try (FileInputStream fis = new FileInputStream(f)) {
Expand All @@ -126,7 +126,7 @@ public void testRecoveryRegisterAndDownload() throws Exception {
libCache = new BlobLibraryCacheManager(cache, 3600 * 1000);

// Verify key 1
f = new File(cache.getURL(keys.get(0)).toURI());
f = cache.getFile(keys.get(0));
assertEquals(expected.length, f.length());

try (FileInputStream fis = new FileInputStream(f)) {
Expand All @@ -138,7 +138,7 @@ public void testRecoveryRegisterAndDownload() throws Exception {
}

// Verify key 2
f = new File(cache.getURL(keys.get(1)).toURI());
f = cache.getFile(keys.get(1));
assertEquals(256, f.length());

try (FileInputStream fis = new FileInputStream(f)) {
Expand Down

0 comments on commit b7c1dfa

Please sign in to comment.