Skip to content

Commit

Permalink
PyFlink remote execution should support URLs with paths and https scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
Elkhan Dadashov authored and tweise committed Sep 21, 2023
1 parent 4f09bbb commit 5682472
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<td>String</td>
<td>The port that the server binds itself. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple Rest servers are running on the same machine.</td>
</tr>
<tr>
<td><h5>rest.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The path that should be used by clients to interact with the server which is accessible via URL.</td>
</tr>
<tr>
<td><h5>rest.port</h5></td>
<td style="word-wrap: break-word;">8081</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/rest_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<td>Long</td>
<td>The maximum time in ms for a connection to stay idle before failing.</td>
</tr>
<tr>
<td><h5>rest.path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The path that should be used by clients to interact with the server which is accessible via URL.</td>
</tr>
<tr>
<td><h5>rest.port</h5></td>
<td style="word-wrap: break-word;">8081</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.client.deployment.executors.RemoteExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;

Expand All @@ -29,6 +31,7 @@
import org.apache.commons.cli.Options;

import java.net.InetSocketAddress;
import java.net.URL;

import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConfig;

Expand Down Expand Up @@ -60,6 +63,11 @@ public Configuration toConfiguration(CommandLine commandLine) throws FlinkExcept
String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
InetSocketAddress jobManagerAddress = NetUtils.parseHostPortAddress(addressWithPort);
setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress);

URL url = NetUtils.getCorrectHostnamePort(addressWithPort);
resultingConfiguration.setString(RestOptions.PATH, url.getPath());
resultingConfiguration.setBoolean(
SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url));
}
resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME);

Expand All @@ -68,6 +76,10 @@ public Configuration toConfiguration(CommandLine commandLine) throws FlinkExcept
return resultingConfiguration;
}

/**
 * Tells whether the given URL uses the https scheme.
 *
 * @param url the URL whose protocol is inspected
 * @return {@code true} if the protocol equals "https" ignoring case; {@code false} otherwise,
 *     including when the protocol is {@code null}
 */
private static boolean isHttpsProtocol(URL url) {
    final String scheme = url.getProtocol();
    // Constant-first comparison: equalsIgnoreCase(null) is false, so no separate null check.
    return "https".equalsIgnoreCase(scheme);
}

