Skip to content

Commit

Permalink
Fixing bug where proxy user from flow parameter was not included (azk…
Browse files Browse the repository at this point in the history
…aban#3306)

* Fixing bug where proxy user from flow parameter was not included

* Removing variable instances of USER_TO_PROXY

* Added separate unit test for flow param and moved check to allow unit testing

---------

Co-authored-by: Utkarsh Kattishettar <[email protected]>
  • Loading branch information
utk-12 and Utkarsh Kattishettar authored Jun 26, 2023
1 parent 18f5a4d commit 20fc1c1
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,13 @@ public static int getFlowNameHashValMapping(ExecutableFlow flow) {
}

public static Set<String> getProxyUsersForFlow(final ProjectManager projectManager,
final ExecutableFlow flow) {
final ExecutableFlow flow, final Map<String, String> flowParam) {
final Set<String> proxyUsers = new HashSet<>();
// First add the proxy user from the top-level flow parameter. It is added here because an
// individual job may or may not override it.
if (flowParam != null && flowParam.containsKey(USER_TO_PROXY)) {
proxyUsers.add(flowParam.get(USER_TO_PROXY));
}
// Get the project and flow Object that needs to be used repeatedly in the DAG.
Project project = projectManager.getProject(flow.getProjectId());
Flow flowObj = project.getFlow(flow.getFlowId());
Expand Down Expand Up @@ -131,6 +136,8 @@ public static Set<String> getProxyUsersForFlow(final ProjectManager projectManag
// DFS Walk of the Graph to find all the Proxy Users.
populateProxyUsersForFlow(flow, flowObj, project, projectManager, proxyUsers);
proxyUsers.removeAll(Collections.singleton(""));
// Remove USER_TO_PROXY values that are variable references (i.e. contain "$") rather than literal user names
proxyUsers.removeIf(user -> user.contains("$"));
return proxyUsers;
}

Expand Down Expand Up @@ -199,4 +206,4 @@ public static Set<String> getJobTypeUsersForFlow(HashMap<String, String> jobType
}
return jobTypeProxyUserSet;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package azkaban.executor.container;

import static azkaban.Constants.JobProperties.*;
import static azkaban.executor.ExecutionControllerUtils.clusterQualifiedExecId;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -1172,18 +1173,18 @@ private void createPod(final int executionId)
Boolean.parseBoolean(flowParam.get(FlowParameters.PROXY_USER_PREFETCH_ALL));
/*
As an optimization for not walking through the DAG and making DB queries for large DAGs we
try to check if the proxy users list from the project page is less then a certain threshold.
try to check if the proxy users list from the project page is less than a certain threshold.
If it is less than this threshold, we simply add all of them. This configuration will be
custom to a given environment. An option to fetch all of them via the project page is
also provided with a flow parameter.
*/

if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold ) {
if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold) {
logger.info("Fetched proxy users from permissions page");
proxyUsersMap.addAll(flow.getProxyUsers());
} else{
Instant proxyUserFetchStartTime = Instant.now();
proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow));
proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow, flowParam));
logger.info("Fetching proxy users from DAG and took: {} seconds",
Duration.between(proxyUserFetchStartTime, Instant.now()).getSeconds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import static azkaban.Constants.EventReporterConstants.FLOW_STATUS;
import static azkaban.Constants.EventReporterConstants.VERSION_SET;
import static azkaban.ServiceProvider.SERVICE_PROVIDER;
import static azkaban.executor.container.ContainerImplUtils.getJobTypeUsersForFlow;
import static azkaban.executor.container.ContainerImplUtils.populateProxyUsersForFlow;
import static azkaban.executor.container.ContainerImplUtils.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -630,6 +629,34 @@ public void testVersionSetConstructionWithRampupManager() throws Exception {
Assert.assertEquals("6.4", versionSet.getVersion("dependency1").get().getVersion());
}

/**
 * Checks that a proxy user supplied through the flow parameters ("user.to.proxy") is present
 * in the set returned by getProxyUsersForFlow. The job-level property lookups stubbed for
 * node1 return empty Props objects.
 */
@Test
public void testPopulatingProxyUsersFromFlowParamOnly() throws Exception {
  // Executable flow under test, tied to project id 1.
  final ExecutableFlow flow = createTestFlow();
  flow.setProjectId(1);

  // Flow-level parameters carrying the proxy user that is expected in the result.
  Map<String, String> flowParams = new HashMap<>();
  flowParams.put("user.to.proxy", "testuser1");

  // A standalone node used only as the key for the property-lookup stubs below.
  ExecutableNode node1 = new ExecutableNode();
  node1.setId("node1");
  node1.setJobSource("job1");
  node1.setStatus(Status.PREPARING);
  Props currentNodeProps1 = new Props();
  Props currentNodeJobProps1 = new Props();

  // Mocked project metadata: the manager resolves the project, the project resolves the flow.
  ProjectManager projectManager = mock(ProjectManager.class);
  Project project = mock(Project.class);
  Flow flowObj = mock(Flow.class);
  when(flowObj.toString()).thenReturn(flow.getFlowName());
  when(projectManager.getProject(flow.getProjectId())).thenReturn(project);
  when(project.getFlow(flow.getFlowId())).thenReturn(flowObj);

  // Both the regular and the override property lookups for node1 yield empty Props.
  when(projectManager.getProperties(project, flowObj, node1.getId(), node1.getJobSource()))
      .thenReturn(currentNodeProps1);
  when(projectManager.getJobOverrideProperty(project, flowObj, node1.getId(),
      node1.getJobSource()))
      .thenReturn(currentNodeJobProps1);

  // Copy the result into a local set and verify the flow-parameter user is included.
  Set<String> resolvedUsers = getProxyUsersForFlow(projectManager, flow, flowParams);
  Set<String> proxyUsers = new HashSet<>(resolvedUsers);
  Assert.assertTrue(proxyUsers.contains("testuser1"));
}
@Test
public void testPopulatingProxyUsersFromProject() throws Exception {
final ExecutableFlow flow = createTestFlow();
Expand Down
2 changes: 1 addition & 1 deletion azkaban-web-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ node {
npmVersion = '5.6.0'

// Base URL for fetching node distributions (change if you have a mirror).
distBaseUrl = 'https://nodejs.org/dist'
distBaseUrl = 'https://nodejs.org/dist/'

// If true, it will download node using above parameters.
// If false, it will try to use globally installed node.
Expand Down

0 comments on commit 20fc1c1

Please sign in to comment.