Skip to content

Commit

Permalink
[hotfix] Introduce builder for MiniClusterResourceConfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Jun 22, 2018
1 parent bf7e101 commit 6de6ed4
Show file tree
Hide file tree
Showing 56 changed files with 509 additions and 343 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;

import java.util.List;

Expand Down Expand Up @@ -137,7 +138,7 @@ private JobExecutorService createJobExecutorService(Configuration configuration)
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
.setRpcServiceSharing(MiniClusterConfiguration.RpcServiceSharing.SHARED)
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -118,10 +119,10 @@ public static void setup() throws Exception {
+ "/";

miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
new org.apache.flink.configuration.Configuration(),
1,
4));
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

miniClusterResource.before();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestingSecurityContext;
Expand Down Expand Up @@ -147,10 +148,12 @@ public static void setup() throws Exception {

Configuration configuration = startSecureFlinkClusterWithRecoveryModeEnabled();

miniClusterResource = new MiniClusterResource(new MiniClusterResource.MiniClusterResourceConfiguration(
configuration,
1,
4));
miniClusterResource = new MiniClusterResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
.build());

miniClusterResource.before();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.InstantiationUtil;

import org.junit.AfterClass;
Expand Down Expand Up @@ -70,10 +71,11 @@ public class KafkaShortRetentionTestBase implements Serializable {

@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
NUM_TMS,
TM_SLOTS));
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(TM_SLOTS)
.build());

@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -81,10 +82,11 @@ public abstract class KafkaTestBase extends TestLogger {

@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getFlinkConfiguration(),
NUM_TMS,
TM_SLOTS),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFlinkConfiguration())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(TM_SLOTS)
.build(),
true);

protected static FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.flink.streaming.test.examples.windowing;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -42,10 +42,10 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {

@ClassRule
public static MiniClusterResource miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
new Configuration(),
1,
1));
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());

@Test
public void testTopSpeedWindowingExampleITCase() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.NetUtils;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -69,10 +70,10 @@ public class DistributedCacheDfsTest {

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
new org.apache.flink.configuration.Configuration(),
1,
1));
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
.build());

private static MiniDFSCluster hdfsCluster;
private static Configuration conf = new Configuration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

package org.apache.flink.ml.util

import org.apache.flink.configuration.Configuration
import org.apache.flink.test.util.MiniClusterResource
import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
import org.scalatest.{BeforeAndAfter, Suite}

/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
Expand Down Expand Up @@ -57,8 +55,10 @@ trait FlinkTestBase extends BeforeAndAfter {

before {
val cl = new MiniClusterResource(
new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
)
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(parallelism)
.build())

cl.before()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -71,10 +72,11 @@ public class LocalExecutorITCase extends TestLogger {

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build(),
true);

private static ClusterClient<?> clusterClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.ClassRule;
Expand All @@ -57,14 +59,15 @@
/**
* Tests to verify JMX reporter functionality on the JobManager.
*/
public class JMXJobManagerMetricTest {
public class JMXJobManagerMetricTest extends TestLogger {

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
1,
1),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build(),
true);

private static Configuration getConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
Expand Down Expand Up @@ -68,10 +69,11 @@ public static void setup() throws Exception {
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build(),
true);

miniClusterResource.before();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
Expand Down Expand Up @@ -68,10 +69,11 @@ public static void setup() throws Exception {
// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build(),
true);

miniClusterResource.before();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -52,10 +53,11 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build(),
true);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -52,10 +53,11 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfig(),
NUM_TMS,
NUM_SLOTS_PER_TM),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
.build(),
true);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -80,10 +81,11 @@ public class WebFrontendITCase extends TestLogger {

@ClassRule
public static final MiniClusterResource CLUSTER = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
CLUSTER_CONFIGURATION,
NUM_TASK_MANAGERS,
NUM_SLOTS),
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(CLUSTER_CONFIGURATION)
.setNumberTaskManagers(NUM_TASK_MANAGERS)
.setNumberSlotsPerTaskManager(NUM_SLOTS)
.build(),
true
);

Expand Down Expand Up @@ -153,7 +155,7 @@ public void testResponseHeaders() throws Exception {
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
} else {
Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
Expand Down Expand Up @@ -281,7 +283,7 @@ public void testStop() throws Exception {
final Deadline deadline = testTimeout.fromNow();

try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
// stop the job
client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
Expand Down Expand Up @@ -356,7 +358,7 @@ public void testStopYarn() throws Exception {
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());

if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
if (Objects.equals(CLUSTER.getMiniClusterType(), TestBaseUtils.CodebaseType.NEW)) {
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
} else {
assertEquals(HttpResponseStatus.OK, response.getStatus());
Expand Down
Loading

0 comments on commit 6de6ed4

Please sign in to comment.