Skip to content

Commit

Permalink
NIFI-8975 Integrate KerberosUserService into HBase processors/services
Browse files Browse the repository at this point in the history
This closes apache#5322

Signed-off-by: David Handermann <[email protected]>
  • Loading branch information
bbende authored and exceptionfactory committed Aug 27, 2021
1 parent 4ccb2b6 commit e436381
Show file tree
Hide file tree
Showing 26 changed files with 452 additions and 162 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.security.krb;

/**
* Exception thrown by KerberosUser when an error happens during login/logout.
*/
public class KerberosLoginException extends RuntimeException {

public KerberosLoginException(String message) {
super(message);
}

public KerberosLoginException(String message, Throwable cause) {
super(message, cause);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.nifi.security.krb;

import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.LoginException;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
Expand All @@ -30,16 +29,16 @@ public interface KerberosUser {
/**
* Performs a login for the given user.
*
* @throws LoginException if the login fails
* @throws KerberosLoginException if the login fails
*/
void login() throws LoginException;
void login();

/**
* Performs a logout for the given user.
*
* @throws LoginException if the logout fails
* @throws KerberosLoginException if the logout fails
*/
void logout() throws LoginException;
void logout();

/**
* Executes the given action as the given user.
Expand Down Expand Up @@ -107,9 +106,9 @@ default <T> T doAs(PrivilegedExceptionAction<T> action, ClassLoader contextClass
* Performs a re-login if the TGT is close to expiration.
*
* @return true if a relogin was performed, false otherwise
* @throws LoginException if the relogin fails
* @throws KerberosLoginException if the relogin fails
*/
boolean checkTGTAndRelogin() throws LoginException;
boolean checkTGTAndRelogin();

/**
* @return true if this user is currently logged in, false otherwise
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ public AbstractKerberosUser(final String principal) {
/**
* Performs a login using the specified principal and keytab.
*
* @throws LoginException if the login fails
* @throws KerberosLoginException if the login fails
*/
@Override
public synchronized void login() throws LoginException {
public synchronized void login() {
if (isLoggedIn()) {
return;
}
Expand All @@ -100,10 +100,8 @@ public synchronized void login() throws LoginException {
loginContext.login();
loggedIn.set(true);
LOGGER.debug("Successful login for {}", new Object[]{principal});
} catch (LoginException le) {
LoginException loginException = new LoginException("Unable to login with " + principal + " due to: " + le.getMessage());
loginException.setStackTrace(le.getStackTrace());
throw loginException;
} catch (final LoginException le) {
throw new KerberosLoginException("Unable to login with " + principal + " due to: " + le.getMessage(), le);
}
}

Expand Down Expand Up @@ -134,10 +132,10 @@ public AppConfigurationEntry getConfigurationEntry() {
/**
* Performs a logout of the current user.
*
* @throws LoginException if the logout fails
* @throws KerberosLoginException if the logout fails
*/
@Override
public synchronized void logout() throws LoginException {
public synchronized void logout() {
if (!isLoggedIn()) {
return;
}
Expand All @@ -148,8 +146,8 @@ public synchronized void logout() throws LoginException {
LOGGER.debug("Successful logout for {}", new Object[]{principal});

loginContext = null;
} catch (LoginException e) {
throw new LoginException("Logout failed due to: " + e.getMessage());
} catch (final LoginException e) {
throw new KerberosLoginException("Logout failed due to: " + e.getMessage(), e);
}
}

Expand Down Expand Up @@ -195,7 +193,7 @@ public <T> T doAs(final PrivilegedExceptionAction<T> action)
* @throws LoginException if an error happens performing the re-login
*/
@Override
public synchronized boolean checkTGTAndRelogin() throws LoginException {
public synchronized boolean checkTGTAndRelogin() {
final KerberosTicket tgt = getTGT();
if (tgt == null) {
LOGGER.debug("TGT for {} was not found, performing logout/login", principal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;

import javax.security.auth.login.LoginException;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;

Expand Down Expand Up @@ -60,15 +59,15 @@ public T execute() {
try {
kerberosUser.login();
logger.info("Successful login for {}", new Object[]{kerberosUser.getPrincipal()});
} catch (LoginException e) {
} catch (final KerberosLoginException e) {
throw new ProcessException("Login failed due to: " + e.getMessage(), e);
}
}

// check if we need to re-login, will only happen if re-login window is reached (80% of TGT life)
try {
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
} catch (final KerberosLoginException e) {
throw new ProcessException("Relogin check failed due to: " + e.getMessage(), e);
}

Expand All @@ -79,7 +78,7 @@ public T execute() {
} else {
result = kerberosUser.doAs(action, contextClassLoader);
}
} catch (SecurityException se) {
} catch (final SecurityException se) {
logger.info("Privileged action failed, attempting relogin and retrying...");
logger.debug("", se);

Expand All @@ -90,7 +89,7 @@ public T execute() {
} catch (Exception e) {
throw new ProcessException("Retrying privileged action failed due to: " + e.getMessage(), e);
}
} catch (PrivilegedActionException pae) {
} catch (final PrivilegedActionException pae) {
final Exception cause = pae.getException();
throw new ProcessException("Privileged action failed due to: " + cause.getMessage(), cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosUser;

import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.security.AccessControlContext;
import java.security.AccessController;
Expand Down Expand Up @@ -110,9 +109,9 @@ public static synchronized UserGroupInformation getUgiForKerberosUser(final Conf
UserGroupInformation.setLoginUser(ugi);
return ugi;
});
} catch (PrivilegedActionException e) {
} catch (final PrivilegedActionException e) {
throw new IOException("Unable to acquire UGI for KerberosUser: " + e.getException().getLocalizedMessage(), e.getException());
} catch (LoginException e) {
} catch (final KerberosLoginException e) {
throw new IOException("Unable to acquire UGI for KerberosUser: " + e.getLocalizedMessage(), e);
}
}
Expand Down Expand Up @@ -149,7 +148,16 @@ public static boolean isSecurityEnabled(final Configuration config) {
return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
}

public static <T> T callWithUgi(UserGroupInformation ugi, PrivilegedExceptionAction<T> action) throws IOException {
/**
* Helper method to execute the given action as the given user.
*
* @param ugi the user
* @param action the action
* @param <T> the result type of the action
* @return the result of the action
* @throws IOException if the action was interrupted
*/
public static <T> T callWithUgi(final UserGroupInformation ugi, final PrivilegedExceptionAction<T> action) throws IOException {
try {
T result;
if (ugi == null) {
Expand All @@ -171,19 +179,21 @@ public static <T> T callWithUgi(UserGroupInformation ugi, PrivilegedExceptionAct
}
}

public static void checkTGTAndRelogin(ComponentLog log, KerberosUser kerberosUser) {
log.trace("getting UGI instance");
if (kerberosUser != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
log.debug("kerberosUser is " + kerberosUser);
try {
log.debug("checking TGT on kerberosUser " + kerberosUser);
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
/**
* Helper method to call checkTGTAndRelogin on a given KerberosUser that may be null.
*
* @param log the logger
* @param kerberosUser the kerberos user
* @throws KerberosLoginException if an error occurs when checkTGTAndRelogin calls login or logout
*/
public static void checkTGTAndRelogin(final ComponentLog log, final KerberosUser kerberosUser) {
if (kerberosUser == null) {
log.debug("kerberosUser was null, will not refresh TGT with KerberosUser");
return;
}

log.debug("checking TGT on kerberosUser {}", kerberosUser);
kerberosUser.checkTGTAndRelogin();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,12 @@
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;

import javax.net.SocketFactory;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -360,7 +358,7 @@ public final void abstractOnStopped() {
if (kerberosUser != null) {
try {
kerberosUser.logout();
} catch (LoginException e) {
} catch (final Exception e) {
getLogger().warn("Error logging out KerberosUser: {}", e.getMessage(), e);
}
}
Expand Down Expand Up @@ -623,19 +621,8 @@ protected FileSystem getFileSystem() {

protected UserGroupInformation getUserGroupInformation() {
getLogger().trace("getting UGI instance");
if (hdfsResources.get().getKerberosUser() != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
KerberosUser kerberosUser = hdfsResources.get().getKerberosUser();
getLogger().debug("kerberosUser is " + kerberosUser);
try {
getLogger().debug("checking TGT on kerberosUser " + kerberosUser);
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}
// if there is a KerberosUser associated with UGI, call checkTGTAndRelogin to ensure UGI's underlying Subject has a valid ticket
SecurityUtil.checkTGTAndRelogin(getLogger(), hdfsResources.get().getKerberosUser());
return hdfsResources.get().getUserGroupInformation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-kerberos-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosLoginException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
Expand Down Expand Up @@ -221,9 +222,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
try {
clientService.put(entry.getKey(), entry.getValue());
successes.addAll(entry.getValue());
} catch (Exception e) {
} catch (final KerberosLoginException kle) {
getLogger().error("Failed to connect to HBase due to {}: Rolling back session, and penalizing flow files", kle, kle);
session.rollback(true);
} catch (final Exception e) {
getLogger().error(e.getMessage(), e);

for (PutFlowFile putFlowFile : entry.getValue()) {
getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e});
final FlowFile failure = session.penalize(putFlowFile.getFlowFile());
Expand All @@ -240,7 +243,6 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase";
session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis);
}

}

protected String getTransitUri(PutFlowFile putFlowFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.ValidationResources;

import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
Expand Down Expand Up @@ -401,7 +401,7 @@ public Connection getConnection() throws ProcessException {
try {
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
} catch (final KerberosLoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
Expand All @@ -75,7 +76,6 @@
import org.apache.nifi.util.hive.ValidationResources;
import org.xerial.snappy.Snappy;

import javax.security.auth.login.LoginException;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -1173,7 +1173,7 @@ UserGroupInformation getUgi() {
try {
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
} catch (final KerberosLoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
Expand Down
Loading

0 comments on commit e436381

Please sign in to comment.