Skip to content

Commit

Permalink
NIFI-8978 Add KerberosUserService to DBCPConnectionPool/HadoopDBCPCon…
Browse files Browse the repository at this point in the history
…nectionPool

Signed-off-by: Pierre Villard <[email protected]>

This closes apache#5399.
  • Loading branch information
bbende authored and pvillard31 committed Sep 27, 2021
1 parent 3a8da7b commit 2ae4f90
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-sink-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
Expand Down Expand Up @@ -265,6 +266,14 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
.required(false)
.build();

public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();

public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
.name("kerberos-principal")
.displayName("Kerberos Principal")
Expand All @@ -291,6 +300,7 @@ public class DBCPConnectionPool extends AbstractControllerService implements DBC
props.add(DATABASE_URL);
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(KERBEROS_PRINCIPAL);
props.add(KERBEROS_PASSWORD);
Expand Down Expand Up @@ -359,6 +369,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
}

final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);

if (kerberosCredentialsService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
results.add(new ValidationResult.Builder()
Expand All @@ -368,6 +379,22 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
.build());
}

if (kerberosUserService != null && (kerberosPrincipalProvided || kerberosPasswordProvided)) {
results.add(new ValidationResult.Builder()
.subject(KERBEROS_USER_SERVICE.getDisplayName())
.valid(false)
.explanation("kerberos principal/password and kerberos user service cannot be configured at the same time")
.build());
}

if (kerberosUserService != null && kerberosCredentialsService != null) {
results.add(new ValidationResult.Builder()
.subject(KERBEROS_USER_SERVICE.getDisplayName())
.valid(false)
.explanation("kerberos user service and kerberos credential service cannot be configured at the same time")
.build());
}

return results;
}

Expand Down Expand Up @@ -402,10 +429,13 @@ public void onConfigured(final ConfigurationContext context) throws Initializati
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();