@Override
public String getId() {
return ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobStatusMessage;
Expand All @@ -48,11 +53,13 @@
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterOverviewWithVersion;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
Expand Down Expand Up @@ -192,6 +199,10 @@ public class RestClusterClient<T> implements ClusterClient<T> {
ExceptionUtils.findThrowable(exception, JobStateUnknownException.class)
.isPresent();

private final URL jobmanagerUrl;

private final Collection<HttpHeader> customHttpHeaders;

public RestClusterClient(Configuration config, T clusterId) throws Exception {
this(config, clusterId, DefaultClientHighAvailabilityServicesFactory.INSTANCE);
}
Expand Down Expand Up @@ -229,10 +240,20 @@ private RestClusterClient(
this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);

this.customHttpHeaders =
ClientUtils.readHeadersFromEnvironmentVariable(
ConfigConstants.FLINK_REST_CLIENT_HEADERS);
jobmanagerUrl =
new URL(
SecurityOptions.isRestSSLEnabled(configuration) ? "https" : "http",
configuration.getString(JobManagerOptions.ADDRESS),
configuration.getInteger(JobManagerOptions.PORT),
configuration.getString(RestOptions.PATH));

if (restClient != null) {
this.restClient = restClient;
} else {
this.restClient = new RestClient(configuration, executorService);
this.restClient = RestClient.forUrl(configuration, executorService, jobmanagerUrl);
}

this.waitStrategy = checkNotNull(waitStrategy);
Expand Down Expand Up @@ -828,6 +849,16 @@ public CompletableFuture<Acknowledge> updateJobResourceRequirements(
.thenApply(ignored -> Acknowledge.get());
}

/**
 * Test-only accessor for the URL this client assembled in its constructor from the configured
 * scheme, JobManager address, port and REST path.
 *
 * @return the JobManager URL held by this client
 */
@VisibleForTesting
URL getJobmanagerUrl() {
    return this.jobmanagerUrl;
}

/**
 * Test-only accessor for the custom HTTP headers this client read from the environment
 * variable named by {@code ConfigConstants.FLINK_REST_CLIENT_HEADERS}.
 *
 * @return the custom header collection held by this client (not a copy)
 */
@VisibleForTesting
Collection<HttpHeader> getCustomHttpHeaders() {
    return this.customHttpHeaders;
}

/**
* Get an overview of the Flink cluster.
*
Expand Down Expand Up @@ -980,6 +1011,12 @@ CompletableFuture<P> sendRetriableRequest(
.thenCompose(
webMonitorBaseUrl -> {
try {
CustomHeadersDecorator<R, P, U> headers =
new CustomHeadersDecorator<>(
new UrlPrefixDecorator<>(
messageHeaders,
jobmanagerUrl.getPath()));
headers.setCustomHeaders(customHttpHeaders);
final CompletableFuture<P> future =
restClient.sendRequest(
webMonitorBaseUrl.getHost(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;

import org.apache.commons.cli.CommandLine;
import org.junit.jupiter.api.Test;
Expand All @@ -36,8 +37,9 @@ class DefaultCLITest {
@Test
void testCommandLineMaterialization() throws Exception {
final String hostname = "home-sweet-home";
final String urlPath = "/some/other/path/index.html";
final int port = 1234;
final String[] args = {"-m", hostname + ':' + port};
final String[] args = {"-m", hostname + ':' + port + urlPath};

final AbstractCustomCommandLine defaultCLI = new DefaultCLI();
final CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
Expand All @@ -46,6 +48,34 @@ void testCommandLineMaterialization() throws Exception {

assertThat(configuration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
assertThat(configuration.get(RestOptions.PORT)).isEqualTo(port);

final String httpProtocol = "http";
assertThat(configuration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
assertThat(configuration.get(RestOptions.PATH)).isEqualTo(urlPath);

final String hostnameWithHttpScheme = httpProtocol + "://" + hostname;
final String[] httpArgs = {"-m", hostnameWithHttpScheme + ':' + port + urlPath};
final CommandLine httpCommandLine = defaultCLI.parseCommandLineOptions(httpArgs, false);

Configuration httpConfiguration = defaultCLI.toConfiguration(httpCommandLine);

assertThat(httpConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
assertThat(httpConfiguration.get(RestOptions.PORT)).isEqualTo(port);
assertThat(httpConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(false);
assertThat(httpConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);

final String httpsProtocol = "https";

final String hostnameWithHttpsScheme = httpsProtocol + "://" + hostname;
final String[] httpsArgs = {"-m", hostnameWithHttpsScheme + ':' + port + urlPath};
final CommandLine httpsCommandLine = defaultCLI.parseCommandLineOptions(httpsArgs, false);

Configuration httpsConfiguration = defaultCLI.toConfiguration(httpsCommandLine);

assertThat(httpsConfiguration.get(RestOptions.ADDRESS)).isEqualTo(hostname);
assertThat(httpsConfiguration.get(RestOptions.PORT)).isEqualTo(port);
assertThat(httpsConfiguration.get(SecurityOptions.SSL_REST_ENABLED)).isEqualTo(true);
assertThat(httpsConfiguration.get(RestOptions.PATH)).isEqualTo(urlPath);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.FileUpload;
import org.apache.flink.runtime.rest.HttpHeader;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
Expand Down Expand Up @@ -688,21 +690,59 @@ void testGetAccumulators() throws Exception {
/** Tests that command line options override the configuration settings. */
@Test
void testRESTManualConfigurationOverride() throws Exception {
final String configuredHostname = "localhost";
final int configuredPort = 1234;
final Configuration configuration = new Configuration();

configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
configuration.setInteger(JobManagerOptions.PORT, configuredPort);
configuration.setString(RestOptions.ADDRESS, configuredHostname);
configuration.setInteger(RestOptions.PORT, configuredPort);

final DefaultCLI defaultCLI = new DefaultCLI();

final String manualHostname = "123.123.123.123";
final int manualPort = 4321;
final String httpProtocol = "http";
final String[] args = {"-m", manualHostname + ':' + manualPort};

final RestClusterClient<?> clusterClient = getRestClusterClient(args);

URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
assertThat(clusterClient.getJobmanagerUrl())
.hasHost(manualHostname)
.hasPort(manualPort)
.hasNoPath()
.hasProtocol(httpProtocol);
assertThat(clusterClient.getCustomHttpHeaders()).isEmpty();

final String urlPath = "/some/path/here/index.html";
final String httpsProtocol = "https";
final String[] httpsUrlArgs = {
"-m", httpsProtocol + "://" + manualHostname + ':' + manualPort + urlPath
};

final Map<String, String> envMap =
Collections.singletonMap(
ConfigConstants.FLINK_REST_CLIENT_HEADERS,
"Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped");
org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap);

final RestClusterClient<?> newClusterClient = getRestClusterClient(httpsUrlArgs);
assertThat(newClusterClient.getWebMonitorBaseUrl().get())
.hasHost(manualHostname)
.hasPort(manualPort);

final URL jobManagerUrl = newClusterClient.getJobmanagerUrl();
assertThat(jobManagerUrl)
.hasHost(manualHostname)
.hasPort(manualPort)
.hasPath(urlPath)
.hasProtocol(httpsProtocol);

final List<HttpHeader> customHttpHeaders =
new ArrayList<>(newClusterClient.getCustomHttpHeaders());
final HttpHeader expectedHeader1 = new HttpHeader("Cookie", "authCookie=12:345");
final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader", "value1,value2");
assertThat(customHttpHeaders).hasSize(2);
assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1);
assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2);
}

