Skip to content

Commit

Permalink
add basic authentication capabilities to Pulsar SQL (apache#4779)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and merlimat committed Jul 24, 2019
1 parent ecd7357 commit 075f28b
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 22 deletions.
20 changes: 19 additions & 1 deletion conf/presto/catalog/pulsar.properties
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,22 @@ pulsar.rewrite-namespace-delimiter=/
#pulsar.offloader-properties = \
# {"s3ManagedLedgerOffloadBucket": "offload-bucket", \
# "s3ManagedLedgerOffloadRegion": "us-west-2", \
# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}
# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}


####### AUTHENTICATION CONFIGS #######

## the authentication plugin to be used to authenticate to Pulsar cluster
#pulsar.auth-plugin =

## the authentication parameter to be used to authenticate to Pulsar cluster
#pulsar.auth-params =

## Accept untrusted TLS certificate
#pulsar.tls-allow-insecure-connection =

## Whether to enable hostname verification on TLS connections
#pulsar.tls-hostname-verification-enable =

## Path for the trusted TLS certificate file
#pulsar.tls-trust-cert-file-path =
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.common.naming.NamedEntity;
Expand All @@ -44,6 +46,11 @@ public class PulsarConnectorConfig implements AutoCloseable {
private String statsProvider = NullStatsProvider.class.getName();

private Map<String, String> statsProviderConfigs = new HashMap<>();
private String authPluginClassName;
private String authParams;
private String tlsTrustCertsFilePath;
private Boolean tlsAllowInsecureConnection;
private Boolean tlsHostnameVerificationEnable;

private boolean namespaceDelimiterRewriteEnable = false;
private String rewriteNamespaceDelimiter = "/";
Expand Down Expand Up @@ -154,6 +161,33 @@ public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs
return this;
}

public String getRewriteNamespaceDelimiter() {
return rewriteNamespaceDelimiter;
}

@Config("pulsar.rewrite-namespace-delimiter")
public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
if (m.matches()) {
throw new IllegalArgumentException(
"Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ "because delimiter must contain characters which name of namespace not allowed"
);
}
this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
return this;
}

public boolean getNamespaceDelimiterRewriteEnable() {
return namespaceDelimiterRewriteEnable;
}

@Config("pulsar.namespace-delimiter-rewrite-enable")
public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
return this;
}

/**** --- Ledger Offloading --- ****/

public int getManagedLedgerOffloadMaxThreads() {
Expand Down Expand Up @@ -197,37 +231,80 @@ public PulsarConnectorConfig setOffloaderProperties(String offloaderProperties)
return this;
}

public String getRewriteNamespaceDelimiter() {
return rewriteNamespaceDelimiter;
/**** --- Authentication --- ****/

public String getAuthPlugin() {
return this.authPluginClassName;
}

@Config("pulsar.rewrite-namespace-delimiter")
public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
if (m.matches()) {
throw new IllegalArgumentException(
"Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ "because delimiter must contain characters which name of namespace not allowed"
);
}
this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
@Config("pulsar.auth-plugin")
public PulsarConnectorConfig setAuthPlugin(String authPluginClassName) throws IOException {
this.authPluginClassName = authPluginClassName;
return this;
}

public boolean getNamespaceDelimiterRewriteEnable() {
return namespaceDelimiterRewriteEnable;
public String getAuthParams() {
return this.authParams;
}

@Config("pulsar.namespace-delimiter-rewrite-enable")
public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
@Config("pulsar.auth-params")
public PulsarConnectorConfig setAuthParams(String authParams) throws IOException {
this.authParams = authParams;
return this;
}

public Boolean isTlsAllowInsecureConnection() {
return tlsAllowInsecureConnection;
}

@Config("pulsar.tls-allow-insecure-connection")
public PulsarConnectorConfig setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
return this;
}

public Boolean isTlsHostnameVerificationEnable() {
return tlsHostnameVerificationEnable;
}

@Config("pulsar.tls-hostname-verification-enable")
public PulsarConnectorConfig setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
return this;
}

public String getTlsTrustCertsFilePath() {
return tlsTrustCertsFilePath;
}

@Config("pulsar.tls-trust-cert-file-path")
public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
return this;
}

@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
PulsarAdminBuilder builder = PulsarAdmin.builder();

if (getAuthPlugin() != null) {
builder.authentication(getAuthPlugin(), getAuthParams());
}

if (isTlsAllowInsecureConnection() != null) {
builder.allowTlsInsecureConnection(isTlsAllowInsecureConnection());
}

if (isTlsHostnameVerificationEnable() != null) {
builder.enableTlsHostnameVerification(isTlsHostnameVerificationEnable());
}

if (getTlsTrustCertsFilePath() != null) {
builder.tlsTrustCertsFilePath(getTlsTrustCertsFilePath());
}

this.pulsarAdmin = builder.serviceHttpUrl(getBrokerServiceUrl()).build();
}
return this.pulsarAdmin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
Expand Down Expand Up @@ -116,6 +117,9 @@ public List<String> listSchemaNames(ConnectorSession session) {
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized");
}
throw new RuntimeException("Failed to get schemas from pulsar: "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
Expand Down Expand Up @@ -174,6 +178,9 @@ public List<SchemaTableName> listTables(ConnectorSession session, String schemaN
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
return builder.build();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull));
}
throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
Expand Down Expand Up @@ -277,6 +284,9 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName,
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get topics in schema %s: Unauthorized", namespace));
}
throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
Expand All @@ -297,8 +307,13 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName,
if (e.getStatusCode() == 404) {
// to indicate that we can't read from topic because there is no schema
return null;
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
namespace, schemaTableName.getTableName()));
}
throw new RuntimeException("Failed to get schema information for topic "

throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
import lombok.Data;
Expand Down Expand Up @@ -57,6 +57,7 @@
import java.util.LinkedList;
import java.util.List;

import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
Expand Down Expand Up @@ -105,7 +106,13 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
String.format("%s/%s", namespace, tableHandle.getTableName()));
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schema for topic "
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
namespace, tableHandle.getTableName()));
}

throw new RuntimeException("Failed to get pulsar topic schema for topic "
+ String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
Expand Down Expand Up @@ -143,6 +150,11 @@ Collection<PulsarSplit> getSplitsPartitionedTopic(int numSplits, TopicName topic
try {
numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
}

throw new RuntimeException("Failed to get metadata for partitioned topic "
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e);
}
Expand Down

0 comments on commit 075f28b

Please sign in to comment.