Skip to content

Commit

Permalink
handle destination and source failures equally + fix main process failure handling (airbytehq#8762)
Browse files Browse the repository at this point in the history

* handle destination and source failures equally

* fix typo

* fix tests

* merge with master

* add log

* move exit value check to separate method

* fix test

* get rid of throws WorkerException

* get rid of throws WorkerException in impl as well

* get rid of throws WorkerException in test

* remove import

* merge with master

* add trap for heartbeat exit and integration test

* format

* update integration test

* update comment

* fix test

* fix test again
  • Loading branch information
lmossman authored Dec 20, 2021
1 parent 9dfd0da commit 635fc68
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public boolean isFinished() {
return isClosed;
}

/**
 * Always reports an exit code of 0; nothing is consulted to compute the value.
 *
 * NOTE(review): presumably this implementation has no underlying process whose exit code could be
 * read - confirm against the enclosing class.
 *
 * @return 0, unconditionally
 */
@Override
public int getExitValue() {
return 0;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -119,26 +119,22 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
destination.start(destinationConfig, jobRoot);
source.start(sourceConfig, jobRoot);

final Future<?> destinationOutputThreadFuture = executors.submit(getDestinationOutputRunnable(
destination,
cancelled,
destinationMessageTracker,
mdc));

final Future<?> replicationThreadFuture = executors.submit(getReplicationRunnable(
source,
destination,
cancelled,
mapper,
sourceMessageTracker,
mdc));

LOGGER.info("Waiting for source thread to join.");
replicationThreadFuture.get();
LOGGER.info("Source thread complete.");
LOGGER.info("Waiting for destination thread to join.");
destinationOutputThreadFuture.get();
LOGGER.info("Destination thread complete.");
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, destinationMessageTracker, mdc),
executors);

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, sourceMessageTracker, mdc),
executors);

