
[FLINK-14033][yarn] upload user artifacts for yarn job cluster
HuangZhenQiu authored and aljoscha committed Dec 3, 2019
1 parent 95ad441 commit f985ff3
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 32 deletions.
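At a high level, this change makes per-job YARN deployments upload the files that user code registers through Flink's distributed cache, then rewires each JobGraph artifact entry to the uploaded remote copy. A minimal sketch of the user-facing API the change exercises — the local path and class name below are illustrative, not from the commit, and the pipeline itself is elided:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CacheRegistrationSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Register a local file under a symbolic name. With this commit, a YARN
		// per-job cluster uploads the file and rewrites the cache entry so task
		// managers fetch it from the remote copy instead of the client machine.
		env.registerCachedFile("/tmp/cache.properties", "cacheFile", false);

		// ... define operators that read the file via
		// getRuntimeContext().getDistributedCache().getFile("cacheFile") ...
	}
}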
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -573,6 +573,15 @@ public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey) t
		));
	}

+	public void setUserArtifactRemotePath(String entryName, String remotePath) {
+		userArtifacts.computeIfPresent(entryName, (key, originalEntry) -> new DistributedCache.DistributedCacheEntry(
+			remotePath,
+			originalEntry.isExecutable,
+			null,
+			originalEntry.isZipped
+		));
+	}

	public void writeUserArtifactEntriesToConfiguration() {
		for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact : userArtifacts.entrySet()) {
			DistributedCache.writeFileInfoToConfig(
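For orientation, a hedged sketch of how a caller on the deployment path might use the new setter once an artifact has been uploaded; the entry name, HDFS location, and helper below are made up for illustration, not taken from the commit:

import org.apache.flink.runtime.jobgraph.JobGraph;

class ArtifactRewireSketch {
	// Hypothetical helper: after the YARN client has uploaded "cacheFile" to
	// DFS, point the JobGraph entry at the uploaded copy. The rewritten entry
	// carries a null blob key, signaling that the artifact is served from the
	// remote path rather than from Flink's blob server.
	static void rewireArtifact(JobGraph jobGraph) {
		jobGraph.setUserArtifactRemotePath("cacheFile", "hdfs:///user/flink/cache.properties");
	}
}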
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -18,7 +18,6 @@

package org.apache.flink.yarn;

-import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
@@ -32,7 +31,6 @@

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -48,7 +46,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;

/**
 * Test cases for the deployment of Flink clusters on YARN.
 */
@@ -129,31 +126,9 @@ private void deployPerjob(YarnConfigOptions.UserJarInclusion userJarInclusion) t
				assertThat(jobResult, is(notNullValue()));
				assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

-				waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor);
+				waitApplicationFinishedElseKillIt(
+					applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
			}
		}
	}

-	private void waitApplicationFinishedElseKillIt(
-			ApplicationId applicationId,
-			Duration timeout,
-			YarnClusterDescriptor yarnClusterDescriptor) throws Exception {
-		Deadline deadline = Deadline.now().plus(timeout);
-		YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
-
-		while (state != YarnApplicationState.FINISHED) {
-			if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
-				fail("Application became FAILED or KILLED while expecting FINISHED");
-			}
-
-			if (deadline.isOverdue()) {
-				yarnClusterDescriptor.killCluster(applicationId);
-				fail("Application didn't finish before timeout");
-			}
-
-			sleep(sleepIntervalInMS);
-			state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
-		}
-	}

}
109 changes: 109 additions & 0 deletions flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnDistributedCacheITCase.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.yarn.testjob.YarnTestCacheJob;
import org.apache.flink.yarn.util.YarnTestUtils;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;

/**
 * Test cases for the deployment of Flink clusters on YARN with the distributed cache.
 */
public class YarnDistributedCacheITCase extends YarnTestBase {

	private final Duration yarnAppTerminateTimeout = Duration.ofSeconds(10);

	private final int sleepIntervalInMS = 100;

	@BeforeClass
	public static void setup() {
		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-with-distributed-cache");
		startYARNWithConfig(YARN_CONFIGURATION);
	}

	@Test
	public void testPerJobModeWithDistributedCache() throws Exception {
		runTest(() -> {
			Configuration configuration = new Configuration();
			configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
			final YarnClient yarnClient = getYarnClient();

			try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
					configuration,
					getYarnConfiguration(),
					yarnClient,
					true)) {

				yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
				yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
				yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));

				final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
					.setMasterMemoryMB(768)
					.setTaskManagerMemoryMB(1024)
					.setSlotsPerTaskManager(1)
					.setNumberTaskManagers(1)
					.createClusterSpecification();

				final JobGraph jobGraph = YarnTestCacheJob.getDistributedCacheJobGraph();

				File testingJar = YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));

				jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));

				try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor.deployJobCluster(
						clusterSpecification,
						jobGraph,
						false)) {

					ApplicationId applicationId = clusterClient.getClusterId();

					final CompletableFuture<JobResult> jobResultCompletableFuture = clusterClient.requestJobResult(jobGraph.getJobID());

					final JobResult jobResult = jobResultCompletableFuture.get();

					assertThat(jobResult, is(notNullValue()));
					assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));

					waitApplicationFinishedElseKillIt(applicationId, yarnAppTerminateTimeout, yarnClusterDescriptor, sleepIntervalInMS);
				}
			}
		});
	}
}
flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -1001,6 +1002,29 @@ public static boolean isOnTravis() {
		return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
	}

+	protected void waitApplicationFinishedElseKillIt(
+			ApplicationId applicationId,
+			Duration timeout,
+			YarnClusterDescriptor yarnClusterDescriptor,
+			int sleepIntervalInMS) throws Exception {
+		Deadline deadline = Deadline.now().plus(timeout);
+		YarnApplicationState state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+
+		while (state != YarnApplicationState.FINISHED) {
+			if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+				Assert.fail("Application became FAILED or KILLED while expecting FINISHED");
+			}
+
+			if (deadline.isOverdue()) {
+				yarnClusterDescriptor.killCluster(applicationId);
+				Assert.fail("Application didn't finish before timeout");
+			}
+
+			sleep(sleepIntervalInMS);
+			state = getYarnClient().getApplicationReport(applicationId).getYarnApplicationState();
+		}
+	}

	/**
	 * Wrapper around a {@link Scanner} that buffers the last N lines read.
	 */
108 changes: 108 additions & 0 deletions flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestCacheJob.java
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn.testjob;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Testing job for the distributed cache in per-job cluster mode.
 */
public class YarnTestCacheJob {
	private static final List<String> LIST = ImmutableList.of("test1", "test2");

	public static JobGraph getDistributedCacheJobGraph() {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		String cacheFilePath = Thread.currentThread().getContextClassLoader().getResource("cache.properties").getFile();

		env.registerCachedFile(cacheFilePath, "cacheFile", false);

		env.addSource(new GenericSourceFunction<>(LIST, TypeInformation.of(String.class)))
			.setParallelism(1)
			.map(new MapperFunction(), TypeInformation.of(String.class))
			.setParallelism(1)
			.addSink(new DiscardingSink<String>())
			.setParallelism(1);

		return env.getStreamGraph().getJobGraph();
	}

	private static class MapperFunction extends RichMapFunction<String, String> {
		private static final long serialVersionUID = -1238033916372648233L;

		private Properties properties;

		@Override
		public void open(Configuration config) throws IOException {
			// Access the cached file via RuntimeContext and DistributedCache.
			File cacheFile = getRuntimeContext().getDistributedCache().getFile("cacheFile");
			properties = new Properties();
			// Close the stream after loading via try-with-resources.
			try (FileInputStream inputStream = new FileInputStream(cacheFile)) {
				properties.load(inputStream);
			}
			checkArgument(properties.size() == 2, "The property file should contain 2 key-value pairs");
		}

		@Override
		public String map(String value) {
			return (String) properties.getOrDefault(value, "null");
		}
	}

	private static class GenericSourceFunction<T> implements SourceFunction<T>, ResultTypeQueryable<T> {
		private List<T> inputDataset;
		private TypeInformation<T> returnType;

		GenericSourceFunction(List<T> inputDataset, TypeInformation<T> returnType) {
			this.inputDataset = inputDataset;
			this.returnType = returnType;
		}

		@Override
		public void run(SourceContext<T> ctx) throws Exception {
			for (T t : inputDataset) {
				ctx.collect(t);
			}
		}

		@Override
		public void cancel() {}

		@Override
		public TypeInformation<T> getProducedType() {
			return this.returnType;
		}
	}
}
20 changes: 20 additions & 0 deletions flink-yarn-tests/src/test/resources/cache.properties
@@ -0,0 +1,20 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

test1=hello
test2=world