if (kerberosCredentialsService != null) {
if (kerberosUserService != null) {
kerberosUser = kerberosUserService.createKerberosUser();
} else if (kerberosCredentialsService != null) {
kerberosUser = new KerberosKeytabUser(kerberosCredentialsService.getPrincipal(), kerberosCredentialsService.getKeytab());
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
kerberosUser = new KerberosPasswordUser(kerberosPrincipal, kerberosPassword);
Expand Down Expand Up @@ -497,7 +527,7 @@ private Long extractMillisWithInfinite(PropertyValue prop) {
* no exception while closing open connections
*/
@OnDisabled
public void shutdown() throws SQLException, LoginException {
public void shutdown() throws SQLException {
try {
if (kerberosUser != null) {
kerberosUser.logout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import static org.apache.nifi.dbcp.DBCPConnectionPool.EVICTION_RUN_PERIOD
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PASSWORD
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_PRINCIPAL
import static org.apache.nifi.dbcp.DBCPConnectionPool.KERBEROS_USER_SERVICE
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_CONN_LIFETIME
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_IDLE
import static org.apache.nifi.dbcp.DBCPConnectionPool.MAX_TOTAL_CONNECTIONS
Expand Down Expand Up @@ -306,6 +307,7 @@ class DatabaseRecordSinkTest {
when(dbContext.getProperty(MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
when(dbContext.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME)).thenReturn(new MockPropertyValue('5 sec'))
when(dbContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)).thenReturn(new MockPropertyValue(null))
when(dbContext.getProperty(KERBEROS_USER_SERVICE)).thenReturn(new MockPropertyValue(null))
when(dbContext.getProperty(KERBEROS_PRINCIPAL)).thenReturn(new MockPropertyValue(null))
when(dbContext.getProperty(KERBEROS_PASSWORD)).thenReturn(new MockPropertyValue(null))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.nifi.dbcp;

import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.MockKerberosCredentialsService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
Expand All @@ -39,6 +40,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DBCPServiceTest {
private static final String SERVICE_ID = DBCPConnectionPool.class.getName();
Expand Down Expand Up @@ -69,6 +72,38 @@ public void setService() throws InitializationException {
runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, "org.apache.derby.jdbc.EmbeddedDriver");
}

@Test
public void testCustomValidateOfKerberosProperties() throws InitializationException {
// direct principal + password and no kerberos services is valid
runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "[email protected]");
runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
runner.assertValid(service);

// direct principal + password with kerberos credential service is invalid
final KerberosCredentialsService kerberosCredentialsService = enabledKerberosCredentialsService(runner);
runner.setProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE, kerberosCredentialsService.getIdentifier());
runner.assertNotValid(service);

// kerberos credential service by itself is valid
runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL);
runner.removeProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD);
runner.assertValid(service);

// kerberos credential service with kerberos user service is invalid
final KerberosUserService kerberosUserService = enableKerberosUserService(runner);
runner.setProperty(service, DBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertNotValid(service);

// kerberos user service by itself is valid
runner.removeProperty(service, DBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
runner.assertValid(service);

// kerberos user service with direct principal + password is invalid
runner.setProperty(service, DBCPConnectionPool.KERBEROS_PRINCIPAL, "[email protected]");
runner.setProperty(service, DBCPConnectionPool.KERBEROS_PASSWORD, "fooPassword");
runner.assertNotValid(service);
}

@Test
public void testNotValidWithNegativeMinIdleProperty() {
runner.setProperty(service, DBCPConnectionPool.MIN_IDLE, "-1");
Expand Down Expand Up @@ -249,4 +284,24 @@ private void assertConnectionNotNullDynamicProperty(final String propertyName, f
assertNotNull(connection);
}
}

private KerberosUserService enableKerberosUserService(final TestRunner runner) throws InitializationException {
final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
runner.enableControllerService(kerberosUserService);
return kerberosUserService;
}

private KerberosCredentialsService enabledKerberosCredentialsService(final TestRunner runner) throws InitializationException {
final KerberosCredentialsService credentialsService = mock(KerberosCredentialsService.class);
when(credentialsService.getIdentifier()).thenReturn("credsService1");
when(credentialsService.getPrincipal()).thenReturn("principal1");
when(credentialsService.getKeytab()).thenReturn("keytab1");

runner.addControllerService(credentialsService.getIdentifier(), credentialsService);
runner.enableControllerService(credentialsService);
return credentialsService;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-user-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
Expand Down Expand Up @@ -263,6 +264,14 @@ public class HadoopDBCPConnectionPool extends AbstractControllerService implemen
.required(false)
.build();

public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
.description("Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos")
.identifiesControllerService(KerberosUserService.class)
.required(false)
.build();


private File kerberosConfigFile = null;
private KerberosProperties kerberosProperties;
Expand All @@ -286,6 +295,7 @@ protected void init(final ControllerServiceInitializationContext context) {
props.add(DB_DRIVERNAME);
props.add(DB_DRIVER_LOCATION);
props.add(HADOOP_CONFIGURATION_RESOURCES);
props.add(KERBEROS_USER_SERVICE);
props.add(KERBEROS_CREDENTIALS_SERVICE);
props.add(kerberosProperties.getKerberosPrincipal());
props.add(kerberosProperties.getKerberosKeytab());
Expand Down Expand Up @@ -357,6 +367,7 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
final String explicitKeytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
final String explicitPassword = validationContext.getProperty(kerberosProperties.getKerberosPassword()).getValue();
final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final KerberosUserService kerberosUserService = validationContext.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);

final String resolvedPrincipal;
final String resolvedKeytab;
Expand All @@ -382,9 +393,15 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
}

final Configuration hadoopConfig = resources.getConfiguration();

problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
if (kerberosUserService == null) {
problems.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(getClass().getSimpleName(), hadoopConfig,
resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
} else {
final boolean securityEnabled = SecurityUtil.isSecurityEnabled(hadoopConfig);
if (!securityEnabled) {
getLogger().warn("Hadoop Configuration does not have security enabled, KerberosUserService will be ignored");
}
}
}

if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
Expand All @@ -395,6 +412,22 @@ protected Collection<ValidationResult> customValidate(ValidationContext validati
.build());
}

if (kerberosUserService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos User")
.valid(false)
.explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
.build());
}

if (kerberosUserService != null && credentialsService != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos User")
.valid(false)
.explanation("Cannot specify a Kerberos User Service while also specifying a Kerberos Credentials Service")
.build());
}

if (!isAllowExplicitKeytab() && explicitKeytab != null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.kerberos.KerberosContext;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockKerberosContext;
Expand All @@ -30,6 +31,9 @@

import java.io.File;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class HadoopDBCPConnectionPoolTest {

private File krbConfFile;
Expand Down Expand Up @@ -75,7 +79,7 @@ public void testCustomValidateWhenAllowExplicitKeytab() throws InitializationExc
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
runner.assertValid(hadoopDBCPService);

// Configure a KeberosCredentialService, should become invalid
// Configure a KerberosCredentialService, should become invalid
final KerberosCredentialsService kerberosCredentialsService = new MockKerberosCredentialsService(
"[email protected]", "src/test/resources/fake.keytab");
runner.addControllerService("kerb-credentials", kerberosCredentialsService);
Expand All @@ -90,6 +94,32 @@ public void testCustomValidateWhenAllowExplicitKeytab() throws InitializationExc
// Remove principal property, only using keytab service, should become valid
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal());
runner.assertValid(hadoopDBCPService);

// Configure KerberosUserService, should be invalid since KerberosCredentialService also configured
final KerberosUserService kerberosUserService = mock(KerberosUserService.class);
when(kerberosUserService.getIdentifier()).thenReturn("userService1");
runner.addControllerService(kerberosUserService.getIdentifier(), kerberosUserService);
runner.enableControllerService(kerberosUserService);
runner.setProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE, kerberosUserService.getIdentifier());
runner.assertNotValid(hadoopDBCPService);

// Remove KerberosCredentialService, should be valid with only KerberosUserService
runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_CREDENTIALS_SERVICE);
runner.assertValid(hadoopDBCPService);

// Configure explicit principal and keytab, should be invalid while kerberos user service is set
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPrincipal(), "[email protected]");
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab(), "src/test/resources/fake.keytab");
runner.assertNotValid(hadoopDBCPService);

// Remove explicit keytab, set explicit password, still invalid while kerberos user service set
runner.removeProperty(hadoopDBCPService, kerberosProps.getKerberosKeytab());
runner.setProperty(hadoopDBCPService, kerberosProps.getKerberosPassword(), "password");
runner.assertNotValid(hadoopDBCPService);

// Remove kerberos user service, should be valid
runner.removeProperty(hadoopDBCPService, HadoopDBCPConnectionPool.KERBEROS_USER_SERVICE);
runner.assertValid(hadoopDBCPService);
}

@Test
Expand Down

0 comments on commit 2ae4f90

Please sign in to comment.