Skip to content

Commit

Permalink
Merge pull request apache#10621: [BEAM-9056] Staging artifacts from environment
Browse files Browse the repository at this point in the history
  • Loading branch information
chamikaramj authored Mar 12, 2020
2 parents b0e7afb + 6fce252 commit 80a2ff6
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,44 @@
package org.apache.beam.runners.core.construction;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ArtifactInformation;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExternalPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardArtifacts;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.ZipFiles;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for interacting with portability {@link Environment environments}. */
public class Environments {
private static final Logger LOG = LoggerFactory.getLogger(Environments.class);

private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
Expand Down Expand Up @@ -65,23 +83,31 @@ public class Environments {

private Environments() {}

public static Environment createOrGetDefaultEnvironment(String type, String config) {
if (Strings.isNullOrEmpty(type)) {
return JAVA_SDK_HARNESS_ENVIRONMENT;
}
public static Environment createOrGetDefaultEnvironment(PortablePipelineOptions options) {
String type = options.getDefaultEnvironmentType();
String config = options.getDefaultEnvironmentConfig();

switch (type) {
case ENVIRONMENT_EMBEDDED:
return createEmbeddedEnvironment(config);
case ENVIRONMENT_EXTERNAL:
case ENVIRONMENT_LOOPBACK:
return createExternalEnvironment(config);
case ENVIRONMENT_PROCESS:
return createProcessEnvironment(config);
case ENVIRONMENT_DOCKER:
default:
return createDockerEnvironment(config);
Environment defaultEnvironment;
if (Strings.isNullOrEmpty(type)) {
defaultEnvironment = JAVA_SDK_HARNESS_ENVIRONMENT;
} else {
switch (type) {
case ENVIRONMENT_EMBEDDED:
defaultEnvironment = createEmbeddedEnvironment(config);
break;
case ENVIRONMENT_EXTERNAL:
case ENVIRONMENT_LOOPBACK:
defaultEnvironment = createExternalEnvironment(config);
break;
case ENVIRONMENT_PROCESS:
defaultEnvironment = createProcessEnvironment(config);
break;
case ENVIRONMENT_DOCKER:
default:
defaultEnvironment = createDockerEnvironment(config);
}
}
return defaultEnvironment.toBuilder().addAllDependencies(getArtifacts(options)).build();
}

public static Environment createDockerEnvironment(String dockerImageUrl) {
Expand Down Expand Up @@ -175,6 +201,69 @@ public static Optional<Environment> getEnvironment(
}
}

/**
 * Builds the artifact descriptors for the files listed in the pipeline's {@code filesToStage}
 * option.
 *
 * <p>Duplicate paths are staged only once, and the returned artifacts follow the order in which
 * the paths were first listed. Paths that do not exist on the local file system are skipped.
 * Directories are zipped into a temporary file, and that zip file is staged in their place.
 *
 * @param options pipeline options, viewed as {@link PortablePipelineOptions}
 * @return one {@link ArtifactInformation} per distinct, existing path; empty if none are listed
 * @throws RuntimeException if a directory cannot be zipped (the {@link IOException} is the cause)
 */
public static Collection<ArtifactInformation> getArtifacts(PipelineOptions options) {
  // Insertion-ordered set: de-duplicates while keeping the configured order, so the resulting
  // artifact list is deterministic from run to run.
  Set<String> pathsToStage = Sets.newLinkedHashSet();
  List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
  if (stagingFiles != null) {
    pathsToStage.addAll(stagingFiles);
  }

  ImmutableList.Builder<ArtifactInformation> filesToStage = ImmutableList.builder();
  for (String path : pathsToStage) {
    File file = new File(path);
    if (!file.exists()) {
      // Spurious items get added to the classpath. Filter by just those that exist.
      continue;
    }
    if (file.isDirectory()) {
      // Zip up directories so we can upload them to the artifact service.
      try {
        filesToStage.add(createArtifactInformation(zipDirectory(file)));
      } catch (IOException e) {
        throw new RuntimeException("Failed to zip directory " + file + " for staging", e);
      }
    } else {
      filesToStage.add(createArtifactInformation(file));
    }
  }
  return filesToStage.build();
}

/**
 * Zips {@code directory} into a newly created temporary {@code .zip} file by delegating to
 * {@link ZipFiles#zipDirectory}.
 *
 * <p>The temporary file is not deleted by this method; the caller owns its lifetime.
 *
 * @param directory the directory to archive
 * @return the temporary zip file that was written
 * @throws IOException if the temporary file cannot be created or written
 */
private static File zipDirectory(File directory) throws IOException {
  // File.createTempFile rejects prefixes shorter than three characters with an
  // IllegalArgumentException, so pad short (or empty) directory names up to that minimum.
  String prefix = directory.getName();
  while (prefix.length() < 3) {
    prefix += "_";
  }
  File zipFile = File.createTempFile(prefix, ".zip");
  try (FileOutputStream fos = new FileOutputStream(zipFile)) {
    ZipFiles.zipDirectory(directory, fos);
  }
  return zipFile;
}

/**
 * Produces the name under which an artifact is staged.
 *
 * <p>The name is a freshly generated random UUID string. The {@code file} argument is accepted
 * but not consulted, so two calls for the same file yield different names.
 *
 * @param file the file being staged (currently unused)
 * @return a random UUID in its canonical string form
 */
private static String createStagingFileName(File file) {
  // TODO: https://issues.apache.org/jira/browse/BEAM-4109 Support arbitrary names in the staging
  // service itself.
  // HACK: The local artifact staging service currently assumes artifact names correspond to a
  // flat directory, so a flat, generated name is used here rather than the file's path. Artifact
  // staging services should generally accept arbitrary artifact names.
  // NOTE: Base64 url encoding of the path does not work here because the resulting names tend to
  // be long and exceed file length limits on the artifact stager.
  final UUID stagingId = UUID.randomUUID();
  return stagingId.toString();
}

/**
 * Describes a local file as a stageable artifact.
 *
 * <p>The artifact's type is {@code StandardArtifacts.Types.FILE}, carrying the file's absolute
 * path, and its role is {@code StandardArtifacts.Roles.STAGING_TO}, carrying the staged name
 * produced by {@code createStagingFileName}.
 *
 * @param file the local file to describe
 * @return the populated {@link ArtifactInformation}
 */
public static ArtifactInformation createArtifactInformation(File file) {
  // Type payload: where the artifact lives on the local file system.
  RunnerApi.ArtifactFilePayload filePayload =
      RunnerApi.ArtifactFilePayload.newBuilder().setPath(file.getAbsolutePath()).build();

  // Role payload: the name the artifact is given once staged.
  RunnerApi.ArtifactStagingToRolePayload stagingPayload =
      RunnerApi.ArtifactStagingToRolePayload.newBuilder()
          .setStagedName(createStagingFileName(file))
          .build();

  return ArtifactInformation.newBuilder()
      .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE))
      .setTypePayload(filePayload.toByteString())
      .setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO))
      .setRolePayload(stagingPayload.toByteString())
      .build();
}

