From 24db045fc1936ff1bc5069bdb76c2fd654f6ab25 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Thu, 26 Jan 2017 20:29:58 +0100 Subject: [PATCH] [FLINK-5666] [tests] Add blob server clean up tests Previously, deleting in HA mode was only tested with a local file system. This verifies that the delete still works on HDFS. This closes #3222. --- .../org/apache/flink/hdfstests/HDFSTest.java | 19 +++++++++ .../runtime/blob/BlobRecoveryITCase.java | 40 +++++++++++++++---- .../BlobLibraryCacheRecoveryITCase.java | 8 ++-- 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java index 1df6390ae5976..49db0f87ecfc1 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java @@ -24,10 +24,14 @@ import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.AvroOutputFormat; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; +import org.apache.flink.runtime.blob.BlobRecoveryITCase; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -217,6 +221,21 @@ public void testDeletePathIfEmpty() throws IOException { assertFalse(fs.exists(directory)); } + /** + * Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any + * participating BlobServer. + */ + @Test + public void testBlobServerRecovery() throws Exception { + org.apache.flink.configuration.Configuration + config = new org.apache.flink.configuration.Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI); + + BlobRecoveryITCase.testBlobServerRecovery(config); + } + // package visible static abstract class DopOneTestEnvironment extends ExecutionEnvironment { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java index 3fe207e514413..a8eb1d34b3561 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java @@ -23,18 +23,24 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class BlobRecoveryITCase { @@ -61,6 +67,16 @@ public void cleanUp() throws Exception { */ @Test public void testBlobServerRecovery() throws Exception { + Configuration config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); + + testBlobServerRecovery(config); + } + + public static void testBlobServerRecovery(final Configuration config) throws IOException { + String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH); Random rand = new Random(); BlobServer[] server = new BlobServer[2]; @@ -68,10 +84,6 @@ public void testBlobServerRecovery() throws Exception { BlobClient client = null; try { - Configuration config = new Configuration(); - config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); - config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM"); - config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath()); for (int i = 0; i < server.length; i++) { server[i] = new BlobServer(config); @@ -96,6 +108,11 @@ public void testBlobServerRecovery() throws Exception { client.put(jobId[0], testKey[0], expected); // Request 3 client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4 + // check that the storage directory exists + final Path blobServerPath = new Path(storagePath, "blob"); + FileSystem fs = blobServerPath.getFileSystem(); + assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath)); + // Close the client and connect to the other server client.close(); client = new BlobClient(serverAddress[1], config); @@ -146,6 +163,17 @@ public void testBlobServerRecovery() throws Exception { client.delete(keys[1]); client.delete(jobId[0], testKey[0]); client.delete(jobId[1], testKey[1]); + + // Verify everything is clean + if (fs.exists(blobServerPath)) { + final org.apache.flink.core.fs.FileStatus[] recoveryFiles = + fs.listStatus(blobServerPath); + ArrayList filenames = new ArrayList(recoveryFiles.length); + for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) { + filenames.add(file.toString()); + } + fail("Unclean state backend: " + filenames); + } } finally { for (BlobServer s : server) { @@ -158,9 +186,5 @@ public void testBlobServerRecovery() throws Exception { client.close(); } } - - // Verify everything is clean - File[] recoveryFiles = recoveryDir.listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java index 8fabdf690f783..a727d51bccb60 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java @@ -143,6 +143,10 @@ public void testRecoveryRegisterAndDownload() throws Exception { client.delete(keys.get(0)); client.delete(keys.get(1)); } + + // Verify everything is clean + File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); + assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } finally { for (BlobServer s : server) { @@ -159,9 +163,5 @@ public void testRecoveryRegisterAndDownload() throws Exception { libCache.shutdown(); } } - - // Verify everything is clean - File[] recoveryFiles = temporaryFolder.getRoot().listFiles(); - assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length); } }