Skip to content

Commit

Permalink
Add startup timeout config parameter. Increase akka ask timeouts for …
Browse files Browse the repository at this point in the history
…integration tests. Increase akka logger startup timeout.
  • Loading branch information
tillrohrmann committed Dec 18, 2014
1 parent dd9a1ba commit 1cd44e0
Show file tree
Hide file tree
Showing 24 changed files with 36 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ public final class ConfigConstants {

// ------------------------------ AKKA ------------------------------------

public static final String AKKA_STARTUP_TIMEOUT = "akka.startup-timeout";

/**
* Heartbeat interval of the transport failure detector
*/
Expand Down Expand Up @@ -578,6 +580,8 @@ public final class ConfigConstants {

// ------------------------------ Akka Values ------------------------------

public static String DEFAULT_AKKA_STARTUP_TIMEOUT = "10 s";

public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "6000 s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
import org.junit.Test;

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ object AkkaUtils {
}

def getConfigString(host: String, port: Int, configuration: Configuration): String = {
val startupTimeout = configuration.getString(ConfigConstants.AKKA_STARTUP_TIMEOUT,
ConfigConstants.DEFAULT_AKKA_STARTUP_TIMEOUT)
val transportHeartbeatInterval = configuration.getString(ConfigConstants.
AKKA_TRANSPORT_HEARTBEAT_INTERVAL,
ConfigConstants.DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL)
Expand Down Expand Up @@ -87,6 +89,8 @@ object AkkaUtils {
| log-dead-letters-during-shutdown = $logLifecycleEvents
|
| remote {
| startup-timeout = $startupTimeout
|
| transport-failure-detector{
| acceptable-heartbeat-pause = $transportHeartbeatPause
| heartbeat-interval = $transportHeartbeatInterval
Expand Down Expand Up @@ -135,6 +139,7 @@ object AkkaUtils {
| daemonic = on
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
| loglevel = "WARNING"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "WARNING"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,8 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
receiver ! UnregisterTask(executionID)
}
case Failure(t) =>
log.error(t, s"Execution state change notification failed for task ${executionID} " +
s"of job ${jobID}.")
log.warning(s"Execution state change notification failed for task ${executionID} " +
s"of job ${jobID}. Cause ${t.getMessage}.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.io.IOException;

import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ServerTestUtils;
import org.apache.flink.util.StringUtils;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.ServerTestUtils;
import org.apache.flink.util.StringUtils;
import org.junit.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.runtime.instance.AllocatedSlot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.FileReader;
import java.net.InetAddress;

import akka.actor.ActorRef;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.profiling.ProfilingException;
import org.apache.flink.runtime.profiling.impl.types.InternalInstanceProfilingData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@

package org.apache.flink.runtime.taskmanager;

import static org.junit.Assert.*;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
Expand All @@ -36,7 +29,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
Expand All @@ -53,22 +45,30 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.CancelTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager$;
import org.apache.flink.runtime.messages.TaskManagerMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskManagerMessages.TaskOperationResult;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.types.IntegerRecord;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;


public class TaskManagerTest {

Expand Down Expand Up @@ -544,7 +544,7 @@ public static ActorRef createTaskManager(ActorRef jm) {
timeout);

try {
FiniteDuration d = new FiniteDuration(2, TimeUnit.SECONDS);
FiniteDuration d = new FiniteDuration(20, TimeUnit.SECONDS);
Await.ready(response, d);
}catch(Exception e){
throw new RuntimeException("Exception while waiting for the task manager registration.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,8 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ WordSpecLike with Matchers with BeforeAndAfterAll {

expectMsg(SubmissionSuccess(jobGraph.getJobID))

Thread.sleep(300)
Thread.sleep(500)
BlockingOnceReceiver.blocking = false
taskManagers(0) ! PoisonPill

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import scala.concurrent.ExecutionContext
object TestingUtils {
val testConfig = ConfigFactory.parseString(getDefaultTestingActorSystemConfigString)

val TESTING_DURATION = 20 second
val TESTING_DURATION = 1 minute

val DEFAULT_AKKA_ASK_TIMEOUT = 1000

def getDefaultTestingActorSystemConfigString: String = {
val ioRWSerializerClass = classOf[IOReadableWritableSerializer].getCanonicalName
Expand Down Expand Up @@ -103,6 +105,7 @@ object TestingUtils {
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers)
config.setInteger(ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY, 1000)
config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT)
val cluster = new TestingCluster(config)
cluster
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public abstract class AbstractTestBase {

protected static final int DEFAULT_NUM_TASK_MANAGERS = 1;

protected static final int DEFAULT_AKKA_ASK_TIMEOUT = 1000;

protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";

protected final Configuration config;

protected ForkableFlinkMiniCluster executor;
Expand Down Expand Up @@ -102,7 +106,8 @@ public void startCluster() throws Exception {
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, 1000000);
config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT);
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
this.executor = new ForkableFlinkMiniCluster(config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.InputViewObjectInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewObjectOutputStreamWrapper;
import org.apache.flink.runtime.util.SerializableHashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Test;

@SuppressWarnings("serial")
public class DeltaIterationNotDependingOnSolutionSetITCase extends JavaProgramTestBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,11 @@

package org.apache.flink.test.iterative.nephele;

import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.flink.api.common.JobExecutionResult;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ under the License.
<!-- Need to use a user property here because the surefire/failsafe
forkCount is not exposed as a property. With this we can set
it on the "mvn" commandline in travis. -->
<flink.forkCount>1.5C</flink.forkCount>
<flink.forkCount>1C</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
<slf4j.version>1.7.7</slf4j.version>
<guava.version>17.0</guava.version>
Expand Down

0 comments on commit 1cd44e0

Please sign in to comment.