Skip to content

Commit

Permalink
kube zombie handling (airbytehq#4137)
Browse files Browse the repository at this point in the history
* working except for too much logging and bad success case

* succeeds on passing case

* completes successfully

* just doesn't kill the main

* working zombie killing

* cleanup

* more cleanup

* use correct path

* fmt

* cleanups, bugfixes, integration tests

* run worker integration tests as part of ci

* delete tester class

* fix hanging checkpoint container problem

* fix name of command

* replace todo with clarifying comment
  • Loading branch information
jrhizor authored Jun 18, 2021
1 parent 991cb68 commit ef85315
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
import io.airbyte.workers.temporal.TemporalUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -84,6 +87,7 @@ public class SchedulerApp {
private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5);
private static final Duration CLEANING_DELAY = Duration.ofHours(2);
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();
private static final int KUBE_HEARTBEAT_PORT = 9000;

private final Path workspaceRoot;
private final ProcessFactory processFactory;
Expand Down Expand Up @@ -163,11 +167,13 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

private static ProcessFactory getProcessBuilderFactory(Configs configs) {
private static ProcessFactory getProcessBuilderFactory(Configs configs) throws UnknownHostException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient kubeClient = new DefaultKubernetesClient();
final BlockingQueue<Integer> ports = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts());
return new KubeProcessFactory("default", kubeClient, ports);
final BlockingQueue<Integer> workerPorts = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts());
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
return new KubeProcessFactory("default", kubeClient, kubeHeartbeatUrl, workerPorts);
} else {
return new DockerProcessFactory(
configs.getWorkspaceRoot(),
Expand Down Expand Up @@ -209,6 +215,19 @@ public static void main(String[] args) throws IOException, InterruptedException
jobPersistence);
final JobNotifier jobNotifier = new JobNotifier(configs.getWebappUrl(), configRepository);

if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
Map<String, String> mdc = MDC.getCopyOfContextMap();
Executors.newSingleThreadExecutor().submit(
() -> {
MDC.setContextMap(mdc);
try {
new WorkerHeartbeatServer(KUBE_HEARTBEAT_PORT).start();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
configs.getAirbyteRole(),
Expand Down
5 changes: 5 additions & 0 deletions airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import org.jsonschema2pojo.SourceType
plugins {
id 'java-library'
id 'com.github.eirnym.js2p' version '1.0'
id 'airbyte-integration-test-java'
}

configurations {
Expand All @@ -17,6 +18,8 @@ dependencies {
implementation 'io.temporal:temporal-sdk:1.0.4'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'
implementation 'org.eclipse.jetty:jetty-servlet:9.4.31.v20200723'

implementation project(':airbyte-config:models')
implementation project(':airbyte-db')
Expand All @@ -29,6 +32,8 @@ dependencies {
testImplementation 'org.postgresql:postgresql:42.2.18'

testImplementation project(':airbyte-commons-docker')

integrationTestJavaImplementation project(':airbyte-workers')
}

jsonSchema2Pojo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import io.airbyte.commons.io.IOs;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
Expand All @@ -37,6 +38,7 @@
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -48,6 +50,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -78,7 +81,10 @@
* specified stdin socket.</li>
* <li>6) The child process is able to access configuration data via the shared volume. It's inputs
* and outputs - stdin, stdout and stderr - are forwarded the parent process via the sidecars.</li>
*
* <li>7) The main process has its entrypoint wrapped to perform IO redirection and better error
* handling.</li>
* <li>8) A heartbeat sidecar checks if the worker that launched the pod is still alive. If not, the
* pod will fail.</li>
*
* See the constructor for more information.
*/
Expand All @@ -95,6 +101,9 @@ public class KubePodProcess extends Process {
private static final String STDOUT_PIPE_FILE = PIPES_DIR + "/stdout";
private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr";
private static final String CONFIG_DIR = "/config";
private static final String TERMINATION_DIR = "/termination";
private static final String TERMINATION_FILE_MAIN = TERMINATION_DIR + "/main";
private static final String TERMINATION_FILE_CHECK = TERMINATION_DIR + "/check";
private static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING";

// 143 is the typical SIGTERM exit code.
Expand Down Expand Up @@ -188,19 +197,26 @@ private static Container getInit(boolean usesStdin, List<VolumeMount> mainVolume
.build();
}

private static Container getMain(String image, boolean usesStdin, String entrypoint, List<VolumeMount> mainVolumeMounts, String[] args) {
private static Container getMain(String image, boolean usesStdin, String entrypoint, List<VolumeMount> mainVolumeMounts, String[] args)
throws IOException {
var argsStr = String.join(" ", args);
var entrypointStr = entrypoint + " " + argsStr + " ";

var entrypointStrWithPipes = entrypointStr + String.format(" 2> %s > %s", STDERR_PIPE_FILE, STDOUT_PIPE_FILE);
if (usesStdin) {
entrypointStrWithPipes = String.format("cat %s | ", STDIN_PIPE_FILE) + entrypointStrWithPipes;
}
var entrypointWithArgs = entrypoint + " " + argsStr;
var optionalStdin = usesStdin ? String.format("cat %s | ", STDIN_PIPE_FILE) : "";

// communicates its completion to the heartbeat check via a file and closes itself if the heartbeat
// fails
var mainCommand = MoreResources.readResource("entrypoints/main.sh")
.replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK)
.replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN)
.replaceAll("OPTIONAL_STDIN", optionalStdin)
.replaceAll("ENTRYPOINT", entrypointWithArgs)
.replaceAll("STDERR_PIPE_FILE", STDERR_PIPE_FILE)
.replaceAll("STDOUT_PIPE_FILE", STDOUT_PIPE_FILE);

return new ContainerBuilder()
.withName("main")
.withImage(image)
.withCommand("sh", "-c", entrypointStrWithPipes)
.withCommand("sh", "-c", mainCommand)
.withWorkingDir(CONFIG_DIR)
.withVolumeMounts(mainVolumeMounts)
.build();
Expand Down Expand Up @@ -251,6 +267,7 @@ public KubePodProcess(KubernetesClient client,
String image,
int stdoutLocalPort,
int stderrLocalPort,
String kubeHeartbeatUrl,
boolean usesStdin,
final Map<String, String> files,
final String entrypointOverride,
Expand Down Expand Up @@ -291,36 +308,61 @@ public KubePodProcess(KubernetesClient client,
.withMountPath(CONFIG_DIR)
.build();

var volumes = List.of(pipeVolume, configVolume);
var mainVolumeMounts = List.of(pipeVolumeMount, configVolumeMount);
Volume terminationVolume = new VolumeBuilder()
.withName("airbyte-termination")
.withNewEmptyDir()
.endEmptyDir()
.build();

VolumeMount terminationVolumeMount = new VolumeMountBuilder()
.withName("airbyte-termination")
.withMountPath(TERMINATION_DIR)
.build();

Container init = getInit(usesStdin, mainVolumeMounts);
Container main = getMain(image, usesStdin, entrypoint, mainVolumeMounts, args);
Container init = getInit(usesStdin, List.of(pipeVolumeMount, configVolumeMount));
Container main = getMain(image, usesStdin, entrypoint, List.of(pipeVolumeMount, configVolumeMount, terminationVolumeMount), args);

Container remoteStdin = new ContainerBuilder()
.withName("remote-stdin")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", "socat -d -d -d TCP-L:9001 STDOUT > " + STDIN_PIPE_FILE)
.withVolumeMounts(pipeVolumeMount)
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.build();

var localIp = InetAddress.getLocalHost().getHostAddress();
Container relayStdout = new ContainerBuilder()
.withName("relay-stdout")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDOUT_PIPE_FILE, localIp, stdoutLocalPort))
.withVolumeMounts(pipeVolumeMount)
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.build();

Container relayStderr = new ContainerBuilder()
.withName("relay-stderr")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDERR_PIPE_FILE, localIp, stderrLocalPort))
.withVolumeMounts(pipeVolumeMount)
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.build();

List<Container> containers = usesStdin ? List.of(main, remoteStdin, relayStdout, relayStderr) : List.of(main, relayStdout, relayStderr);
// communicates via a file if it isn't able to reach the heartbeating server and succeeds if the
// main container completes
final String heartbeatCommand = MoreResources.readResource("entrypoints/check.sh")
.replaceAll("TERMINATION_FILE_CHECK", TERMINATION_FILE_CHECK)
.replaceAll("TERMINATION_FILE_MAIN", TERMINATION_FILE_MAIN)
.replaceAll("HEARTBEAT_URL", kubeHeartbeatUrl);

Container callHeartbeatServer = new ContainerBuilder()
.withName("call-heartbeat-server")
.withImage("curlimages/curl:7.77.0")
.withCommand("sh")
.withArgs("-c", heartbeatCommand)
.withVolumeMounts(terminationVolumeMount)
.build();

Pod pod = new PodBuilder()
List<Container> containers = usesStdin ? List.of(main, remoteStdin, relayStdout, relayStderr, callHeartbeatServer)
: List.of(main, relayStdout, relayStderr, callHeartbeatServer);

final Pod pod = new PodBuilder()
.withApiVersion("v1")
.withNewMetadata()
.withName(podName)
Expand All @@ -329,12 +371,13 @@ public KubePodProcess(KubernetesClient client,
.withRestartPolicy("Never")
.withInitContainers(init)
.withContainers(containers)
.withVolumes(volumes)
.withVolumes(pipeVolume, configVolume, terminationVolume)
.endSpec()
.build();

LOGGER.info("Creating pod...");
this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);

waitForInitPodToRun(client, podDefinition);

LOGGER.info("Copying files...");
Expand All @@ -346,7 +389,17 @@ public KubePodProcess(KubernetesClient client,
copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess);

LOGGER.info("Waiting until pod is ready...");
client.resource(podDefinition).waitUntilReady(30, TimeUnit.MINUTES);
// If a pod gets into a non-terminal error state it should be automatically killed by our
// heartbeating mechanism.
// This also handles the case where a very short pod already completes before this check completes
// the first time.
// This doesn't manage things like pods that are blocked from running for some cluster reason or if
// the init
// container got stuck somehow.
client.resource(podDefinition).waitUntilCondition(p -> {
boolean isReady = Objects.nonNull(p) && Readiness.getInstance().isReady(p);
return isReady || isTerminal(p);
}, 10, TimeUnit.DAYS);

// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomStringUtils;
Expand All @@ -42,13 +40,21 @@ public class KubeProcessFactory implements ProcessFactory {

private final String namespace;
private final KubernetesClient kubeClient;
private final BlockingQueue<Integer> ports;
private final Set<Integer> claimedPorts = new HashSet<>();
private final String kubeHeartbeatUrl;
private final BlockingQueue<Integer> workerPorts;

public KubeProcessFactory(String namespace, KubernetesClient kubeClient, BlockingQueue<Integer> ports) {
/**
* @param namespace kubernetes namespace where spawned pods will live
* @param kubeClient kubernetes client
* @param kubeHeartbeatUrl a url where if the response is not 200 the spawned process will fail
* itself
* @param workerPorts a set of ports that can be used for IO socket servers
*/
public KubeProcessFactory(String namespace, KubernetesClient kubeClient, String kubeHeartbeatUrl, BlockingQueue<Integer> workerPorts) {
this.namespace = namespace;
this.kubeClient = kubeClient;
this.ports = ports;
this.kubeHeartbeatUrl = kubeHeartbeatUrl;
this.workerPorts = workerPorts;
}

@Override
Expand All @@ -66,17 +72,15 @@ public Process create(String jobId,
final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();
final String podName = "airbyte-worker-" + jobId + "-" + attempt + "-" + suffix;

final int stdoutLocalPort = ports.take();
claimedPorts.add(stdoutLocalPort);
final int stdoutLocalPort = workerPorts.take();
LOGGER.info("stdoutLocalPort = " + stdoutLocalPort);

final int stderrLocalPort = ports.take();
claimedPorts.add(stderrLocalPort);
final int stderrLocalPort = workerPorts.take();
LOGGER.info("stderrLocalPort = " + stderrLocalPort);

final Consumer<Integer> portReleaser = port -> {
if (!ports.contains(port)) {
ports.add(port);
if (!workerPorts.contains(port)) {
workerPorts.add(port);
LOGGER.info("Port consumer releasing: " + port);
} else {
LOGGER.info("Port consumer skipping releasing: " + port);
Expand All @@ -91,6 +95,7 @@ public Process create(String jobId,
imageName,
stdoutLocalPort,
stderrLocalPort,
kubeHeartbeatUrl,
usesStdin,
files,
entrypoint,
Expand Down
Loading

0 comments on commit ef85315

Please sign in to comment.