private static class ProcessPayloadReferenceJSON {
@Nullable private String os;
@Nullable private String arch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,8 @@ public static SdkComponents create(RunnerApi.Components components) {
public static SdkComponents create(PipelineOptions options) {
SdkComponents sdkComponents = new SdkComponents(RunnerApi.Components.getDefaultInstance(), "");
PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class);
sdkComponents.defaultEnvironmentId =
sdkComponents.registerEnvironment(
Environments.createOrGetDefaultEnvironment(
portablePipelineOptions.getDefaultEnvironmentType(),
portablePipelineOptions.getDefaultEnvironmentConfig()));
sdkComponents.registerEnvironment(
Environments.createOrGetDefaultEnvironment(portablePipelineOptions));
return sdkComponents;
}

Expand All @@ -114,7 +111,7 @@ public void mergeFrom(RunnerApi.Components components) {
reservedIds.addAll(components.getCodersMap().keySet());
reservedIds.addAll(components.getEnvironmentsMap().keySet());

environmentIds.inverse().putAll(components.getEnvironmentsMap());
components.getEnvironmentsMap().forEach(environmentIds.inverse()::forcePut);

componentsBuilder.mergeFrom(components);
}
Expand Down Expand Up @@ -261,14 +258,20 @@ public String registerCoder(Coder<?> coder) throws IOException {
* return the same unique ID.
*/
public String registerEnvironment(Environment env) {
String environmentId;
String existing = environmentIds.get(env);
if (existing != null) {
return existing;
environmentId = existing;
} else {
String name = uniqify(env.getUrn(), environmentIds.values());
environmentIds.put(env, name);
componentsBuilder.putEnvironments(name, env);
environmentId = name;
}
String name = uniqify(env.getUrn(), environmentIds.values());
environmentIds.put(env, name);
componentsBuilder.putEnvironments(name, env);
return name;
if (defaultEnvironmentId == null) {
defaultEnvironmentId = environmentId;
}
return environmentId;
}

public String getOnlyEnvironmentId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.ParDo;
Expand All @@ -46,18 +48,22 @@
public class EnvironmentsTest implements Serializable {
@Test
public void createEnvironments() throws IOException {
PortablePipelineOptions options = PipelineOptionsFactory.as(PortablePipelineOptions.class);
options.setDefaultEnvironmentType(Environments.ENVIRONMENT_DOCKER);
options.setDefaultEnvironmentConfig("java");
assertThat(
Environments.createOrGetDefaultEnvironment(Environments.ENVIRONMENT_DOCKER, "java"),
Environments.createOrGetDefaultEnvironment(options),
is(
Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
DockerPayload.newBuilder().setContainerImage("java").build().toByteString())
.build()));
options.setDefaultEnvironmentType(Environments.ENVIRONMENT_PROCESS);
options.setDefaultEnvironmentConfig(
"{\"os\": \"linux\", \"arch\": \"amd64\", \"command\": \"run.sh\", \"env\":{\"k1\": \"v1\", \"k2\": \"v2\"} }");
assertThat(
Environments.createOrGetDefaultEnvironment(
Environments.ENVIRONMENT_PROCESS,
"{\"os\": \"linux\", \"arch\": \"amd64\", \"command\": \"run.sh\", \"env\":{\"k1\": \"v1\", \"k2\": \"v2\"} }"),
Environments.createOrGetDefaultEnvironment(options),
is(
Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
Expand All @@ -71,9 +77,10 @@ public void createEnvironments() throws IOException {
.build()
.toByteString())
.build()));
options.setDefaultEnvironmentType(Environments.ENVIRONMENT_PROCESS);
options.setDefaultEnvironmentConfig("{\"command\": \"run.sh\"}");
assertThat(
Environments.createOrGetDefaultEnvironment(
Environments.ENVIRONMENT_PROCESS, "{\"command\": \"run.sh\"}"),
Environments.createOrGetDefaultEnvironment(options),
is(
Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,11 @@ public void getEnvironmentWithEnvironment() {

assertThat(qp.getEnvironment(environmentalRead).isPresent(), is(true));
assertThat(
qp.getEnvironment(environmentalRead).get(),
equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT));
qp.getEnvironment(environmentalRead).get().getUrn(),
equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT.getUrn()));
assertThat(
qp.getEnvironment(environmentalRead).get().getPayload(),
equalTo(Environments.JAVA_SDK_HARNESS_ENVIRONMENT.getPayload()));
assertThat(qp.getEnvironment(nonEnvironmentalTransform).isPresent(), is(false));
}

Expand Down
Loading

0 comments on commit 80a2ff6

Please sign in to comment.