Skip to content

Commit

Permalink
[FLINK-8703][tests] Port KafkaShortRetentionTestBase to MiniClusterRe…
Browse files Browse the repository at this point in the history
…source

This closes apache#5666.
  • Loading branch information
zentol committed Mar 14, 2018
1 parent 511f388 commit aa86a86
Showing 1 changed file with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.InstantiationUtil;

import org.junit.AfterClass;
Expand Down Expand Up @@ -68,21 +67,32 @@ public class KafkaShortRetentionTestBase implements Serializable {

private static KafkaTestEnvironment kafkaServer;
private static Properties standardProps;
private static LocalFlinkMiniCluster flink;

@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfiguration(),
NUM_TMS,
TM_SLOTS));

@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

protected static Properties secureProps = new Properties();

private static Configuration getConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
return flinkConfig;
}

@BeforeClass
public static void prepare() throws IOException, ClassNotFoundException {
public static void prepare() throws ClassNotFoundException {
LOG.info("-------------------------------------------------------------------------");
LOG.info(" Starting KafkaShortRetentionTestBase ");
LOG.info("-------------------------------------------------------------------------");

Configuration flinkConfig = new Configuration();

// dynamically load the implementation for the test
Class<?> clazz = Class.forName("org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl");
kafkaServer = (KafkaTestEnvironment) InstantiationUtil.instantiate(clazz);
Expand All @@ -101,26 +111,10 @@ public static void prepare() throws IOException, ClassNotFoundException {
kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));

standardProps = kafkaServer.getStandardProperties();

// start also a re-usable Flink mini cluster
flinkConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");

flink = new LocalFlinkMiniCluster(flinkConfig, false);
flink.start();

TestStreamEnvironment.setAsContext(flink, PARALLELISM);
}

@AfterClass
public static void shutDownServices() throws Exception {
TestStreamEnvironment.unsetAsContext();

if (flink != null) {
flink.stop();
}
kafkaServer.shutdown();

secureProps.clear();
Expand Down Expand Up @@ -238,8 +232,7 @@ public void runFailOnAutoOffsetResetNone() throws Exception {

kafkaServer.createTestTopic(topic, parallelism, 1);

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flink.getLeaderRPCPort());
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately
env.getConfig().disableSysoutLogging();
Expand Down

0 comments on commit aa86a86

Please sign in to comment.