Skip to content

Commit

Permalink
[pulsar-broker] Perform auto cert refresh for Pulsar-admin (apache#8831)
Browse files Browse the repository at this point in the history
### Motivation

We are frequently getting 500 on `pulsar-admin topics list <ns>` cli command. It happens because  `pulsar-admin topics` rest-api internally uses `pulsar-admin` to get list of non-persistent topics. `PulsarAdmin-HttpClient` crates persistent connection but it doesn't perform auto-cert refresh so, if cert is expired and reconnection happens then broker always gets 500 when it uses `pulsar-admin` internally due to invalid certs.
```
21:09:16.025 [AsyncHttpClient-48-9] ERROR org.apache.pulsar.broker.admin.v1.NonPersistentTopics - [role] Failed to get list of topics under namespace prop/cluster/ns
java.util.concurrent.ExecutionException: org.apache.pulsar.client.admin.PulsarAdminException: java.net.ConnectException: error:10000416:SSL routines:OPENSSL_internal:SSLV3_ALERT_CERTIFICATE_UNKNOWN
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at org.apache.pulsar.broker.admin.v1.NonPersistentTopics.lambda$getList$0(NonPersistentTopics.java:211) ~[pulsar-broker.jar:]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
        at org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl$4.failed(NonPersistentTopicsImpl.java:215) ~[pulsar-client-admin-original.jar:]
        at org.glassfish.jersey.client.JerseyInvocation$4.failed(JerseyInvocation.java:1030) ~[jersey-client-2.27.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.processFailure(ClientRuntime.java:231) ~[jersey-client-2.27.jar:?]
        at org.glassfish.jersey.client.ClientRuntime.access$100(ClientRuntime.java:85) ~[jersey-client-2.27.jar:?]
        at org.glassfish.jersey.client.ClientRuntime$2.lambda$failure$1(ClientRuntime.java:183) ~[jersey-client-2.27.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272) [jersey-common-2.27.jar:?]
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268) [jersey-common-2.27.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:316) [jersey-common-2.27.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:298) [jersey-common-2.27.jar:?]
        at org.glassfish.jersey.internal.Errors.process(Errors.java:268) [jersey-common-2.27.jar:?]
```

### Modification
Add Capability in HttpClient to perform auto-cert refresh to avoid any tls handshake failure.
  • Loading branch information
rdhabalia authored Dec 23, 2020
1 parent 1ed6af5 commit 98bf97e
Show file tree
Hide file tree
Showing 12 changed files with 446 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@

import com.google.common.collect.ImmutableSet;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.ws.rs.NotAuthorizedException;
Expand All @@ -34,6 +40,7 @@

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand Down Expand Up @@ -371,4 +378,57 @@ public void testDeleteNamespace() throws Exception {
admin.namespaces().deleteNamespace("tenant1/ns1");
}
}

/**
* Validates Pulsar-admin performs auto cert refresh.
* @throws Exception
*/
@Test
public void testCertRefreshForPulsarAdmin() throws Exception {
String adminUser = "admin";
String user2 = "user1";
File keyFile = new File(getTLSFile("temp" + ".key-pk8"));
Path keyFilePath = Paths.get(keyFile.getAbsolutePath());
int autoCertRefreshTimeSec = 1;
try {
Files.copy(Paths.get(getTLSFile(user2 + ".key-pk8")), keyFilePath, StandardCopyOption.REPLACE_EXISTING);
PulsarAdmin admin = PulsarAdmin.builder()
.allowTlsInsecureConnection(false)
.enableTlsHostnameVerification(false)
.serviceHttpUrl(brokerUrlTls.toString())
.autoCertRefreshTime(1, TimeUnit.SECONDS)
.authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls",
String.format("tlsCertFile:%s,tlsKeyFile:%s",
getTLSFile(adminUser + ".cert"), keyFile))
.tlsTrustCertsFilePath(getTLSFile("ca.cert")).build();
// try to call admin-api which should fail due to incorrect key-cert
try {
admin.tenants().createTenant("tenantX",
new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
Assert.fail("should have failed due to invalid key file");
} catch (Exception e) {
//OK
}
// replace correct key file
Files.delete(keyFile.toPath());
Thread.sleep(2 * autoCertRefreshTimeSec * 1000);
Files.copy(Paths.get(getTLSFile(adminUser + ".key-pk8")), keyFilePath);
MutableBoolean success = new MutableBoolean(false);
retryStrategically((test) -> {
try {
admin.tenants().createTenant("tenantX",
new TenantInfo(ImmutableSet.of("foobar"), ImmutableSet.of("test")));
success.setValue(true);
return true;
}catch(Exception e) {
return false;
}
}, 5, 1000);
Assert.assertTrue(success.booleanValue());
Assert.assertEquals(ImmutableSet.of("tenantX"), admin.tenants().getTenants());
admin.close();
}finally {
Files.delete(keyFile.toPath());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class PulsarAdmin implements Closeable {
public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60;
public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300;
public static final int DEFAULT_CERT_REFRESH_SECONDS = 300;

private final Clusters clusters;
private final Brokers brokers;
Expand Down Expand Up @@ -133,9 +134,8 @@ public static PulsarAdminBuilder builder() {

public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException {
this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS,
DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS,
DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, null);

DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS,
DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null);
}

public PulsarAdmin(String serviceUrl,
Expand All @@ -146,6 +146,8 @@ public PulsarAdmin(String serviceUrl,
TimeUnit readTimeoutUnit,
int requestTimeout,
TimeUnit requestTimeoutUnit,
int autoCertRefreshTime,
TimeUnit autoCertRefreshTimeUnit,
ClassLoader clientBuilderClassLoader) throws PulsarClientException {
this.connectTimeout = connectTimeout;
this.connectTimeoutUnit = connectTimeoutUnit;
Expand All @@ -166,7 +168,8 @@ public PulsarAdmin(String serviceUrl,
clientConfigData.setServiceUrl(serviceUrl);
}

AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData);
AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData,
(int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));

ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
Expand Down Expand Up @@ -200,7 +203,8 @@ public PulsarAdmin(String serviceUrl,
this.asyncHttpConnector = asyncConnectorProvider.getConnector(
Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)),
Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)),
Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)));
Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)),
(int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime));

