Skip to content

Commit

Permalink
[hotfix][yarn][test] Avoid using Mockito for Container in YarnResourc…
Browse files Browse the repository at this point in the history
…eManagerTest.
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent 8882b86 commit bdd3978
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;

/**
* A {@link Container} implementation for testing.
*/
class TestingContainer extends ContainerPBImpl {
private final ContainerId containerId;
private final NodeId nodeId;
private final Resource resource;
private final Priority priority;

TestingContainer(
final ContainerId containerId,
final NodeId nodeId,
final Resource resource,
final Priority priority) {

this.containerId = containerId;
this.nodeId = nodeId;
this.resource = resource;
this.priority = priority;
}

@Override
public ContainerId getId() {
return containerId;
}

@Override
public NodeId getNodeId() {
return nodeId;
}

@Override
public Resource getResource() {
return resource;
}

@Override
public Priority getPriority() {
return priority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,26 +332,16 @@ void verifyContainerHasBeenStarted(Container testingContainer) {
void verifyContainerHasBeenRequested() {
verify(mockResourceManagerClient, VERIFICATION_TIMEOUT).addContainerRequest(any(AMRMClient.ContainerRequest.class));
}
}

private static Container mockContainer(String host, int port, int containerId, Resource resource) {
Container mockContainer = mock(Container.class);

NodeId mockNodeId = NodeId.newInstance(host, port);
ContainerId mockContainerId = ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(System.currentTimeMillis(), 1),
1
),
containerId
);

when(mockContainer.getId()).thenReturn(mockContainerId);
when(mockContainer.getNodeId()).thenReturn(mockNodeId);
when(mockContainer.getResource()).thenReturn(resource);
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);

return mockContainer;
Container createTestingContainer() {
final ContainerId containerId = ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(System.currentTimeMillis(), 1),
1),
1);
final NodeId nodeId = NodeId.newInstance("container", 1234);
return new TestingContainer(containerId, nodeId, resourceManager.getContainerResource(), Priority.UNDEFINED);
}
}

private static ContainerStatus mockContainerStatus(ContainerId containerId) {
Expand Down Expand Up @@ -388,7 +378,7 @@ public void testStopWorker() throws Exception {
registerSlotRequest(resourceManager, rmServices, resourceProfile1, taskHost);

// Callback from YARN when container is allocated.
Container testingContainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource());
Container testingContainer = createTestingContainer();

doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
Expand Down Expand Up @@ -486,7 +476,7 @@ public void testOnContainerCompleted() throws Exception {
registerSlotRequest(resourceManager, rmServices, resourceProfile1, taskHost);

// Callback from YARN when container is allocated.
Container testingContainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource());
Container testingContainer = createTestingContainer();

doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
Expand Down Expand Up @@ -515,7 +505,7 @@ public void testOnStartContainerError() throws Exception {
new Context() {{
runTest(() -> {
registerSlotRequest(resourceManager, rmServices, resourceProfile1, taskHost);
Container testingContainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource());
Container testingContainer = createTestingContainer();

doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest())))
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class));
Expand Down

0 comments on commit bdd3978

Please sign in to comment.