Skip to content

Commit

Permalink
[FLINK-8465] [flip6] Retrieve correct leader component address in Clu…
Browse files Browse the repository at this point in the history
…sterClient

Rename ClusterClient#getJobManagerAddress into #getClusterConnectionInfo. The
returned LeaderConnectionInfo contains the address of the leading cluster
component. In the old code this is the JobManager whereas in Flip-6 it is the
Dispatcher.

This closes apache#5321.
  • Loading branch information
tillrohrmann committed Jan 29, 2018
1 parent d2211ed commit f53f846
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import javax.annotation.Nullable;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
Expand Down Expand Up @@ -294,20 +293,15 @@ public boolean getPrintStatusDuringExecution() {
}

/**
* Gets the current JobManager address (may change in case of a HA setup).
* @return The address (host and port) of the leading JobManager
* Gets the current cluster connection info (may change in case of a HA setup).
*
* @return The the connection info to the leader component of the cluster
* @throws LeaderRetrievalException if the leader could not be retrieved
*/
public InetSocketAddress getJobManagerAddress() {
try {
LeaderConnectionInfo leaderConnectionInfo =
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout);

return AkkaUtils.getInetSocketAddressFromAkkaURL(leaderConnectionInfo.getAddress());
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve JobManager address", e);
}
public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
timeout);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collections;
import java.util.List;
Expand All @@ -54,7 +56,15 @@ public void waitForClusterToBeReady() {}

@Override
public String getWebInterfaceURL() {
String host = getJobManagerAddress().getHostString();
final InetSocketAddress inetSocketAddressFromAkkaURL;

try {
inetSocketAddressFromAkkaURL = AkkaUtils.getInetSocketAddressFromAkkaURL(getClusterConnectionInfo().getAddress());
} catch (Exception e) {
throw new RuntimeException("Could not retrieve leader retrieval information.", e);
}

String host = inetSocketAddressFromAkkaURL.getHostName();
int port = getFlinkConfiguration().getInteger(WebOptions.PORT);
return "http://" + host + ":" + port;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
Expand Down Expand Up @@ -70,6 +71,8 @@
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
Expand Down Expand Up @@ -365,6 +368,13 @@ public T getClusterId() {
return clusterId;
}

@Override
public LeaderConnectionInfo getClusterConnectionInfo() throws LeaderRetrievalException {
return LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getDispatcherLeaderRetriever(),
timeout);
}