LOGGER.info("Waiting for source and destination threads to complete.");
// CompletableFuture#allOf waits until all futures finish before returning, even if one throws an
// exception. So in order to handle exceptions from a future immediately without needing to wait for
// the other future to finish, we first call CompletableFuture#anyOf.
CompletableFuture.anyOf(replicationThreadFuture, destinationOutputThreadFuture).get();
LOGGER.info("One of source or destination thread complete. Waiting on the other.");
CompletableFuture.allOf(replicationThreadFuture, destinationOutputThreadFuture).get();
LOGGER.info("Source and destination threads complete.");

} catch (final Exception e) {
hasFailed.set(true);
Expand Down Expand Up @@ -222,6 +218,9 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
}
}
destination.notifyEndOfStream();
if (!cancelled.get() && source.getExitValue() != 0) {
throw new RuntimeException("Source process exited with non-zero exit code " + source.getExitValue());
}
} catch (final Exception e) {
if (!cancelled.get()) {
// Although this thread is closed first, it races with the source's closure and can attempt one
Expand Down Expand Up @@ -249,6 +248,9 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
destinationMessageTracker.accept(messageOptional.get());
}
}
if (!cancelled.get() && destination.getExitValue() != 0) {
throw new RuntimeException("Destination process exited with non-zero exit code " + destination.getExitValue());
}
} catch (final Exception e) {
if (!cancelled.get()) {
// Although this thread is closed first, it races with the destination's closure and can attempt one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.process;

import com.google.common.collect.MoreCollectors;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.ResourceRequirements;
Expand All @@ -12,6 +13,7 @@
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
Expand Down Expand Up @@ -174,7 +176,7 @@ private static Container getMain(final String image,
final String[] args)
throws IOException {
final var argsStr = String.join(" ", args);
final var optionalStdin = usesStdin ? String.format("cat %s | ", STDIN_PIPE_FILE) : "";
final var optionalStdin = usesStdin ? String.format("< %s", STDIN_PIPE_FILE) : "";
final var entrypointOverrideValue = entrypointOverride == null ? "" : StringEscapeUtils.escapeXSI(entrypointOverride);

// communicates its completion to the heartbeat check via a file and closes itself if the heartbeat
Expand Down Expand Up @@ -568,10 +570,15 @@ private void close() {

private boolean isTerminal(final Pod pod) {
if (pod.getStatus() != null) {
return pod.getStatus()
// Check if "main" container has terminated, as that defines whether the parent process has
// terminated.
final ContainerStatus mainContainerStatus = pod.getStatus()
.getContainerStatuses()
.stream()
.anyMatch(e -> e.getState() != null && e.getState().getTerminated() != null);
.filter(containerStatus -> containerStatus.getName().equals("main"))
.collect(MoreCollectors.onlyElement());

return mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null;
} else {
return false;
}
Expand Down Expand Up @@ -609,14 +616,16 @@ private int getReturnCode(final Pod pod) {
throw new IllegalThreadStateException("Kube pod process has not exited yet.");
}

returnCode = refreshedPod.getStatus().getContainerStatuses()
final ContainerStatus mainContainerStatus = refreshedPod.getStatus().getContainerStatuses()
.stream()
.filter(containerStatus -> containerStatus.getState() != null && containerStatus.getState().getTerminated() != null)
.map(containerStatus -> {
return containerStatus.getState().getTerminated().getExitCode();
})
.reduce(Integer::sum)
.orElseThrow();
.filter(containerStatus -> containerStatus.getName().equals("main"))
.collect(MoreCollectors.onlyElement());

if (mainContainerStatus.getState() == null || mainContainerStatus.getState().getTerminated() == null) {
throw new IllegalThreadStateException("Main container in kube pod has not terminated yet.");
}

returnCode = mainContainerStatus.getState().getTerminated().getExitCode();

LOGGER.info("Exit code for pod {} is {}", name, returnCode);
return returnCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public interface AirbyteDestination extends CheckedConsumer<AirbyteMessage, Exce
*/
boolean isFinished();

/**
 * Gets the exit value of the destination process. This should only be called after the destination
 * process has finished.
 *
 * <p>
 * The replication worker treats a non-zero value (when the sync was not cancelled) as a failed
 * replication attempt.
 *
 * @return exit code of the destination process
 * @throws IllegalStateException if the destination process has not exited
 */
int getExitValue();

/**
* Attempts to read an AirbyteMessage from the Destination.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface AirbyteSource extends AutoCloseable {
*/
boolean isFinished();

/**
 * Gets the exit value of the source process. This should only be called after the source process
 * has finished.
 *
 * <p>
 * The replication worker treats a non-zero value (when the sync was not cancelled) as a failed
 * replication attempt.
 *
 * @return exit code of the source process
 * @throws IllegalStateException if the source process has not exited
 */
int getExitValue();

/**
* Attempts to read an AirbyteMessage from the Source.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
private Process destinationProcess = null;
private BufferedWriter writer = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;

public DefaultAirbyteDestination(final WorkerConfigs workerConfigs, final IntegrationLauncher integrationLauncher) {
this(workerConfigs, integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER));
Expand Down Expand Up @@ -112,9 +113,9 @@ public void close() throws Exception {

LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(workerConfigs, destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) {
if (destinationProcess.isAlive() || getExitValue() != 0) {
final String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue();
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + getExitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
}
Expand Down Expand Up @@ -145,6 +146,18 @@ public boolean isFinished() {
return !destinationProcess.isAlive();
}

/**
 * Returns the exit code of the destination process, remembering it after the first lookup so the
 * process is only queried once.
 *
 * @return exit code of the destination process
 * @throws IllegalStateException if the destination process is null or still alive
 */
@Override
public int getExitValue() {
  Preconditions.checkState(destinationProcess != null, "Destination process is null, cannot retrieve exit value.");
  Preconditions.checkState(!destinationProcess.isAlive(), "Destination process is still alive, cannot retrieve exit value.");

  // Serve the remembered value when the process has already been queried.
  if (exitValue != null) {
    return exitValue;
  }

  exitValue = destinationProcess.exitValue();
  return exitValue;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(destinationProcess != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DefaultAirbyteSource implements AirbyteSource {

private Process sourceProcess = null;
private Iterator<AirbyteMessage> messageIterator = null;
private Integer exitValue = null;

public DefaultAirbyteSource(final WorkerConfigs workerConfigs, final IntegrationLauncher integrationLauncher) {
this(workerConfigs, integrationLauncher, new DefaultAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER),
Expand Down Expand Up @@ -97,6 +98,18 @@ public boolean isFinished() {
return !sourceProcess.isAlive() && !messageIterator.hasNext();
}

/**
 * Returns the exit code of the source process. The value is read from the process on the first
 * call and served from the cached field afterwards.
 *
 * @return exit code of the source process
 * @throws IllegalStateException if the source process is null or still alive
 */
@Override
public int getExitValue() throws IllegalStateException {
  Preconditions.checkState(sourceProcess != null, "Source process is null, cannot retrieve exit value.");
  Preconditions.checkState(!sourceProcess.isAlive(), "Source process is still alive, cannot retrieve exit value.");

  final Integer cachedExitValue = exitValue;
  if (cachedExitValue != null) {
    return cachedExitValue;
  }

  // First lookup: ask the process and remember the answer for later calls.
  final int processExitValue = sourceProcess.exitValue();
  exitValue = processExitValue;
  return processExitValue;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
Preconditions.checkState(sourceProcess != null);
Expand All @@ -118,8 +131,8 @@ public void close() throws Exception {
GRACEFUL_SHUTDOWN_DURATION.toMillis(),
TimeUnit.MILLISECONDS);

if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) {
final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue();
if (sourceProcess.isAlive() || getExitValue() != 0) {
final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + getExitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public boolean isFinished() {
return hasEmittedState.get();
}

/**
 * Always reports an exit code of 0; nothing is consulted to compute the value.
 *
 * NOTE(review): presumably this implementation has no underlying process whose exit code could be
 * read - confirm against the enclosing class.
 *
 * @return 0, unconditionally
 */
@Override
public int getExitValue() {
return 0;
}

@Override
public Optional<AirbyteMessage> attemptRead() {
if (!hasEmittedState.get()) {
Expand Down
28 changes: 25 additions & 3 deletions airbyte-workers/src/main/resources/entrypoints/main.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
trap "touch TERMINATION_FILE_MAIN" EXIT
trap "echo 'received ABRT'; exit 1;" ABRT

ENTRYPOINT_OVERRIDE=ENTRYPOINT_OVERRIDE_VALUE

Expand All @@ -12,8 +13,29 @@ else
echo "Using existing AIRBYTE_ENTRYPOINT: $AIRBYTE_ENTRYPOINT"
fi

(OPTIONAL_STDIN (eval "$AIRBYTE_ENTRYPOINT ARGS" 2> STDERR_PIPE_FILE > STDOUT_PIPE_FILE)) &
((eval "$AIRBYTE_ENTRYPOINT ARGS" 2> STDERR_PIPE_FILE > STDOUT_PIPE_FILE) OPTIONAL_STDIN) &
CHILD_PID=$!
(while true; do if [ -f TERMINATION_FILE_CHECK ]; then echo "Heartbeat to worker failed, exiting..."; exit 1; fi; sleep 1; done) &
# Check for TERMINATION_FILE_CHECK in a loop to handle heartbeat failure
(
# must use $$$$ instead of $$ because kube entrypoint transforms $$ into $
# see https://kubernetes.io/docs/reference/kubernetes-api/workload-resources/pod-v1/#entrypoint for explanation
PARENT_PID=$$$$
echo "PARENT_PID: ${PARENT_PID}"
while true
do
if [ -f TERMINATION_FILE_CHECK ]
then
echo "Heartbeat to worker failed, exiting..."
kill -s ABRT ${PARENT_PID}
exit 0
fi
sleep 1
done
) &
echo "Waiting on CHILD_PID $CHILD_PID"
wait $CHILD_PID
exit $?
EXIT_STATUS=$?
echo "EXIT_STATUS: $EXIT_STATUS"
exit $EXIT_STATUS
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,22 @@ public void testKillingWithoutHeartbeat() throws Exception {
assertNotEquals(0, process.exitValue());
}

@Test
public void testExitValueWaitsForMainToTerminate() throws Exception {
  // launch a main process that keeps running for a moment and then exits with a distinctive code
  final Process mainProcess = getProcess("sleep 2; exit 13;");

  // close both streams right away, while main is still running
  mainProcess.getInputStream().close();
  mainProcess.getOutputStream().close();

  // block until the process reports completion
  mainProcess.waitFor();

  // the exit code reported for the pod must be the one produced by the main container
  final int observedExitValue = mainProcess.exitValue();
  assertEquals(13, observedExitValue);
}

private static String getRandomFile(final int lines) {
final var sb = new StringBuilder();
for (int i = 0; i < lines; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,40 @@ void test() throws Exception {
verify(destination).close();
}

@Test
void testSourceNonZeroExitValue() throws Exception {
  // the source reports a non-zero exit code
  when(source.getExitValue()).thenReturn(1);

  final ReplicationWorker replicationWorker =
      new DefaultReplicationWorker(JOB_ID, JOB_ATTEMPT, source, mapper, destination, sourceMessageTracker, destinationMessageTracker);
  final ReplicationOutput replicationOutput = replicationWorker.run(syncInput, jobRoot);

  // a non-zero source exit code must surface as a failed replication attempt
  assertEquals(ReplicationStatus.FAILED, replicationOutput.getReplicationAttemptSummary().getStatus());
}

@Test
void testDestinationNonZeroExitValue() throws Exception {
  // the destination reports a non-zero exit code
  when(destination.getExitValue()).thenReturn(1);

  final ReplicationWorker replicationWorker =
      new DefaultReplicationWorker(JOB_ID, JOB_ATTEMPT, source, mapper, destination, sourceMessageTracker, destinationMessageTracker);
  final ReplicationOutput replicationOutput = replicationWorker.run(syncInput, jobRoot);

  // a non-zero destination exit code must surface as a failed replication attempt
  assertEquals(ReplicationStatus.FAILED, replicationOutput.getReplicationAttemptSummary().getStatus());
}

@Test
void testLoggingInThreads() throws IOException, WorkerException {
// set up the mdc so that actually log to a file, so that we can verify that file logging captures
Expand Down
Loading

0 comments on commit 635fc68

Please sign in to comment.