Skip to content

Commit

Permalink
[FLINK-20047][coordination] DefaultLeaderRetrievalService should only…
Browse files Browse the repository at this point in the history
… notify the LeaderRetrievalListener when leader truly changed

This closes apache#13982.
  • Loading branch information
wangyang0918 authored and tillrohrmann committed Nov 9, 2020
1 parent 7b4c97b commit 8bc6a7b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,10 @@ public void notifyLeaderAddress(LeaderInformation leaderInformation) {

lastLeaderAddress = newLeaderAddress;
lastLeaderSessionID = newLeaderSessionID;
}

leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
// Notify the listener only when the leader is truly changed.
leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring notification since the {} has already been closed.", leaderRetrievalDriver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public interface LeaderRetrievalEventHandler {
/**
* Called by specific {@link LeaderRetrievalDriver} to notify leader address.
*
* <p>Duplicated leader change events could happen, so the implementation should check whether
* the passed leader information is truly changed with last stored leader information.
*
* @param leaderInformation the new leader information to notify {@link LeaderRetrievalService}. It could be
* {@link LeaderInformation#empty()} if the leader address does not exist in the external storage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public void offerToLeaderQueue(LeaderInformation leaderInformation) {
this.leader = leaderInformation;
}

public int getLeaderEventQueueSize() {
return leaderEventQueue.size();
}

/**
* Please use {@link #waitForError} before get the error.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,25 @@ public void testErrorIsIgnoredAfterBeingStop() throws Exception {
}};
}

@Test
public void testNotifyLeaderAddressOnlyWhenLeaderTrulyChanged() throws Exception {
new Context() {{
runTest(() -> {
final LeaderInformation newLeader = LeaderInformation.known(UUID.randomUUID(), TEST_URL);
testingLeaderRetrievalDriver.onUpdate(newLeader);
assertThat(testingListener.getLeaderEventQueueSize(), is(1));

// Same leader information should not be notified twice.
testingLeaderRetrievalDriver.onUpdate(newLeader);
assertThat(testingListener.getLeaderEventQueueSize(), is(1));

// Leader truly changed.
testingLeaderRetrievalDriver.onUpdate(LeaderInformation.known(UUID.randomUUID(), TEST_URL + 1));
assertThat(testingListener.getLeaderEventQueueSize(), is(2));
});
}};
}

private class Context {
private final TestingLeaderRetrievalDriver.TestingLeaderRetrievalDriverFactory leaderRetrievalDriverFactory =
new TestingLeaderRetrievalDriver.TestingLeaderRetrievalDriverFactory();
Expand Down

0 comments on commit 8bc6a7b

Please sign in to comment.