Skip to content

Commit

Permalink
YARN-11367. [Federation] Fix DefaultRequestInterceptorREST Client NPE. (
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored Nov 9, 2022
1 parent f68f1a4 commit b398a7b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.hadoop.yarn.server.router.webapp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

import java.io.IOException;

/**
* Extends the RequestInterceptor class and provides common functionality which
Expand All @@ -29,6 +33,7 @@ public abstract class AbstractRESTRequestInterceptor

private Configuration conf;
private RESTRequestInterceptor nextInterceptor;
private UserGroupInformation user = null;

/**
* Sets the {@link RESTRequestInterceptor} in the chain.
Expand Down Expand Up @@ -62,9 +67,10 @@ public Configuration getConf() {
* Initializes the {@link RESTRequestInterceptor}.
*/
@Override
public void init(String user) {
public void init(String userName) {
setupUser(userName);
if (this.nextInterceptor != null) {
this.nextInterceptor.init(user);
this.nextInterceptor.init(userName);
}
}

Expand All @@ -86,4 +92,35 @@ public RESTRequestInterceptor getNextInterceptor() {
return this.nextInterceptor;
}

/**
* Set User information.
*
* If the username is empty, we will use the Yarn Router user directly.
* Do not create a proxy user if user name matches the user name on current UGI.
* @param userName userName.
*/
private void setupUser(final String userName) {
try {
if (userName == null || userName.isEmpty()) {
user = UserGroupInformation.getCurrentUser();
} else if (UserGroupInformation.isSecurityEnabled()) {
user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
} else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {
user = UserGroupInformation.getCurrentUser();
} else {
user = UserGroupInformation.createProxyUser(userName,
UserGroupInformation.getCurrentUser());
}
} catch (IOException e) {
String message = "Error while creating Router RMAdmin Service for user:";
if (user != null) {
message += ", user: " + user;
}
throw new YarnRuntimeException(message, e);
}
}

public UserGroupInformation getUser() {
return user;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.ws.rs.core.Response;

import com.sun.jersey.api.client.Client;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
Expand Down Expand Up @@ -603,6 +604,11 @@ public Response signalToContainer(String containerId, String command,
null, getConf(), client);
}

@VisibleForTesting
public Client getClient() {
return client;
}

@Override
public NodeLabelsInfo getRMNodeLabels(HttpServletRequest hsr) {
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {

@Override
public void init(String user) {

super.init(user);

federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random();

Expand Down Expand Up @@ -239,7 +242,8 @@ private DefaultRequestInterceptorREST createInterceptorForSubCluster(
.isAssignableFrom(interceptorClass)) {
interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils
.newInstance(interceptorClass, conf);

String userName = getUser().getUserName();
interceptorInstance.init(userName);
} else {
throw new YarnRuntimeException(
"Class: " + interceptorClassName + " not instance of "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1257,4 +1257,23 @@ private HttpServletRequest mockHttpServletRequestByUserName(String username) {
when(mockHsr.getUserPrincipal()).thenReturn(principal);
return mockHsr;
}

@Test
public void testCheckFederationInterceptorRESTClient() {
SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
String webAppSocket = "SC-1:WebAddress";
String webAppAddress = "http://" + webAppSocket;

Configuration configuration = new Configuration();
FederationInterceptorREST rest = new FederationInterceptorREST();
rest.setConf(configuration);
rest.init("router");

DefaultRequestInterceptorREST interceptorREST =
rest.getOrCreateInterceptorForSubCluster(subClusterId, webAppSocket);

Assert.assertNotNull(interceptorREST);
Assert.assertNotNull(interceptorREST.getClient());
Assert.assertEquals(webAppAddress, interceptorREST.getWebAppAddress());
}
}

0 comments on commit b398a7b

Please sign in to comment.