Merge pull request apache#6265: [BEAM-4130] Bring up Job Server container for Python jobs
tweise authored Aug 24, 2018
2 parents f9d3f55 + 5b3b9e0 commit 96d2887
Showing 11 changed files with 224 additions and 32 deletions.
2 changes: 1 addition & 1 deletion runners/flink/job-server-container/Dockerfile
@@ -19,7 +19,7 @@
FROM openjdk:8
MAINTAINER "Apache Beam <[email protected]>"

ADD target/beam-runners-flink_2.11-job-server.jar /opt/apache/beam/jars/
ADD target/beam-runners-flink-job-server.jar /opt/apache/beam/jars/
ADD target/flink-job-server.sh /opt/apache/beam/

WORKDIR /opt/apache/beam
2 changes: 1 addition & 1 deletion runners/flink/job-server-container/build.gradle
@@ -38,7 +38,7 @@ dependencies {
task copyDockerfileDependencies(type: Copy) {
// Required Jars
from configurations.dockerDependency
rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink_2.11-job-server.jar'
rename 'beam-runners-flink_2.11-job-server.*.jar', 'beam-runners-flink-job-server.jar'
into "build/target"
// Entry script
from file("./flink-job-server.sh")
2 changes: 1 addition & 1 deletion runners/flink/job-server/build.gradle
@@ -55,7 +55,7 @@ dependencies {
// task will not work because the flink runner classes only exist in the shadow
// jar.
runShadow {
def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost:8099"
def jobHost = project.hasProperty("jobHost") ? project.property("jobHost") : "localhost"
def artifactsDir = project.hasProperty("artifactsDir") ? project.property("artifactsDir") : "/tmp/flink-artifacts"
def cleanArtifactsPerJob = project.hasProperty("cleanArtifactsPerJob")
args = ["--job-host=${jobHost}", "--artifacts-dir=${artifactsDir}"]
@@ -45,15 +45,22 @@ public class FlinkJobServerDriver implements Runnable {

private final ListeningExecutorService executor;
private final ServerConfiguration configuration;
private final ServerFactory serverFactory;
private final ServerFactory jobServerFactory;
private final ServerFactory artifactServerFactory;
private GrpcFnServer<InMemoryJobService> jobServer;
private GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingServer;

/** Configuration for the jobServer. */
public static class ServerConfiguration {
@Option(name = "--job-host", usage = "The job server host string")
@Option(name = "--job-host", usage = "The job server host name")
private String host = "";

@Option(name = "--job-port", usage = "The job service port. (Default: 8099)")
private int port = 8099;

@Option(name = "--artifact-port", usage = "The artifact service port. (Default: 8098)")
private int artifactPort = 8098;

@Option(name = "--artifacts-dir", usage = "The location to store staged artifact files")
private String artifactStagingPath = "/tmp/beam-artifact-staging";

@@ -100,24 +107,30 @@ public static FlinkJobServerDriver fromConfig(ServerConfiguration configuration)
new ThreadFactoryBuilder().setNameFormat("flink-runner-job-server").setDaemon(true).build();
ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
ServerFactory serverFactory = ServerFactory.createDefault();
return create(configuration, executor, serverFactory);
ServerFactory jobServerFactory = ServerFactory.createWithPortSupplier(() -> configuration.port);
ServerFactory artifactServerFactory =
ServerFactory.createWithPortSupplier(() -> configuration.artifactPort);
return create(configuration, executor, jobServerFactory, artifactServerFactory);
}

public static FlinkJobServerDriver create(
ServerConfiguration configuration,
ListeningExecutorService executor,
ServerFactory serverFactory) {
return new FlinkJobServerDriver(configuration, executor, serverFactory);
ServerFactory jobServerFactory,
ServerFactory artifactServerFactory) {
return new FlinkJobServerDriver(
configuration, executor, jobServerFactory, artifactServerFactory);
}

private FlinkJobServerDriver(
ServerConfiguration configuration,
ListeningExecutorService executor,
ServerFactory serverFactory) {
ServerFactory jobServerFactory,
ServerFactory artifactServerFactory) {
this.configuration = configuration;
this.executor = executor;
this.serverFactory = serverFactory;
this.jobServerFactory = jobServerFactory;
this.artifactServerFactory = artifactServerFactory;
}

@Override
@@ -166,11 +179,13 @@ private GrpcFnServer<InMemoryJobService> createJobServer() throws IOException {
InMemoryJobService service = createJobService();
GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer;
if (Strings.isNullOrEmpty(configuration.host)) {
jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
jobServiceGrpcFnServer = GrpcFnServer.allocatePortAndCreateFor(service, jobServerFactory);
} else {
Endpoints.ApiServiceDescriptor descriptor =
Endpoints.ApiServiceDescriptor.newBuilder().setUrl(configuration.host).build();
jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, serverFactory);
Endpoints.ApiServiceDescriptor.newBuilder()
.setUrl(configuration.host + ":" + configuration.port)
.build();
jobServiceGrpcFnServer = GrpcFnServer.create(service, descriptor, jobServerFactory);
}
LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
return jobServiceGrpcFnServer;
@@ -200,8 +215,17 @@ private InMemoryJobService createJobService() throws IOException {
private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
throws IOException {
BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
final GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService;
if (Strings.isNullOrEmpty(configuration.host)) {
artifactStagingService =
GrpcFnServer.allocatePortAndCreateFor(service, artifactServerFactory);
} else {
Endpoints.ApiServiceDescriptor descriptor =
Endpoints.ApiServiceDescriptor.newBuilder()
.setUrl(configuration.host + ":" + configuration.artifactPort)
.build();
artifactStagingService = GrpcFnServer.create(service, descriptor, artifactServerFactory);
}
LOG.info(
"ArtifactStagingService started on {}",
artifactStagingService.getApiServiceDescriptor().getUrl());
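With the options above, the job service and the artifact staging service now bind to separate, configurable ports (8099 and 8098 by default) instead of sharing a single host string. A minimal launch sketch in Python, assuming a locally built shadow jar at a hypothetical path:

import subprocess

# Hypothetical path to the shaded Flink job-server jar produced by Gradle.
flink_job_server_jar = './beam-runners-flink_2.11-job-server.jar'

subprocess.Popen([
    'java', '-jar', flink_job_server_jar,
    '--job-host', 'localhost',
    '--job-port', '8099',        # InMemoryJobService
    '--artifact-port', '8098',   # BeamFileSystemArtifactStagingService
    '--artifacts-dir', '/tmp/beam-artifact-staging',
])

This mirrors the _subprocess_command change further down.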
@@ -25,6 +25,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.function.Supplier;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
import org.apache.beam.vendor.grpc.v1.io.grpc.BindableService;
@@ -44,6 +45,17 @@ public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
return new InetSocketAddressServerFactory(urlFactory);
}

/** Create a {@link ServerFactory} that uses ports from a supplier. */
public static ServerFactory createWithPortSupplier(Supplier<Integer> portSupplier) {
return new InetSocketAddressServerFactory(UrlFactory.createDefault(), portSupplier);
}

/** Create a {@link ServerFactory} that uses the given url factory and ports from a supplier. */
public static ServerFactory createWithUrlFactoryAndPortSupplier(
UrlFactory urlFactory, Supplier<Integer> portSupplier) {
return new InetSocketAddressServerFactory(urlFactory, portSupplier);
}

/**
* Creates an instance of this server using an ephemeral port chosen automatically. The chosen
* port is accessible to the caller from the URL set in the input {@link
@@ -68,16 +80,23 @@ public abstract Server create(
*/
public static class InetSocketAddressServerFactory extends ServerFactory {
private final UrlFactory urlFactory;
private final Supplier<Integer> portSupplier;

private InetSocketAddressServerFactory(UrlFactory urlFactory) {
this(urlFactory, () -> 0);
}

private InetSocketAddressServerFactory(UrlFactory urlFactory, Supplier<Integer> portSupplier) {
this.urlFactory = urlFactory;
this.portSupplier = portSupplier;
}

@Override
public Server allocatePortAndCreate(
BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLoopbackAddress(), portSupplier.get());
Server server = createServer(service, address);
apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), server.getPort()));
return server;
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
@@ -69,6 +70,11 @@
public class DockerJobBundleFactory implements JobBundleFactory {
private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);

// Fixed port range for MacOS since we don't have host networking and need to use published ports
private static final int MAC_PORT_START = 8100;
private static final int MAC_PORT_END = 8200;
private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);

/** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
public interface JobBundleFactoryFactory {
DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
@@ -218,8 +224,15 @@ protected ServerFactory getServerFactory() {
// https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds
// The special hostname has historically changed between versions, so this is subject to
// breakages and will likely only support the latest version at any time.
return ServerFactory.createWithUrlFactory(
(host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString());

// We need to use a fixed port range due to non-existing host networking in Docker-for-Mac.
// The port range needs to be published when bringing up the Docker container, see
// DockerEnvironmentFactory.

return ServerFactory.createWithUrlFactoryAndPortSupplier(
(host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString(),
// We only use the published Docker ports 8100-8200 in a round-robin fashion
() -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
default:
LOG.warn("Unknown Docker platform. Falling back to default server factory");
return ServerFactory.createDefault();
@@ -229,7 +242,10 @@ protected ServerFactory getServerFactory() {
private static Platform getPlatform() {
String osName = System.getProperty("os.name").toLowerCase();
// TODO: Make this more robust?
if (osName.startsWith("mac")) {
// The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run in
// a container on macOS. Docker-for-Mac internally uses a Linux VM, which makes a container
// indistinguishable from one running on Linux, but we still need to apply port mapping due
// to missing host networking.
if (osName.startsWith("mac") || "1".equals(System.getenv("DOCKER_MAC_CONTAINER"))) {
return Platform.MAC;
} else if (osName.startsWith("linux")) {
return Platform.LINUX;
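The Docker-for-Mac branch above hands out ports from a fixed, pre-published range (8100-8200) rather than ephemeral ports, because the range has to be known when the container's ports are published. A Python sketch of the wrap-around behaviour the AtomicInteger supplier implements (illustration only, not Beam API):

MAC_PORT_START, MAC_PORT_END = 8100, 8200

def mac_ports():
  """Yield 8100..8200 and then wrap around, like MAC_PORT.getAndUpdate."""
  port = MAC_PORT_START
  while True:
    yield port
    port = MAC_PORT_START if port == MAC_PORT_END else port + 1

ports = mac_ports()
print([next(ports) for _ in range(3)])  # [8100, 8101, 8102]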
@@ -129,6 +129,8 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
.add("--network=host")
// We need to pass on information about the Docker-for-Mac environment (due to missing host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"))
.build();

List<String> args =
@@ -53,7 +53,8 @@ def _subprocess_command(cls, port):
'java',
'-jar', flinkJobServerJar,
'--artifacts-dir', tmp_dir,
'--job-host', 'localhost:%s' % port,
'--job-host', 'localhost',
'--job-port', str(port),
]
finally:
shutil.rmtree(tmp_dir)
101 changes: 101 additions & 0 deletions sdks/python/apache_beam/runners/portability/job_server.py
@@ -0,0 +1,101 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import

import atexit
import logging
import os
import signal
import sys
import time
from subprocess import Popen
from threading import Lock


class DockerizedJobServer(object):
"""
Spins up the JobServer in a docker container for local execution
"""

def __init__(self, job_host="localhost",
job_port=8099,
artifact_port=8098,
harness_port_range=(8100, 8200),
max_connection_retries=5):
self.job_host = job_host
self.job_port = job_port
self.artifact_port = artifact_port
self.harness_port_range = harness_port_range
self.max_connection_retries = max_connection_retries
self.docker_process = None
self.process_lock = Lock()

def start(self):
# TODO This is hardcoded to Flink at the moment but should be changed
job_server_image_name = os.environ['USER'] + \
"-docker-apache.bintray.io/beam/flink-job-server:latest"
cmd = ["docker", "run",
# We mount the docker binary and socket to be able to spin up
# "sibling" containers for the SDK harness.
"-v", "/usr/local/bin/docker:/bin/docker",
"-v", "/var/run/docker.sock:/var/run/docker.sock"]
args = ["--job-host", self.job_host, "--job-port", str(self.job_port)]

if sys.platform == "darwin":
# Docker-for-Mac doesn't support host networking, so we need to explicitly
# publish ports from the Docker container to be able to connect to it.
# Also, all other containers need to be aware that they run on Docker-for-Mac
# so that they connect to the internal Docker-for-Mac address.
cmd += ["-e", "DOCKER_MAC_CONTAINER=1"]
cmd += ["-p", "{}:{}".format(self.job_port, self.job_port)]
cmd += ["-p", "{}:{}".format(self.artifact_port, self.artifact_port)]
cmd += ["-p", "{0}-{1}:{0}-{1}".format(
self.harness_port_range[0], self.harness_port_range[1])]
args += ["--artifact-port", "{}".format(self.artifact_port)]
else:
# This shouldn't be set on MacOS because it destroys port forwarding,
# even though host networking is not supported on MacOS.
cmd.append("--network=host")

cmd.append(job_server_image_name)
cmd += args

logging.debug("Starting container with %s", cmd)
try:
self.docker_process = Popen(cmd)
atexit.register(self.stop)
signal.signal(signal.SIGINT, self.stop)
except: # pylint:disable=bare-except
logging.exception("Error bringing up container")
self.stop()

return "{}:{}".format(self.job_host, self.job_port)

def stop(self):
with self.process_lock:
if not self.docker_process:
return
num_retries = 0
while self.docker_process.poll() is None and \
num_retries < self.max_connection_retries:
logging.debug("Sending SIGINT to job_server container")
self.docker_process.send_signal(signal.SIGINT)
num_retries += 1
time.sleep(1)
if self.docker_process.poll() is None:
self.docker_process.kill()
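A minimal usage sketch for the new class, assuming a Beam checkout where this module is importable and the flink-job-server image referenced in start() has been built locally:

from apache_beam.runners.portability.job_server import DockerizedJobServer

job_server = DockerizedJobServer(job_port=8099, artifact_port=8098)
endpoint = job_server.start()  # e.g. "localhost:8099"
# ... submit a portable pipeline against this endpoint ...
job_server.stop()

start() also registers stop() via atexit and SIGINT, so the container is torn down even if the caller forgets the explicit stop().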