Skip to content

Commit

Permalink
[FLINK-18301][e2e] Backup kafka logs on failure
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jun 17, 2020
1 parent 0c4c461 commit acd0de8
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.util.OperatingSystem;
Expand All @@ -29,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
Expand All @@ -45,6 +48,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -72,12 +76,15 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
private final DownloadCache downloadCache = DownloadCache.get();
private final String kafkaVersion;
private Path kafkaDir;
@Nullable
private Path logBackupDirectory;

LocalStandaloneKafkaResource(final String kafkaVersion) {
LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path logBackupDirectory) {
OperatingSystemRestriction.forbid(
String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()),
OperatingSystem.WINDOWS);
this.kafkaVersion = kafkaVersion;
this.logBackupDirectory = logBackupDirectory;
}

private static String getKafkaDownloadUrl(final String kafkaVersion) {
Expand Down Expand Up @@ -162,6 +169,20 @@ private void setupKafkaCluster() throws IOException {

@Override
public void afterTestSuccess() {
shutdownResource();
downloadCache.afterTestSuccess();
tmp.delete();
}

@Override
public void afterTestFailure() {
shutdownResource();
backupLogs();
downloadCache.afterTestFailure();
tmp.delete();
}

private void shutdownResource() {
try {
AutoClosableProcess.runBlocking(
kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString()
Expand Down Expand Up @@ -192,8 +213,19 @@ public void afterTestSuccess() {
} catch (IOException ioe) {
LOG.warn("Error while shutting down zookeeper.", ioe);
}
downloadCache.afterTestSuccess();
tmp.delete();
}

private void backupLogs() {
if (logBackupDirectory != null) {
final Path targetDirectory = logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString());
try {
Files.createDirectories(targetDirectory);
TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory);
LOG.info("Backed up logs to {}.", targetDirectory);
} catch (IOException e) {
LOG.warn("An error has occurred while backing up logs to {}.", targetDirectory, e);
}
}
}

private static boolean isZookeeperRunning(final Path kafkaDir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,29 @@

package org.apache.flink.tests.util.kafka;

import org.apache.flink.tests.util.parameters.ParameterProperty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

/**
* A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}.
*/
public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class);

private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get);

@Override
public KafkaResource create(final String kafkaVersion) {
return new LocalStandaloneKafkaResource(kafkaVersion);
Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) {
LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
}
return new LocalStandaloneKafkaResource(kafkaVersion, logBackupDirectory.orElse(null));
}
}

0 comments on commit acd0de8

Please sign in to comment.