Skip to content

Commit

Permalink
[FLINK-11048] Ability to programmatically execute streaming pipeline …
Browse files Browse the repository at this point in the history
…with savepoint restore

This closes apache#7249.
  • Loading branch information
tweise committed Dec 20, 2018
1 parent eadfae0 commit bb9b176
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.flink.streaming.api.environment;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand All @@ -28,6 +30,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.slf4j.Logger;
Expand Down Expand Up @@ -65,6 +68,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
/** The classpaths that need to be attached to each job. */
private final List<URL> globalClasspaths;

/** The savepoint restore settings for job execution. */
private final SavepointRestoreSettings savepointRestoreSettings;

/**
* Creates a new RemoteStreamEnvironment that points to the master
* (JobManager) described by the given host name and port.
Expand Down Expand Up @@ -133,6 +139,36 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
* The protocol must be supported by the {@link java.net.URLClassLoader}.
*/
public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
this(host, port, clientConfiguration, jarFiles, null, null);
}

/**
* Creates a new RemoteStreamEnvironment that points to the master
* (JobManager) described by the given host name and port.
*
* @param host
* The host name or address of the master (JobManager), where the
* program should be executed.
* @param port
* The port of the master (JobManager), where the program should
* be executed.
* @param clientConfiguration
* The configuration used to parametrize the client that connects to the
* remote cluster.
* @param jarFiles
* The JAR files with code that needs to be shipped to the
* cluster. If the program uses user-defined functions,
* user-defined input formats, or any libraries, those must be
* provided in the JAR files.
* @param globalClasspaths
* The paths of directories and JAR files that are added to each user code
* classloader on all nodes in the cluster. Note that the paths must specify a
* protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share).
* The protocol must be supported by the {@link java.net.URLClassLoader}.
* @param savepointRestoreSettings
* Optional savepoint restore settings for job execution.
*/
public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths, SavepointRestoreSettings savepointRestoreSettings) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be used when submitting a program through a client, " +
Expand Down Expand Up @@ -167,35 +203,62 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig
else {
this.globalClasspaths = Arrays.asList(globalClasspaths);
}
this.savepointRestoreSettings = savepointRestoreSettings;
}

@Override
public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
StreamGraph streamGraph = getStreamGraph();
/**
* Executes the job remotely.
*
* <p>This method can be used independent of the {@link StreamExecutionEnvironment} type.
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@PublicEvolving
public static JobExecutionResult executeRemotely(StreamExecutionEnvironment streamExecutionEnvironment,
List<URL> jarFiles,
String host,
int port,
Configuration clientConfiguration,
List<URL> globalClasspaths,
String jobName,
SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
StreamGraph streamGraph = streamExecutionEnvironment.getStreamGraph();
streamGraph.setJobName(jobName);
transformations.clear();
return executeRemotely(streamGraph, jarFiles);
return executeRemotely(streamGraph,
streamExecutionEnvironment.getClass().getClassLoader(),
streamExecutionEnvironment.getConfig(),
jarFiles,
host,
port,
clientConfiguration,
globalClasspaths,
savepointRestoreSettings);
}

/**
* Executes the remote job.
* Execute the given stream graph remotely.
*
* @param streamGraph
* Stream Graph to execute
* @param jarFiles
* List of jar file URLs to ship to the cluster
* @return The result of the job execution, containing elapsed time and accumulators.
* <p>Method for internal use since it exposes stream graph and other implementation details that are subject to change.
* @throws ProgramInvocationException
*/
protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
private static JobExecutionResult executeRemotely(StreamGraph streamGraph,
ClassLoader envClassLoader,
ExecutionConfig executionConfig,
List<URL> jarFiles,
String host,
int port,
Configuration clientConfiguration,
List<URL> globalClasspaths,
SavepointRestoreSettings savepointRestoreSettings
) throws ProgramInvocationException {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths,
getClass().getClassLoader());
ClassLoader userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, envClassLoader);

