Skip to content

Commit

Permalink
[FLINK-5666] [tests] Add blob server clean up tests
Browse files Browse the repository at this point in the history
Previously, deleting in HA mode was only tested with a local file system.
This verifies that the delete still works on HDFS.

This closes apache#3222.
  • Loading branch information
Nico Kruber authored and uce committed Jan 27, 2017
1 parent b5f870a commit 24db045
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.blob.BlobRecoveryITCase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -217,6 +221,21 @@ public void testDeletePathIfEmpty() throws IOException {
assertFalse(fs.exists(directory));
}

/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
* participating BlobServer.
*/
@Test
public void testBlobServerRecovery() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobRecoveryITCase.testBlobServerRecovery(config);
}

// package visible
static abstract class DopOneTestEnvironment extends ExecutionEnvironment {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class BlobRecoveryITCase {

Expand All @@ -61,17 +67,23 @@ public void cleanUp() throws Exception {
*/
@Test
public void testBlobServerRecovery() throws Exception {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());

testBlobServerRecovery(config);
}

public static void testBlobServerRecovery(final Configuration config) throws IOException {
String storagePath = config.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
Random rand = new Random();

BlobServer[] server = new BlobServer[2];
InetSocketAddress[] serverAddress = new InetSocketAddress[2];
BlobClient client = null;

try {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, recoveryDir.getPath());

for (int i = 0; i < server.length; i++) {
server[i] = new BlobServer(config);
Expand All @@ -96,6 +108,11 @@ public void testBlobServerRecovery() throws Exception {
client.put(jobId[0], testKey[0], expected); // Request 3
client.put(jobId[1], testKey[1], expected, 32, 256); // Request 4

// check that the storage directory exists
final Path blobServerPath = new Path(storagePath, "blob");
FileSystem fs = blobServerPath.getFileSystem();
assertTrue("Unknown storage dir: " + blobServerPath, fs.exists(blobServerPath));

// Close the client and connect to the other server
client.close();
client = new BlobClient(serverAddress[1], config);
Expand Down Expand Up @@ -146,6 +163,17 @@ public void testBlobServerRecovery() throws Exception {
client.delete(keys[1]);
client.delete(jobId[0], testKey[0]);
client.delete(jobId[1], testKey[1]);

// Verify everything is clean
if (fs.exists(blobServerPath)) {
final org.apache.flink.core.fs.FileStatus[] recoveryFiles =
fs.listStatus(blobServerPath);
ArrayList<String> filenames = new ArrayList<String>(recoveryFiles.length);
for (org.apache.flink.core.fs.FileStatus file: recoveryFiles) {
filenames.add(file.toString());
}
fail("Unclean state backend: " + filenames);
}
}
finally {
for (BlobServer s : server) {
Expand All @@ -158,9 +186,5 @@ public void testBlobServerRecovery() throws Exception {
client.close();
}
}

// Verify everything is clean
File[] recoveryFiles = recoveryDir.listFiles();
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public void testRecoveryRegisterAndDownload() throws Exception {
client.delete(keys.get(0));
client.delete(keys.get(1));
}

// Verify everything is clean
File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
finally {
for (BlobServer s : server) {
Expand All @@ -159,9 +163,5 @@ public void testRecoveryRegisterAndDownload() throws Exception {
libCache.shutdown();
}
}

// Verify everything is clean
File[] recoveryFiles = temporaryFolder.getRoot().listFiles();
assertEquals("Unclean state backend: " + Arrays.toString(recoveryFiles), 0, recoveryFiles.length);
}
}

0 comments on commit 24db045

Please sign in to comment.