long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout);
this.clusters = new ClustersImpl(root, auth, readTimeoutMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,13 @@ PulsarAdminBuilder authentication(String authPluginClassName, Map<String, String
*/
PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit);

/**
* This sets auto cert refresh time if Pulsar admin uses tls authentication.
*
* @param autoCertRefreshTime
* @param autoCertRefreshTimeUnit
*/
PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit);
/**
*
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
private int connectTimeout = PulsarAdmin.DEFAULT_CONNECT_TIMEOUT_SECONDS;
private int readTimeout = PulsarAdmin.DEFAULT_READ_TIMEOUT_SECONDS;
private int requestTimeout = PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS;
private int autoCertRefreshTime = PulsarAdmin.DEFAULT_CERT_REFRESH_SECONDS;
private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit readTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS;
private ClassLoader clientBuilderClassLoader = null;

@Override
public PulsarAdmin build() throws PulsarClientException {
return new PulsarAdmin(conf.getServiceUrl(),
conf, connectTimeout, connectTimeoutUnit, readTimeout, readTimeoutUnit,
requestTimeout, requestTimeoutUnit, clientBuilderClassLoader);
return new PulsarAdmin(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout,
readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime,
autoCertRefreshTimeUnit, clientBuilderClassLoader);
}

public PulsarAdminBuilderImpl() {
Expand Down Expand Up @@ -167,6 +169,13 @@ public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTim
return this;
}

@Override
public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) {
this.autoCertRefreshTime = autoCertRefreshTime;
this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit;
return this;
}

@Override
public PulsarAdminBuilder setContextClassLoader(ClassLoader clientBuilderClassLoader) {
this.clientBuilderClassLoader = clientBuilderClassLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,18 @@ public class AsyncHttpConnector implements Connector {
private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
new DefaultThreadFactory("delayer"));

public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
(int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
PulsarAdmin.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000,
autoCertRefreshTimeSeconds,
conf);
}

@SneakyThrows
public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
int requestTimeoutMs, ClientConfigurationData conf) {
int requestTimeoutMs,
int autoCertRefreshTimeSeconds, ClientConfigurationData conf) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
Expand Down Expand Up @@ -136,10 +138,10 @@ public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest,
SslContext sslCtx = null;
if (authData.hasDataForTls()) {
sslCtx = authData.getTlsTrustStoreStream() == null
? SecurityUtility.createNettySslContextForClient(
? SecurityUtility.createAutoRefreshSslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
authData.getTlsPrivateKey())
conf.getTlsTrustCertsFilePath(), authData.getTlsCerificateFilePath(),
authData.getTlsPrivateKeyFilePath(), null, autoCertRefreshTimeSeconds, delayer)
: SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
authData.getTlsTrustStoreStream(), authData.getTlsCertificates(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,25 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {

private final ClientConfigurationData conf;
private Connector connector;
private final int autoCertRefreshTimeSeconds;

public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds) {
this.conf = conf;
this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds;
}

@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
if (connector == null) {
connector = new AsyncHttpConnector(client, conf);
connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds);
}
return connector;
}


public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs) {
return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, conf);
public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs,
int autoCertRefreshTimeSeconds) {
return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds,
conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ default Certificate[] getTlsCertificates() {
return null;
}

/**
* @return a client certificate file path
*/
default String getTlsCerificateFilePath() {
return null;
}

/**
*
* @return a private key for the client certificate, or null if the data are not available
Expand All @@ -65,6 +72,14 @@ default PrivateKey getTlsPrivateKey() {
return null;
}

/**
*
* @return a private key file path
*/
default String getTlsPrivateKeyFilePath() {
return null;
}

/**
*
* @return an input-stream of the trust store, or null if the trust-store provided at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,15 @@ public InputStream getTlsTrustStoreStream() {
return trustStoreStreamProvider != null ? trustStoreStreamProvider.get() : null;
}

@Override
public String getTlsCerificateFilePath() {
return certFile != null ? certFile.getFileName() : null;
}

@Override
public String getTlsPrivateKeyFilePath() {
return keyFile != null ? keyFile.getFileName() : null;
}

private static final Logger LOG = LoggerFactory.getLogger(AuthenticationDataTls.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import lombok.Getter;
import lombok.ToString;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class working with file's modified time.
*/
@ToString
public class FileModifiedTimeUpdater {
@Getter
String fileName;
Expand Down
Loading

0 comments on commit 98bf97e

Please sign in to comment.