Skip to content

Commit

Permalink
NIFI-8312: Support PKCS12 and BCFKS truststores in Atlas reporting task
Browse files Browse the repository at this point in the history
- Generate ssl-client.xml on NiFi side in order to be able to configure non-JKS truststores.
- Close FileOutputStream in tests to prevent error during clean-up.
- Removed generating Hadoop Credential Store.
- The credential store is not related to Atlas REST API SSL connection but would eliminate a warning from Atlas Kafka client. Removed because it caused test failure on Windows due to missing Hadoop native libraries.

This closes apache#4893

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
turcsanyip authored and exceptionfactory committed Mar 17, 2021
1 parent f9e469f commit 04cd418
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,11 @@
<artifactId>jetty-servlet</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.xmlunit</groupId>
<artifactId>xmlunit-core</artifactId>
<version>2.8.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.atlas.utils.AtlasPathExtractorUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.nifi.annotation.behavior.DynamicProperty;
Expand Down Expand Up @@ -71,17 +69,16 @@
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;
import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StringSelector;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -368,16 +365,14 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
private static final String ATLAS_PROPERTY_ENABLE_TLS = SecurityProperties.TLS_ENABLED;
private static final String ATLAS_PROPERTY_TRUSTSTORE_FILE = SecurityProperties.TRUSTSTORE_FILE_KEY;
private static final String ATLAS_PROPERTY_CRED_STORE_PATH = SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;

private static final String CRED_STORE_FILENAME = "atlas.jceks";
private static final String SSL_CLIENT_XML_FILENAME = SecurityProperties.SSL_CLIENT_PROPERTIES;

private static final String TRUSTSTORE_PASSWORD_ALIAS = "ssl.client.truststore.password";
private static final String SSL_CLIENT_XML_TRUSTSTORE_LOCATION = "ssl.client.truststore.location";
private static final String SSL_CLIENT_XML_TRUSTSTORE_PASSWORD = "ssl.client.truststore.password";
private static final String SSL_CLIENT_XML_TRUSTSTORE_TYPE = "ssl.client.truststore.type";

private final ServiceLoader<NamespaceResolver> namespaceResolverLoader = ServiceLoader.load(NamespaceResolver.class);
private volatile AtlasAuthN atlasAuthN;
Expand Down Expand Up @@ -704,33 +699,47 @@ private void setAtlasSSLConfig(Properties atlasProperties, ConfigurationContext
boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));

// ssl-client.xml must be deleted, Atlas will not regenerate it otherwise
Path credStorePath = new File(confDir, CRED_STORE_FILENAME).toPath();
Files.deleteIfExists(credStorePath);
Path sslClientXmlPath = new File(confDir, SSL_CLIENT_XML_FILENAME).toPath();
Files.deleteIfExists(sslClientXmlPath);
deleteSslClientXml(confDir);

if (isAtlasApiSecure) {
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService == null) {
getLogger().warn("No SSLContextService configured, the system default truststore will be used.");
} else if (!sslContextService.isTrustStoreConfigured()) {
getLogger().warn("No truststore configured on SSLContextService, the system default truststore will be used.");
} else if (!KeystoreType.JKS.getType().equalsIgnoreCase(sslContextService.getTrustStoreType())) {
getLogger().warn("The configured truststore type is not supported by Atlas (not JKS), the system default truststore will be used.");
} else {
atlasProperties.put(ATLAS_PROPERTY_TRUSTSTORE_FILE, sslContextService.getTrustStoreFile());
// create ssl-client.xml config file for Hadoop Security used by Atlas REST client,
// Atlas would generate this file with hardcoded JKS keystore type,
// in order to support other keystore types, we generate it ourselves
createSslClientXml(confDir, sslContextService);
}
}
}

private void deleteSslClientXml(File confDir) throws Exception {
Path sslClientXmlPath = new File(confDir, SSL_CLIENT_XML_FILENAME).toPath();
try {
Files.deleteIfExists(sslClientXmlPath);
} catch (Exception e) {
getLogger().error("Unable to delete SSL Client Configuration File {}", sslClientXmlPath, e);
throw e;
}
}

String password = sslContextService.getTrustStorePassword();
// Hadoop Credential Provider JCEKS URI format: localjceks://file/PATH/TO/JCEKS
String credStoreUri = credStorePath.toUri().toString().replaceFirst("^file://", "localjceks://file");
private void createSslClientXml(File confDir, SSLContextService sslContextService) throws Exception {
File sslClientXmlFile = new File(confDir, SSL_CLIENT_XML_FILENAME);

CredentialProvider credentialProvider = new LocalJavaKeyStoreProvider.Factory().createProvider(new URI(credStoreUri), new Configuration());
credentialProvider.createCredentialEntry(TRUSTSTORE_PASSWORD_ALIAS, password.toCharArray());
credentialProvider.flush();
Configuration configuration = new Configuration(false);

atlasProperties.put(ATLAS_PROPERTY_CRED_STORE_PATH, credStoreUri);
}
configuration.set(SSL_CLIENT_XML_TRUSTSTORE_LOCATION, sslContextService.getTrustStoreFile());
configuration.set(SSL_CLIENT_XML_TRUSTSTORE_PASSWORD, sslContextService.getTrustStorePassword());
configuration.set(SSL_CLIENT_XML_TRUSTSTORE_TYPE, sslContextService.getTrustStoreType());

try (FileWriter fileWriter = new FileWriter(sslClientXmlFile)) {
configuration.writeXml(fileWriter);
} catch (Exception e) {
getLogger().error("Unable to create SSL Client Configuration File {}", sslClientXmlFile, e);
throw e;
}
}

Expand Down
Loading

0 comments on commit 04cd418

Please sign in to comment.