Skip to content

Commit

Permalink
PIP-45: Revalidate leader election after session is recovered (apache…
Browse files Browse the repository at this point in the history
…#10457)

* PIP-45: Revalidate leader election after session is recovered

* Fixed TopicOwnerTest

* Addresses comments
  • Loading branch information
merlimat authored May 4, 2021
1 parent 26118e6 commit 2698946
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
Expand All @@ -59,6 +61,7 @@
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;
Expand Down Expand Up @@ -163,14 +166,14 @@ private MutableObject<PulsarService> spyLeaderNamespaceServiceForAuthorizedBroke
return leaderAuthorizedBroker;
}

private CompletableFuture<Void> watchZookeeperReconnect(ZooKeeper zooKeeper) throws Exception {
private CompletableFuture<Void> watchMetadataStoreReconnect(MetadataStoreExtended store) {
CompletableFuture<Void> reconnectedFuture = new CompletableFuture<>();
zooKeeper.exists("/", (WatchedEvent event) -> {
Watcher.Event.KeeperState state = event.getState();
if (state == Watcher.Event.KeeperState.SyncConnected) {
store.registerSessionListener(event -> {
if (event == SessionEvent.Reconnected || event == SessionEvent.SessionReestablished) {
reconnectedFuture.complete(null);
}
});

return reconnectedFuture;
}

Expand Down Expand Up @@ -238,103 +241,6 @@ private void spyZookeeperToDisconnectAfterPersist(ZooKeeper zooKeeper, RequestMa
}).when(spyZooKeeperServer).submitRequest(any(Request.class));
}

@Test
public void testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));

final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();

PulsarService pulsar1 = pulsarServices[1];
final ZooKeeper zooKeeper1 = pulsar1.getZkClient();

final CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1);

String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);

spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
if (request.type != ZooDefs.OpCode.create) {
return false;
}

CreateRequest createRequest = new CreateRequest();
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest);
return createRequest.getPath().contains(namespaceBundlePath);
});

leaderAuthorizedBroker.setValue(pulsar1);

try {
// Trigger ownership acquiring and zookeeper disconnecting before ownership node created.
//
// Ignore its execution result since whether it is fail or not depends on concrete implementation.
pulsarAdmins[1].lookups().lookupTopic(topic1);
} catch (Exception ex) {
// Ignored intentionally.
}

reconnectedFuture.join();

// We don't known whether previous lookup was successful or not, but now all lookups should succeed.
Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());

pulsar1.getBrokerService().getTopic(topic1, true).join();

Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
}

@Test
public void testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));

final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();

PulsarService pulsar1 = pulsarServices[1];
final ZooKeeper zooKeeper1 = pulsar1.getZkClient();

final CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1);

String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);

spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
if (request.type != ZooDefs.OpCode.create) {
return false;
}

CreateRequest createRequest = new CreateRequest();
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), createRequest);
return createRequest.getPath().contains(namespaceBundlePath);
});

leaderAuthorizedBroker.setValue(pulsar1);

try {
// Trigger ownership acquiring and zookeeper disconnecting after ownership node created.
//
// Ignore its execution result since whether it is fail or not depends on concrete implementation.
pulsarAdmins[1].lookups().lookupTopic(topic1);
} catch (Exception ex) {
// Ignored intentionally.
}

reconnectedFuture.join();

Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());

pulsar1.getBrokerService().getTopic(topic1, true).join();

Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
}

@Test
public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
Expand Down Expand Up @@ -388,112 +294,6 @@ public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
Assert.assertNotNull(ownershipCache1.getOwnedBundle(namespaceBundle));
}

@Test
public void testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));

final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();

PulsarService pulsar1 = pulsarServices[1];
PulsarService pulsar2 = pulsarServices[2];

leaderAuthorizedBroker.setValue(pulsar1);
Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());

ZooKeeper zooKeeper1 = pulsar1.getZkClient();

CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1);

String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);

spyZookeeperToDisconnectBeforePersist(zooKeeper1, request -> {
if (request.type != ZooDefs.OpCode.delete) {
return false;
}
DeleteRequest deleteRequest = new DeleteRequest();
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest);
return deleteRequest.getPath().contains(namespaceBundlePath);
});

try {
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(), namespaceBundle.getBundleRange());
} catch (Exception ex) {
// Ignored since whether failing unloading when zk connection-loss is an implementation detail.
}

reconnectedFuture.join();

leaderAuthorizedBroker.setValue(pulsar2);

// We don't known whether previous unload was successful or not, but now all lookups should return same result.
final String currentBrokerServiceUrl = pulsarAdmins[0].lookups().lookupTopic(topic1);
Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), currentBrokerServiceUrl);
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), currentBrokerServiceUrl);
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), currentBrokerServiceUrl);
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), currentBrokerServiceUrl);

pulsarAdmins[0].topics().createNonPartitionedTopic(topic1);
}

@Test
public void testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws Exception {
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));

final MutableObject<PulsarService> leaderAuthorizedBroker = spyLeaderNamespaceServiceForAuthorizedBroker();

PulsarService pulsar1 = pulsarServices[1];
PulsarService pulsar2 = pulsarServices[2];

leaderAuthorizedBroker.setValue(pulsar1);
Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar1.getBrokerServiceUrl());

ZooKeeper zooKeeper1 = pulsar1.getZkClient();

CompletableFuture<Void> reconnectedFuture = watchZookeeperReconnect(zooKeeper1);

String namespaceBundlePath = ServiceUnitZkUtils.path(namespaceBundle);

spyZookeeperToDisconnectAfterPersist(zooKeeper1, request -> {
if (request.type != ZooDefs.OpCode.delete) {
return false;
}
DeleteRequest deleteRequest = new DeleteRequest();
ByteBufferInputStream.byteBuffer2Record(request.request.duplicate(), deleteRequest);
return deleteRequest.getPath().contains(namespaceBundlePath);
});

try {
pulsarAdmins[1].namespaces().unloadNamespaceBundle(namespaceBundle.getNamespaceObject().toString(), namespaceBundle.getBundleRange());
} catch (Exception ex) {
// Ignored since whether failing unloading when zk connection-loss is an implementation detail.
}

reconnectedFuture.join();

leaderAuthorizedBroker.setValue(pulsar2);

Assert.assertEquals(pulsarAdmins[0].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[3].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[4].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl());

pulsar2.getBrokerService().getTopic(topic1, true).join();

Assert.assertEquals(pulsarAdmins[2].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl());
Assert.assertEquals(pulsarAdmins[1].lookups().lookupTopic(topic1), pulsar2.getBrokerServiceUrl());
}

@Test
public void testConnectToInvalidateBundleCacheBroker() throws Exception {
Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(), 16);
Expand Down
Loading

0 comments on commit 2698946

Please sign in to comment.