diff --git a/azkaban-common/src/main/java/azkaban/executor/container/ContainerImplUtils.java b/azkaban-common/src/main/java/azkaban/executor/container/ContainerImplUtils.java index 784f83c206..b31f2423f1 100644 --- a/azkaban-common/src/main/java/azkaban/executor/container/ContainerImplUtils.java +++ b/azkaban-common/src/main/java/azkaban/executor/container/ContainerImplUtils.java @@ -98,8 +98,13 @@ public static int getFlowNameHashValMapping(ExecutableFlow flow) { } public static Set getProxyUsersForFlow(final ProjectManager projectManager, - final ExecutableFlow flow) { + final ExecutableFlow flow, final Map flowParam) { final Set proxyUsers = new HashSet<>(); + // First add the proxy user from the top level flow parameter. Adding this here as individual job + // may or may not override this. + if (flowParam != null && flowParam.containsKey(USER_TO_PROXY)) { + proxyUsers.add(flowParam.get(USER_TO_PROXY)); + } // Get the project and flow Object that needs to be used repeatedly in the DAG. Project project = projectManager.getProject(flow.getProjectId()); Flow flowObj = project.getFlow(flow.getFlowId()); @@ -131,6 +136,8 @@ public static Set getProxyUsersForFlow(final ProjectManager projectManag // DFS Walk of the Graph to find all the Proxy Users. populateProxyUsersForFlow(flow, flowObj, project, projectManager, proxyUsers); proxyUsers.removeAll(Collections.singleton("")); + // Removing instances of variables as USER_TO_PROXY + proxyUsers.removeIf(user -> user.contains("$")); return proxyUsers; } @@ -199,4 +206,4 @@ public static Set getJobTypeUsersForFlow(HashMap jobType } return jobTypeProxyUserSet; } -} \ No newline at end of file +} diff --git a/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java b/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java index ecd38f0fd7..446fd9b3f0 100644 --- a/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java +++ b/azkaban-common/src/main/java/azkaban/executor/container/KubernetesContainerizedImpl.java @@ -15,6 +15,7 @@ */ package azkaban.executor.container; +import static azkaban.Constants.JobProperties.*; import static azkaban.executor.ExecutionControllerUtils.clusterQualifiedExecId; import static java.util.Objects.requireNonNull; @@ -1172,18 +1173,18 @@ private void createPod(final int executionId) Boolean.parseBoolean(flowParam.get(FlowParameters.PROXY_USER_PREFETCH_ALL)); /* As an optimization for not walking through the DAG and making DB queries for large DAGs we - try to check if the proxy users list from the project page is less then a certain threshold. + try to check if the proxy users list from the project page is less than a certain threshold. If it is less than this threshold, we simply add all of them. This configuration will be custom to a given environment. An option to fetch all of them via the project page is also provided with a flow parameter. */ - if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold ) { + if (prefetchAllProxyUserCredentials || flow.getProxyUsers().size() <= this.proxyUserPrefetchThreshold) { logger.info("Fetched proxy users from permissions page"); proxyUsersMap.addAll(flow.getProxyUsers()); } else{ Instant proxyUserFetchStartTime = Instant.now(); - proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow)); + proxyUsersMap.addAll(ContainerImplUtils.getProxyUsersForFlow(this.projectManager, flow, flowParam)); logger.info("Fetching proxy users from DAG and took: {} seconds", Duration.between(proxyUserFetchStartTime, Instant.now()).getSeconds()); } diff --git a/azkaban-common/src/test/java/azkaban/executor/container/KubernetesContainerizedImplTest.java b/azkaban-common/src/test/java/azkaban/executor/container/KubernetesContainerizedImplTest.java index e2594c68ee..5f5951a7cd 100644 --- a/azkaban-common/src/test/java/azkaban/executor/container/KubernetesContainerizedImplTest.java +++ b/azkaban-common/src/test/java/azkaban/executor/container/KubernetesContainerizedImplTest.java @@ -21,8 +21,7 @@ import static azkaban.Constants.EventReporterConstants.FLOW_STATUS; import static azkaban.Constants.EventReporterConstants.VERSION_SET; import static azkaban.ServiceProvider.SERVICE_PROVIDER; -import static azkaban.executor.container.ContainerImplUtils.getJobTypeUsersForFlow; -import static azkaban.executor.container.ContainerImplUtils.populateProxyUsersForFlow; +import static azkaban.executor.container.ContainerImplUtils.*; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -630,6 +629,34 @@ public void testVersionSetConstructionWithRampupManager() throws Exception { Assert.assertEquals("6.4", versionSet.getVersion("dependency1").get().getVersion()); } + @Test + public void testPopulatingProxyUsersFromFlowParamOnly() throws Exception { + final ExecutableFlow flow = createTestFlow(); + flow.setProjectId(1); + ProjectManager projectManager = mock(ProjectManager.class); + Project project = mock(Project.class); + Flow flowObj = mock(Flow.class); + when(flowObj.toString()).thenReturn(flow.getFlowName()); + + ExecutableNode node1 = new ExecutableNode(); + node1.setId("node1"); + node1.setJobSource("job1"); + node1.setStatus(Status.PREPARING); + Props currentNodeProps1 = new Props(); + Props currentNodeJobProps1 = new Props(); + + when(projectManager.getProject(flow.getProjectId())).thenReturn(project); + when(project.getFlow(flow.getFlowId())).thenReturn(flowObj); + when(projectManager.getProperties(project, flowObj, node1.getId(), node1.getJobSource())) + .thenReturn(currentNodeProps1); + when(projectManager.getJobOverrideProperty(project, flowObj, node1.getId(), + node1.getJobSource())) + .thenReturn(currentNodeJobProps1); + Map flowParams = new HashMap<>(); + flowParams.put("user.to.proxy", "testuser1"); + Set proxyUsers = new HashSet<>(getProxyUsersForFlow(projectManager, flow, flowParams)); + Assert.assertTrue(proxyUsers.contains("testuser1")); + } @Test public void testPopulatingProxyUsersFromProject() throws Exception { final ExecutableFlow flow = createTestFlow(); diff --git a/azkaban-web-server/build.gradle b/azkaban-web-server/build.gradle index fa4d47b48a..40ce12106d 100644 --- a/azkaban-web-server/build.gradle +++ b/azkaban-web-server/build.gradle @@ -26,7 +26,7 @@ node { npmVersion = '5.6.0' // Base URL for fetching node distributions (change if you have a mirror). - distBaseUrl = 'https://nodejs.org/dist' + distBaseUrl = 'https://nodejs.org/dist/' // If true, it will download node using above parameters. // If false, it will try to use globally installed node.