private static RestClusterClient<?> getRestClusterClient(String[] args)
throws CliArgsException, FlinkException {
final DefaultCLI defaultCLI = new DefaultCLI();

CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);

final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
Expand All @@ -719,9 +759,7 @@ void testRESTManualConfigurationOverride() throws Exception {
clusterDescriptor
.retrieve(clusterFactory.getClusterId(executorConfig))
.getClusterClient();

URL webMonitorBaseUrl = clusterClient.getWebMonitorBaseUrl().get();
assertThat(webMonitorBaseUrl).hasHost(manualHostname).hasPort(manualPort);
return clusterClient;
}

/** Tests that the send operation is being retried. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ public class RestOptions {
.withDescription(
"The address that should be used by clients to connect to the server. Attention: This option is respected only if the high-availability configuration is NONE.");

/**
 * The path that should be used by clients to interact with the server, i.e. the path component
 * of the URL under which the server is accessible. Defaults to the empty string.
 */
@Documentation.Section(Documentation.Sections.COMMON_HOST_PORT)
public static final ConfigOption<String> PATH =
        key("rest.path")
                .stringType()
                .defaultValue("")
                .withDescription(
                        "The path that should be used by clients to interact with the server which is accessible via URL.");

/**
* The port that the REST client connects to and the REST server binds to if {@link #BIND_PORT}
* has not been specified.
Expand Down
9 changes: 8 additions & 1 deletion flink-core/src/main/java/org/apache/flink/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,15 @@ public static InetSocketAddress parseHostPortAddress(String hostPort) {
* @return URL object for accessing host and port
*/
private static URL validateHostPortString(String hostPort) {
if (StringUtils.isNullOrWhitespaceOnly(hostPort)) {
throw new IllegalArgumentException("hostPort should not be null or empty");
}
try {
URL u = new URL("http://" + hostPort);
URL u =
(hostPort.toLowerCase().startsWith("http://")
|| hostPort.toLowerCase().startsWith("https://"))
? new URL(hostPort)
: new URL("http://" + hostPort);
if (u.getHost() == null) {
throw new IllegalArgumentException(
"The given host:port ('" + hostPort + "') doesn't contain a valid host");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ public void testCorrectHostnamePort() throws Exception {
assertEquals(url, NetUtils.getCorrectHostnamePort("foo.com:8080/index.html"));
}

/** Verifies that an explicit https scheme is kept together with host, port and path. */
@Test
public void testCorrectHostnamePortWithHttpsScheme() throws Exception {
    final String address = "https://foo.com:8080/some/other/path/index.html";
    final URL expected = new URL("https", "foo.com", 8080, "/some/other/path/index.html");

    final URL actual = NetUtils.getCorrectHostnamePort(address);

    assertEquals(expected, actual);
}

@Test
public void testParseHostPortAddress() {
final InetSocketAddress socketAddress = new InetSocketAddress("foo.com", 8080);
Expand Down

0 comments on commit 5682472

Please sign in to comment.