Skip to content

Commit

Permalink
[FLINK-2781] [core] Cleanup NetUtils.
Browse files Browse the repository at this point in the history
  - The NetUtils class (in flink-core) contains all methods usable without runtime dependency
  - The runtime NetUtils class (to find connectiong addresses) is now called ConnectionUtil.
  • Loading branch information
StephanEwen committed Oct 1, 2015
1 parent 28d36cc commit a3c0b44
Show file tree
Hide file tree
Showing 22 changed files with 66 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;
import org.junit.Test;

import java.net.InetAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import akka.actor.Props;
import akka.actor.Status;


import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
Expand All @@ -44,20 +43,17 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.NetUtils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;


import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.Collections;


import java.util.UUID;

import static org.junit.Assert.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.net.ConnectionUtils;

public final class DataStreamUtils {

Expand All @@ -46,7 +46,7 @@ public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
String host = ((RemoteStreamEnvironment)env).getHost();
int port = ((RemoteStreamEnvironment)env).getPort();
try {
clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
} catch (IOException e) {
throw new RuntimeException("IOException while trying to connect to the master", e);
}
Expand Down
21 changes: 21 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.flink.util;


import java.io.IOException;
import java.net.MalformedURLException;
import java.net.ServerSocket;
import java.net.URL;

public class NetUtils {
Expand Down Expand Up @@ -65,4 +67,23 @@ public static URL getCorrectHostnamePort(String hostPort) {
throw new IllegalArgumentException("The given host:port ('"+hostPort+"') is invalid", e);
}
}

/**
* Find a non-occupied port.
*
* @return A non-occupied port.
*/
public static int getAvailablePort() {
for (int i = 0; i < 50; i++) {
try (ServerSocket serverSocket = new ServerSocket(0)) {
int port = serverSocket.getLocalPort();
if (port != 0) {
return port;
}
}
catch (IOException ignored) {}
}

throw new RuntimeException("Could not find a free permitted port on the machine.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Enumeration;
Expand All @@ -41,9 +40,9 @@
* Utilities to determine the network interface and address that should be used to bind the
* TaskManager communication to.
*/
public class NetUtils {
public class ConnectionUtils {

private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);

private static final long MIN_SLEEP_TIME = 50;
private static final long MAX_SLEEP_TIME = 20000;
Expand Down Expand Up @@ -282,8 +281,7 @@ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSoc
LOG.debug("Trying to connect to (" + toSocket + ") from local address " + fromAddress
+ " with timeout " + timeout);
}
Socket socket = new Socket();
try {
try (Socket socket = new Socket()) {
// port 0 = let the OS choose the port
SocketAddress bindP = new InetSocketAddress(fromAddress, 0);
// machine
Expand All @@ -300,43 +298,6 @@ private static boolean tryToConnect(InetAddress fromAddress, SocketAddress toSoc
}
return false;
}
finally {
socket.close();
}
}

/**
* Find a non-occupied port.
*
* @return A non-occupied port.
*/
public static int getAvailablePort() {
for (int i = 0; i < 50; i++) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
if (port != 0) {
return port;
}
}
catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unable to allocate port " + e.getMessage(), e);
}
}
finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (Throwable t) {
// ignored
}
}
}
}

throw new RuntimeException("Could not find a free permitted port on the machine.");
}

public static class LeaderConnectingAddressListener implements LeaderRetrievalListener {
Expand Down Expand Up @@ -408,7 +369,7 @@ public InetAddress findConnectingAddress(
}

do {
InetAddress address = NetUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
InetAddress address = ConnectionUtils.findAddressUsingStrategy(strategy, targetAddress, logging);
if (address != null) {
return address;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
Expand Down Expand Up @@ -149,7 +149,7 @@ public static LeaderConnectionInfo retrieveLeaderConnectionInfo(
public static InetAddress findConnectingAddress(
LeaderRetrievalService leaderRetrievalService,
FiniteDuration timeout) throws LeaderRetrievalException {
NetUtils.LeaderConnectingAddressListener listener = new NetUtils.LeaderConnectingAddressListener();
ConnectionUtils.LeaderConnectingAddressListener listener = new ConnectionUtils.LeaderConnectingAddressListener();

try {
leaderRetrievalService.start(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.messages.Messages._
import org.apache.flink.runtime.messages.RegistrationMessages._
import org.apache.flink.runtime.messages.TaskManagerMessages._
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.process.ProcessReaper
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;

import org.junit.Test;

import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;

import scala.Tuple2;

import java.net.InetAddress;
Expand Down Expand Up @@ -48,10 +50,7 @@ static NettyServer initServer(NettyConfig config, NettyProtocol protocol) throws
server.init(protocol);
}
catch (Exception e) {
if (server != null) {
server.shutdown();
}

server.shutdown();
throw e;
}

Expand All @@ -65,10 +64,7 @@ static NettyClient initClient(NettyConfig config, NettyProtocol protocol) throws
client.init(protocol);
}
catch (Exception e) {
if (client != null) {
client.shutdown();
}

client.shutdown();
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.util.NetUtils;

import org.apache.flink.util.OperatingSystem;
import org.junit.After;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.NetUtils;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
/**
* Tests for the network utilities.
*/
public class NetUtilsTest {
public class ConnectionUtilsTest {

@Test
public void testFindConnectableAddress() {
int unusedPort;
try {
unusedPort = NetUtils.getAvailablePort();
unusedPort = org.apache.flink.util.NetUtils.getAvailablePort();
}
catch (Throwable t) {
// if this system cannot find an available port,
Expand All @@ -47,7 +47,7 @@ public void testFindConnectableAddress() {
InetSocketAddress unreachable = new InetSocketAddress("localhost", unusedPort);

final long start = System.currentTimeMillis();
InetAddress add = NetUtils.findConnectingAddress(unreachable, 2000, 400);
InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);

// check that it did not take forever
assertTrue(System.currentTimeMillis() - start < (OperatingSystem.isWindows() ? 30000 : 8000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;

import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;

import org.junit.Test;

import scala.Some;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@
import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;

import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.NetUtils;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.util.NetUtils
import org.junit.Assert._
import org.junit.Test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.minicluster.FlinkMiniCluster
import org.apache.flink.runtime.net.NetUtils
import org.apache.flink.util.NetUtils
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
import org.apache.flink.runtime.webmonitor.WebMonitor

import scala.concurrent.{Await, Future}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import org.apache.flink.streaming.connectors.kafka.testutils.SuccessException;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

import org.apache.kafka.common.PartitionInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.NetUtils;

import org.junit.Assert;

import java.io.BufferedReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.flink.streaming.util;

import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.util.NetUtils;

import org.junit.Assert;

import java.io.PrintWriter;
Expand Down
Loading

0 comments on commit a3c0b44

Please sign in to comment.