/**
* Creates a {@code CompletableFuture} that polls a {@code AsynchronouslyCreatedResource} until
* its {@link AsynchronouslyCreatedResource#queueStatus() QueueStatus} becomes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.util.TestLogger;

import org.apache.commons.cli.CommandLine;
import org.junit.Assert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.net.InetSocketAddress;
import static org.junit.Assert.assertThat;

/**
* Tests for the {@link DefaultCLI}.
Expand Down Expand Up @@ -60,13 +61,14 @@ public void testConfigurationPassing() throws Exception {

CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);

final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port);

final StandaloneClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);

final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));

Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress());
final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();

assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(localhost));
assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(port));
}

/**
Expand All @@ -93,9 +95,10 @@ public void testManualConfigurationOverride() throws Exception {

final ClusterClient<?> clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));

final InetSocketAddress expectedAddress = new InetSocketAddress(manualHostname, manualPort);
final LeaderConnectionInfo clusterConnectionInfo = clusterClient.getClusterConnectionInfo();

Assert.assertEquals(expectedAddress, clusterClient.getJobManagerAddress());
assertThat(clusterConnectionInfo.getHostname(), Matchers.equalTo(manualHostname));
assertThat(clusterConnectionInfo.getPort(), Matchers.equalTo(manualPort));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.flink.runtime.leaderretrieval;

import org.apache.flink.util.FlinkException;

/**
* This exception is thrown by the {@link org.apache.flink.runtime.util.LeaderRetrievalUtils} when
* the method retrieveLeaderGateway fails to retrieve the current leader's gateway.
*/
public class LeaderRetrievalException extends Exception {
public class LeaderRetrievalException extends FlinkException {

private static final long serialVersionUID = 42;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.flink.runtime.util;

import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.FlinkException;

import akka.actor.Address;

import java.net.MalformedURLException;
import java.util.UUID;

/**
Expand All @@ -29,9 +35,34 @@ public class LeaderConnectionInfo {

private final UUID leaderSessionID;

public LeaderConnectionInfo(String address, UUID leaderSessionID) {
private final String hostname;

private final int port;

public LeaderConnectionInfo(String address, UUID leaderSessionID) throws FlinkException {
this.address = address;
this.leaderSessionID = leaderSessionID;

final Address akkaAddress;
// this only works as long as the address is Akka based
try {
akkaAddress = AkkaUtils.getAddressFromAkkaURL(address);
} catch (MalformedURLException e) {
throw new FlinkException("Could not extract the hostname from the given address \'" +
address + "\'.", e);
}

if (akkaAddress.host().isDefined()) {
hostname = akkaAddress.host().get();
} else {
hostname = "localhost";
}

if (akkaAddress.port().isDefined()) {
port = (int) akkaAddress.port().get();
} else {
port = -1;
}
}

public String getAddress() {
Expand All @@ -41,4 +72,20 @@ public String getAddress() {
public UUID getLeaderSessionID() {
return leaderSessionID;
}

public String getHostname() {
return hostname;
}

public int getPort() {
return port;
}

@Override
public String toString() {
return "LeaderConnectionInfo{" +
"address='" + address + '\'' +
", leaderSessionID=" + leaderSessionID +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,6 @@

package org.apache.flink.runtime.util;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
Expand All @@ -34,16 +30,23 @@
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.ConnectionUtils;
import org.apache.flink.util.FlinkException;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.UUID;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;

import java.net.InetAddress;
import java.util.UUID;

/**
* Utility class to work with {@link LeaderRetrievalService} class.
*/
Expand Down Expand Up @@ -241,8 +244,14 @@ public Future<LeaderConnectionInfo> getLeaderConnectionInfoFuture() {

@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
if(leaderAddress != null && !leaderAddress.equals("") && !connectionInfo.isCompleted()) {
connectionInfo.success(new LeaderConnectionInfo(leaderAddress, leaderSessionID));
if (leaderAddress != null && !leaderAddress.equals("") && !connectionInfo.isCompleted()) {
try {
final LeaderConnectionInfo leaderConnectionInfo = new LeaderConnectionInfo(leaderAddress, leaderSessionID);
connectionInfo.success(leaderConnectionInfo);
} catch (FlinkException e) {
connectionInfo.failure(e);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,25 +697,32 @@ object AkkaUtils {
def getInetSocketAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
// AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL
try {
// we need to manually strip the protocol, because "akka.tcp" is not
// a valid protocol for Java's URL class
val protocolonPos = akkaURL.indexOf("://")
if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
throw new MalformedURLException()
}

val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
if (url.getHost == null || url.getPort == -1) {
throw new MalformedURLException()
val address = getAddressFromAkkaURL(akkaURL)

(address.host, address.port) match {
case (Some(hostname), Some(portValue)) => new InetSocketAddress(hostname, portValue)
case _ => throw new MalformedURLException()
}
new InetSocketAddress(url.getHost, url.getPort)
}
catch {
case _ : MalformedURLException =>
throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL")
}
}

/**
* Extracts the [[Address]] from the given akka URL.
*
* @param akkaURL to extract the [[Address]] from
* @throws java.net.MalformedURLException if the [[Address]] could not be parsed from
* the given akka URL
* @return Extracted [[Address]] from the given akka URL
*/
@throws(classOf[MalformedURLException])
def getAddressFromAkkaURL(akkaURL: String): Address = {
AddressFromURIString(akkaURL)
}

def formatDurationParingErrorMessage: String = {
"Duration format must be \"val unit\", where 'val' is a number and 'unit' is " +
"(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package org.apache.flink.api.scala

import java.io._

import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser, RunOptions}
import org.apache.flink.client.deployment.{ClusterDescriptor, StandaloneClusterId}
import org.apache.flink.client.cli.{CliFrontend, CliFrontendParser}
import org.apache.flink.client.deployment.ClusterDescriptor
import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, GlobalConfiguration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.minicluster.StandaloneMiniCluster

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -270,8 +271,11 @@ object FlinkShell {

val cluster = clusterDescriptor.deploySessionCluster(clusterSpecification)

val address = cluster.getJobManagerAddress.getAddress.getHostAddress
val port = cluster.getJobManagerAddress.getPort
val inetSocketAddress = AkkaUtils.getInetSocketAddressFromAkkaURL(
cluster.getClusterConnectionInfo.getAddress)

val address = inetSocketAddress.getAddress.getHostAddress
val port = inetSocketAddress.getPort

(address, port, Some(Right(cluster)))
}
Expand Down Expand Up @@ -307,7 +311,8 @@ object FlinkShell {
throw new RuntimeException("Yarn Cluster could not be retrieved.")
}

val jobManager = cluster.getJobManagerAddress
val jobManager = AkkaUtils.getInetSocketAddressFromAkkaURL(
cluster.getClusterConnectionInfo.getAddress)

(jobManager.getHostString, jobManager.getPort, None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testJavaAPI() throws Exception {
}

// use the cluster
Assert.assertNotNull(yarnCluster.getJobManagerAddress());
Assert.assertNotNull(yarnCluster.getClusterConnectionInfo());
Assert.assertNotNull(yarnCluster.getWebInterfaceURL());

LOG.info("Shutting down cluster. All tests passed");
Expand Down
Loading

0 comments on commit f53f846

Please sign in to comment.