Skip to content

Commit

Permalink
KAFKA-14133: Migrate Admin mock in TaskManagerTest to Mockito (apache…
Browse files Browse the repository at this point in the history
…#13712)

This pull requests migrates the Admin mock in TaskManagerTest from EasyMock to Mockito.
The change is restricted to a single mock to minimize the scope and make it easier for review.

Reviewers: Manyanda Chitimbo <[email protected]>, Bruno Cadonna <[email protected]>
  • Loading branch information
clolov authored Jun 13, 2023
1 parent 7556ce3 commit 7f0e455
Showing 1 changed file with 14 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.resetToStrict;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -197,7 +196,7 @@ public class TaskManagerTest {
private ActiveTaskCreator activeTaskCreator;
@Mock(type = MockType.NICE)
private StandbyTaskCreator standbyTaskCreator;
@Mock(type = MockType.NICE)
@org.mockito.Mock
private Admin adminClient;
final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);

Expand Down Expand Up @@ -3725,12 +3724,12 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {

@Test
public void shouldSendPurgeData() {
resetToStrict(adminClient);
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
replay(adminClient);
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, completedFuture())));

final InOrder inOrder = Mockito.inOrder(adminClient);

final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
Expand All @@ -3757,16 +3756,16 @@ public Map<TopicPartition, Long> purgeableOffsets() {
purgableOffsets.put(t1p1, 17L);
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L)));
inOrder.verify(adminClient).deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(17L)));
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldNotSendPurgeDataIfPreviousNotDone() {
resetToStrict(adminClient);
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
expect(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.andReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)));
replay(adminClient);
when(adminClient.deleteRecords(singletonMap(t1p1, RecordsToDelete.beforeOffset(5L))))
.thenReturn(new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords)));

final Map<TopicPartition, Long> purgableOffsets = new HashMap<>();
final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) {
Expand Down Expand Up @@ -3794,8 +3793,6 @@ public Map<TopicPartition, Long> purgeableOffsets() {
// so it would fail verification if we invoke the admin client again.
purgableOffsets.put(t1p1, 17L);
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
}

@Test
Expand All @@ -3807,9 +3804,9 @@ public void shouldIgnorePurgeDataErrors() {
final KafkaFutureImpl<DeletedRecords> futureDeletedRecords = new KafkaFutureImpl<>();
final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(singletonMap(t1p1, futureDeletedRecords));
futureDeletedRecords.completeExceptionally(new Exception("KABOOM!"));
expect(adminClient.deleteRecords(anyObject())).andReturn(deleteRecordsResult).times(2);
when(adminClient.deleteRecords(any())).thenReturn(deleteRecordsResult);

replay(adminClient, consumer);
replay(consumer);

taskManager.addTask(task00);
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true));
Expand All @@ -3820,8 +3817,6 @@ public void shouldIgnorePurgeDataErrors() {

taskManager.maybePurgeCommittedRecords();
taskManager.maybePurgeCommittedRecords();

verify(adminClient);
}

@Test
Expand Down

0 comments on commit 7f0e455

Please sign in to comment.