Skip to content

Commit

Permalink
[FLINK-3667] refactor client communication classes
Browse files Browse the repository at this point in the history
- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: YarnClusterDescriptor

- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: StandaloneClusterClient, YarnClusterClient
- ClusterClient: remove run methods and enable detached mode via flag

- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic

- CustomCommandLine: interface for other cluster implementations
- Customcommandline: enables creation of new cluster or resuming from existing

- Yarn: move Yarn classes and functionality to the yarn module (yarn
  properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: only disable parallel execution of ITCases

This closes apache#1978
  • Loading branch information
mxm committed Jun 17, 2016
1 parent efc344a commit f9b52a3
Show file tree
Hide file tree
Showing 57 changed files with 1,567 additions and 1,348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@
package org.apache.flink.api.avro;

import java.io.File;
import java.net.InetAddress;

import org.apache.flink.api.common.Plan;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;

import org.junit.Assert;
Expand Down Expand Up @@ -64,10 +57,10 @@ public void testExternalProgram() {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());

Client client = new Client(config);
ClusterClient client = new StandaloneClusterClient(config);

client.setPrintStatusDuringExecution(false);
client.runBlocking(program, 4);
client.run(program, 4);

}
catch (Throwable t) {
Expand Down
Loading

0 comments on commit f9b52a3

Please sign in to comment.