Configuration configuration = new Configuration();
configuration.addAll(this.clientConfiguration);
configuration.addAll(clientConfiguration);

configuration.setString(JobManagerOptions.ADDRESS, host);
configuration.setInteger(JobManagerOptions.PORT, port);
Expand All @@ -211,10 +274,15 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
streamGraph.getJobGraph().getJobID(), e);
}

client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
client.setPrintStatusDuringExecution(executionConfig.isSysoutLoggingEnabled());

if (savepointRestoreSettings == null) {
savepointRestoreSettings = SavepointRestoreSettings.none();
}

try {
return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
return client.run(streamGraph, jarFiles, globalClasspaths, userCodeClassLoader, savepointRestoreSettings)
.getJobExecutionResult();
}
catch (ProgramInvocationException e) {
throw e;
Expand All @@ -233,6 +301,37 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
}
}

@Override
public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
transformations.clear();
return executeRemotely(streamGraph, jarFiles);
}

/**
* Executes the remote job.
*
* <p>Note: This method exposes stream graph internal in the public API, but cannot be removed for backward compatibility.
* @param streamGraph
* Stream Graph to execute
* @param jarFiles
* List of jar file URLs to ship to the cluster
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Deprecated
protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
return executeRemotely(streamGraph,
this.getClass().getClassLoader(),
getConfig(),
jarFiles,
host,
port,
clientConfiguration,
globalClasspaths,
savepointRestoreSettings);
}

@Override
public String toString() {
return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,78 @@

package org.apache.flink.streaming.api.environment;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Iterator;

import static org.mockito.Mockito.when;

/**
* Tests for the {@link RemoteStreamEnvironment}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({RemoteStreamEnvironment.class})
public class RemoteStreamExecutionEnvironmentTest extends TestLogger {

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());

/**
* Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster.
*/
@Test
public void testPortForwarding() throws Exception {
final Configuration clientConfiguration = new Configuration();
clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);

final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
String host = "fakeHost";
int port = 99;
JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);

RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()))
.thenReturn(expectedResult);

PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenAnswer((invocation) -> {
Object[] args = invocation.getArguments();
Configuration config = (Configuration) args[0];

Assert.assertEquals(host, config.getString(RestOptions.ADDRESS));
Assert.assertEquals(port, config.getInteger(RestOptions.PORT));
return mockedClient;
}
);

final Configuration clientConfiguration = new Configuration();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
miniCluster.getRestAddress().getHost(),
miniCluster.getRestAddress().getPort(),
clientConfiguration);
host, port, clientConfiguration);
env.fromElements(1).map(x -> x * 2);
JobExecutionResult actualResult = env.execute("fakeJobName");
Assert.assertEquals(expectedResult, actualResult);
}

@Test
public void testRemoteExecutionWithSavepoint() throws Exception {
SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath("fakePath");
RemoteStreamEnvironment env = new RemoteStreamEnvironment("fakeHost", 1,
null, new String[]{}, null, restoreSettings);
env.fromElements(1).map(x -> x * 2);

RestClusterClient mockedClient = Mockito.mock(RestClusterClient.class);
JobExecutionResult expectedResult = new JobExecutionResult(null, 0, null);

final DataStream<Integer> resultStream = env.fromElements(1)
.map(x -> x * 2);
PowerMockito.whenNew(RestClusterClient.class).withAnyArguments().thenReturn(mockedClient);
when(mockedClient.run(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(restoreSettings)))
.thenReturn(expectedResult);

final Iterator<Integer> result = DataStreamUtils.collect(resultStream);
Assert.assertTrue(result.hasNext());
Assert.assertEquals(2, result.next().intValue());
Assert.assertFalse(result.hasNext());
JobExecutionResult actualResult = env.execute("fakeJobName");
Assert.assertEquals(expectedResult, actualResult);
}
}

0 comments on commit bb9b176

Please sign in to comment.